diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 2289f743..69a837b0 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -19,7 +19,7 @@ Kraken backend. """ from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field -from typing import List, Dict, Any, Tuple, Optional +from typing import List, Dict, Any, Tuple, Optional, AsyncIterator import json import time @@ -291,38 +291,199 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: # @tractor.msg.pub -async def stream_quotes( - # get_topics: Callable, - shm_token: Tuple[str, str, List[tuple]], - symbols: List[str] = ['XBTUSD', 'XMRUSD'], - # These are the symbols not expected by the ws api - # they are looked up inside this routine. - sub_type: str = 'ohlc', - loglevel: str = None, - # compat with eventual ``tractor.msg.pub`` - topics: Optional[List[str]] = None, -) -> None: - """Subscribe for ohlc stream of quotes for ``pairs``. +#async def stream_quotes( +# # get_topics: Callable, +# shm_token: Tuple[str, str, List[tuple]], +# symbols: List[str] = ['XBTUSD', 'XMRUSD'], +# # These are the symbols not expected by the ws api +# # they are looked up inside this routine. +# sub_type: str = 'ohlc', +# loglevel: str = None, +# # compat with eventual ``tractor.msg.pub`` +# topics: Optional[List[str]] = None, +#) -> None: +# """Subscribe for ohlc stream of quotes for ``pairs``. +# +# ``pairs`` must be formatted /. +# """ +# # XXX: required to propagate ``tractor`` loglevel to piker logging +# get_console_log(loglevel or tractor.current_actor().loglevel) +# +# ws_pairs = {} +# async with get_client() as client: +# +# # keep client cached for real-time section +# for sym in symbols: +# ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] +# +# # maybe load historical ohlcv in to shared mem +# # check if shm has already been created by previous +# # feed initialization +# writer_exists = get_shm_token(shm_token['shm_name']) +# +# symbol = symbols[0] +# +# if not writer_exists: +# shm = attach_shm_array( +# token=shm_token, +# # we are writer +# readonly=False, +# ) +# bars = await client.bars(symbol=symbol) +# +# shm.push(bars) +# shm_token = shm.token +# +# times = shm.array['time'] +# delay_s = times[-1] - times[times != times[-1]][-1] +# subscribe_ohlc_for_increment(shm, delay_s) +# +# yield shm_token, not writer_exists +# +# while True: +# try: +# async with trio_websocket.open_websocket_url( +# 'wss://ws.kraken.com/', +# ) as ws: +# +# # XXX: setup subs +# # https://docs.kraken.com/websockets/#message-subscribe +# # specific logic for this in kraken's shitty sync client: +# # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 +# ohlc_sub = make_sub( +# list(ws_pairs.values()), +# {'name': 'ohlc', 'interval': 1} +# ) +# +# # TODO: we want to eventually allow unsubs which should +# # be completely fine to request from a separate task +# # since internally the ws methods appear to be FIFO +# # locked. +# await ws.send_message(json.dumps(ohlc_sub)) +# +# # trade data (aka L1) +# l1_sub = make_sub( +# list(ws_pairs.values()), +# {'name': 'spread'} # 'depth': 10} +# +# ) +# await ws.send_message(json.dumps(l1_sub)) +# +# async def recv(): +# return json.loads(await ws.get_message()) +# +# # pull a first quote and deliver +# msg_gen = recv_msg(recv) +# typ, ohlc_last = await msg_gen.__anext__() +# +# topic, quote = normalize(ohlc_last) +# +# # packetize as {topic: quote} +# yield {topic: quote} +# +# # keep start of last interval for volume tracking +# last_interval_start = ohlc_last.etime +# +# # start streaming +# async for typ, ohlc in msg_gen: +# +# if typ == 'ohlc': +# +# # TODO: can get rid of all this by using +# # ``trades`` subscription... +# +# # generate tick values to match time & sales pane: +# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m +# volume = ohlc.volume +# +# # new interval +# if ohlc.etime > last_interval_start: +# last_interval_start = ohlc.etime +# tick_volume = volume +# else: +# # this is the tick volume *within the interval* +# tick_volume = volume - ohlc_last.volume +# +# last = ohlc.close +# if tick_volume: +# ohlc.ticks.append({ +# 'type': 'trade', +# 'price': last, +# 'size': tick_volume, +# }) +# +# topic, quote = normalize(ohlc) +# +# # if we are the lone tick writer start writing +# # the buffer with appropriate trade data +# if not writer_exists: +# # update last entry +# # benchmarked in the 4-5 us range +# o, high, low, v = shm.array[-1][ +# ['open', 'high', 'low', 'volume'] +# ] +# new_v = tick_volume +# +# if v == 0 and new_v: +# # no trades for this bar yet so the open +# # is also the close/last trade price +# o = last +# +# # write shm +# shm.array[ +# ['open', +# 'high', +# 'low', +# 'close', +# 'bar_wap', # in this case vwap of bar +# 'volume'] +# ][-1] = ( +# o, +# max(high, last), +# min(low, last), +# last, +# ohlc.vwap, +# volume, +# ) +# ohlc_last = ohlc +# +# elif typ == 'l1': +# quote = ohlc +# topic = quote['symbol'] +# +# # XXX: format required by ``tractor.msg.pub`` +# # requires a ``Dict[topic: str, quote: dict]`` +# yield {topic: quote} +# +# except (ConnectionClosed, DisconnectionTimeout): +# log.exception("Good job kraken...reconnecting") - ``pairs`` must be formatted /. - """ + +@tractor.stream +async def stream_quotes( + ctx: tractor.Context, + symbols: List[str], + shm_token: Tuple[str, str, List[tuple]], + loglevel: str = None, + # compat for @tractor.msg.pub + topics: Any = None +) -> AsyncIterator[Dict[str, Any]]: + # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - + ws_pairs = {} async with get_client() as client: - + # keep client cached for real-time section for sym in symbols: ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] - # maybe load historical ohlcv in to shared mem - # check if shm has already been created by previous - # feed initialization writer_exists = get_shm_token(shm_token['shm_name']) - symbol = symbols[0] + await tractor.breakpoint() + if not writer_exists: shm = attach_shm_array( token=shm_token, @@ -337,15 +498,16 @@ async def stream_quotes( times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] subscribe_ohlc_for_increment(shm, delay_s) - - yield shm_token, not writer_exists + + # pass back token, and bool, signalling if we're the writer + await ctx.send_yield((shm_token, not writer_exists)) while True: try: async with trio_websocket.open_websocket_url( - 'wss://ws.kraken.com/', + 'wss://ws.kraken.com', ) as ws: - + # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: @@ -379,7 +541,7 @@ async def stream_quotes( topic, quote = normalize(ohlc_last) # packetize as {topic: quote} - yield {topic: quote} + await ctx.send_yield({topic: quote}) # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -453,7 +615,8 @@ async def stream_quotes( # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` - yield {topic: quote} + await ctx.send_yield({topic: quote}) except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting") +