From e56d065dbcb48291bbccf1173e3666c5c810e6e7 Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 2 Jan 2021 12:34:17 -0300 Subject: [PATCH] Add fill_bars function to kraken --- piker/brokers/kraken.py | 476 +++++++++++++++------------------------- 1 file changed, 180 insertions(+), 296 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 69a837b0..cc04cbf1 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -17,6 +17,8 @@ """ Kraken backend. """ + +import sys from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field from typing import List, Dict, Any, Tuple, Optional, AsyncIterator @@ -290,173 +292,53 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } -# @tractor.msg.pub -#async def stream_quotes( -# # get_topics: Callable, -# shm_token: Tuple[str, str, List[tuple]], -# symbols: List[str] = ['XBTUSD', 'XMRUSD'], -# # These are the symbols not expected by the ws api -# # they are looked up inside this routine. -# sub_type: str = 'ohlc', -# loglevel: str = None, -# # compat with eventual ``tractor.msg.pub`` -# topics: Optional[List[str]] = None, -#) -> None: -# """Subscribe for ohlc stream of quotes for ``pairs``. -# -# ``pairs`` must be formatted /. -# """ -# # XXX: required to propagate ``tractor`` loglevel to piker logging -# get_console_log(loglevel or tractor.current_actor().loglevel) -# -# ws_pairs = {} -# async with get_client() as client: -# -# # keep client cached for real-time section -# for sym in symbols: -# ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] -# -# # maybe load historical ohlcv in to shared mem -# # check if shm has already been created by previous -# # feed initialization -# writer_exists = get_shm_token(shm_token['shm_name']) -# -# symbol = symbols[0] -# -# if not writer_exists: -# shm = attach_shm_array( -# token=shm_token, -# # we are writer -# readonly=False, -# ) -# bars = await client.bars(symbol=symbol) -# -# shm.push(bars) -# shm_token = shm.token -# -# times = shm.array['time'] -# delay_s = times[-1] - times[times != times[-1]][-1] -# subscribe_ohlc_for_increment(shm, delay_s) -# -# yield shm_token, not writer_exists -# -# while True: -# try: -# async with trio_websocket.open_websocket_url( -# 'wss://ws.kraken.com/', -# ) as ws: -# -# # XXX: setup subs -# # https://docs.kraken.com/websockets/#message-subscribe -# # specific logic for this in kraken's shitty sync client: -# # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 -# ohlc_sub = make_sub( -# list(ws_pairs.values()), -# {'name': 'ohlc', 'interval': 1} -# ) -# -# # TODO: we want to eventually allow unsubs which should -# # be completely fine to request from a separate task -# # since internally the ws methods appear to be FIFO -# # locked. -# await ws.send_message(json.dumps(ohlc_sub)) -# -# # trade data (aka L1) -# l1_sub = make_sub( -# list(ws_pairs.values()), -# {'name': 'spread'} # 'depth': 10} -# -# ) -# await ws.send_message(json.dumps(l1_sub)) -# -# async def recv(): -# return json.loads(await ws.get_message()) -# -# # pull a first quote and deliver -# msg_gen = recv_msg(recv) -# typ, ohlc_last = await msg_gen.__anext__() -# -# topic, quote = normalize(ohlc_last) -# -# # packetize as {topic: quote} -# yield {topic: quote} -# -# # keep start of last interval for volume tracking -# last_interval_start = ohlc_last.etime -# -# # start streaming -# async for typ, ohlc in msg_gen: -# -# if typ == 'ohlc': -# -# # TODO: can get rid of all this by using -# # ``trades`` subscription... -# -# # generate tick values to match time & sales pane: -# # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m -# volume = ohlc.volume -# -# # new interval -# if ohlc.etime > last_interval_start: -# last_interval_start = ohlc.etime -# tick_volume = volume -# else: -# # this is the tick volume *within the interval* -# tick_volume = volume - ohlc_last.volume -# -# last = ohlc.close -# if tick_volume: -# ohlc.ticks.append({ -# 'type': 'trade', -# 'price': last, -# 'size': tick_volume, -# }) -# -# topic, quote = normalize(ohlc) -# -# # if we are the lone tick writer start writing -# # the buffer with appropriate trade data -# if not writer_exists: -# # update last entry -# # benchmarked in the 4-5 us range -# o, high, low, v = shm.array[-1][ -# ['open', 'high', 'low', 'volume'] -# ] -# new_v = tick_volume -# -# if v == 0 and new_v: -# # no trades for this bar yet so the open -# # is also the close/last trade price -# o = last -# -# # write shm -# shm.array[ -# ['open', -# 'high', -# 'low', -# 'close', -# 'bar_wap', # in this case vwap of bar -# 'volume'] -# ][-1] = ( -# o, -# max(high, last), -# min(low, last), -# last, -# ohlc.vwap, -# volume, -# ) -# ohlc_last = ohlc -# -# elif typ == 'l1': -# quote = ohlc -# topic = quote['symbol'] -# -# # XXX: format required by ``tractor.msg.pub`` -# # requires a ``Dict[topic: str, quote: dict]`` -# yield {topic: quote} -# -# except (ConnectionClosed, DisconnectionTimeout): -# log.exception("Good job kraken...reconnecting") +async def fill_bars( + first_bars, + shm, + symbol: str, + count: int = 75 +) -> None: + + async with get_client() as client: + + next_dt = first_bars[0][1] + i = 0 + while i < count: + + try: + bars_array = await client.bars( + symbol=symbol, + since=arrow.get(next_dt).floor('minute') + .shift(minutes=-720).timestamp + ) + shm.push(bars_array, prepend=True) + i += 1 + next_dt = bars_array[0][1] + + await trio.sleep(5) + + except BaseException as e: + log.exception(e) + await tractor.breakpoint() + + +_local_buffer_writers = {} + + +@asynccontextmanager +async def activate_writer(key: str) -> (bool, trio.Nursery): + try: + writer_already_exists = _local_buffer_writers.get(key, False) + + if not writer_already_exists: + _local_buffer_writers[key] = True + + async with trio.open_nursery() as n: + yield writer_already_exists, n + else: + yield writer_already_exists, None + finally: + _local_buffer_writers.pop(key, None) @tractor.stream @@ -473,150 +355,152 @@ async def stream_quotes( get_console_log(loglevel or tractor.current_actor().loglevel) ws_pairs = {} - async with get_client() as client: + async with activate_writer( + shm_token['shm_name'] + ) as (writer_already_exists, ln): + async with get_client() as client: - # keep client cached for real-time section - for sym in symbols: - ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] + # keep client cached for real-time section + for sym in symbols: + ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] - writer_exists = get_shm_token(shm_token['shm_name']) - symbol = symbols[0] + symbol = symbols[0] - await tractor.breakpoint() + if not writer_already_exists: + shm = attach_shm_array( + token=shm_token, + # we are writer + readonly=False, + ) + bars = await client.bars(symbol=symbol) - if not writer_exists: - shm = attach_shm_array( - token=shm_token, - # we are writer - readonly=False, - ) - bars = await client.bars(symbol=symbol) + shm.push(bars) + shm_token = shm.token - shm.push(bars) - shm_token = shm.token + ln.start_soon(fill_bars, bars, shm, symbol) - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + subscribe_ohlc_for_increment(shm, delay_s) - # pass back token, and bool, signalling if we're the writer - await ctx.send_yield((shm_token, not writer_exists)) + # pass back token, and bool, signalling if we're the writer + await ctx.send_yield((shm_token, not writer_already_exists)) - while True: - try: - async with trio_websocket.open_websocket_url( - 'wss://ws.kraken.com', - ) as ws: - - # XXX: setup subs - # https://docs.kraken.com/websockets/#message-subscribe - # specific logic for this in kraken's shitty sync client: - # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 - ohlc_sub = make_sub( - list(ws_pairs.values()), - {'name': 'ohlc', 'interval': 1} - ) + while True: + try: + async with trio_websocket.open_websocket_url( + 'wss://ws.kraken.com', + ) as ws: + + # XXX: setup subs + # https://docs.kraken.com/websockets/#message-subscribe + # specific logic for this in kraken's shitty sync client: + # https://github.com/krakenfx/kraken-wsclient-py/blob/master/kraken_wsclient_py/kraken_wsclient_py.py#L188 + ohlc_sub = make_sub( + list(ws_pairs.values()), + {'name': 'ohlc', 'interval': 1} + ) - # TODO: we want to eventually allow unsubs which should - # be completely fine to request from a separate task - # since internally the ws methods appear to be FIFO - # locked. - await ws.send_message(json.dumps(ohlc_sub)) + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. + await ws.send_message(json.dumps(ohlc_sub)) - # trade data (aka L1) - l1_sub = make_sub( - list(ws_pairs.values()), - {'name': 'spread'} # 'depth': 10} + # trade data (aka L1) + l1_sub = make_sub( + list(ws_pairs.values()), + {'name': 'spread'} # 'depth': 10} - ) - await ws.send_message(json.dumps(l1_sub)) + ) + await ws.send_message(json.dumps(l1_sub)) - async def recv(): - return json.loads(await ws.get_message()) + async def recv(): + return json.loads(await ws.get_message()) - # pull a first quote and deliver - msg_gen = recv_msg(recv) - typ, ohlc_last = await msg_gen.__anext__() + # pull a first quote and deliver + msg_gen = recv_msg(recv) + typ, ohlc_last = await msg_gen.__anext__() - topic, quote = normalize(ohlc_last) + topic, quote = normalize(ohlc_last) - # packetize as {topic: quote} - await ctx.send_yield({topic: quote}) - - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime - - # start streaming - async for typ, ohlc in msg_gen: - - if typ == 'ohlc': - - # TODO: can get rid of all this by using - # ``trades`` subscription... - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - - # new interval - if ohlc.etime > last_interval_start: - last_interval_start = ohlc.etime - tick_volume = volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - last = ohlc.close - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': last, - 'size': tick_volume, - }) - - topic, quote = normalize(ohlc) - - # if we are the lone tick writer start writing - # the buffer with appropriate trade data - if not writer_exists: - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - new_v = tick_volume - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - # write shm - shm.array[ - ['open', - 'high', - 'low', - 'close', - 'bar_wap', # in this case vwap of bar - 'volume'] - ][-1] = ( - o, - max(high, last), - min(low, last), - last, - ohlc.vwap, - volume, - ) - ohlc_last = ohlc - - elif typ == 'l1': - quote = ohlc - topic = quote['symbol'] - - # XXX: format required by ``tractor.msg.pub`` - # requires a ``Dict[topic: str, quote: dict]`` + # packetize as {topic: quote} await ctx.send_yield({topic: quote}) - except (ConnectionClosed, DisconnectionTimeout): - log.exception("Good job kraken...reconnecting") + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime + + # start streaming + async for typ, ohlc in msg_gen: + + if typ == 'ohlc': + + # TODO: can get rid of all this by using + # ``trades`` subscription... + + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + + # new interval + if ohlc.etime > last_interval_start: + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume + + last = ohlc.close + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': last, + 'size': tick_volume, + }) + + topic, quote = normalize(ohlc) + + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_already_exists: + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + new_v = tick_volume + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + # write shm + shm.array[ + ['open', + 'high', + 'low', + 'close', + 'bar_wap', # in this case vwap of bar + 'volume'] + ][-1] = ( + o, + max(high, last), + min(low, last), + last, + ohlc.vwap, + volume, + ) + ohlc_last = ohlc + + elif typ == 'l1': + quote = ohlc + topic = quote['symbol'] + + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + await ctx.send_yield({topic: quote}) + + except (ConnectionClosed, DisconnectionTimeout): + log.exception("Good job kraken...reconnecting")