From bf9a0136df21070e815c8d53f184c20370fc5623 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Aug 2020 22:12:26 -0400 Subject: [PATCH] Make ws loop restart on connection failures --- piker/brokers/kraken.py | 165 ++++++++++++++++++++-------------------- 1 file changed, 84 insertions(+), 81 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index a0b3767c..448097e8 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -7,7 +7,7 @@ from itertools import starmap from typing import List, Dict, Any import json -from trio_websocket import open_websocket_url +import trio_websocket import arrow import asks import numpy as np @@ -114,6 +114,31 @@ async def get_client() -> Client: yield Client() +@dataclass +class OHLC: + chan_id: int # internal kraken id + chan_name: str # eg. ohlc-1 (name-interval) + pair: str # fx pair + time: float # Begin time of interval, in seconds since epoch + etime: float # End time of interval, in seconds since epoch + open: float # Open price of interval + high: float # High price within interval + low: float # Low price within interval + close: float # Close price of interval + vwap: float # Volume weighted average price within interval + volume: float # Accumulated volume **within interval** + count: int # Number of trades within interval + # (sampled) generated tick data + ticks: List[Any] = field(default_factory=list) + + # XXX: ugh, super hideous.. needs built-in converters. + def __post_init__(self): + for f, val in self.__dataclass_fields__.items(): + if f == 'ticks': + continue + setattr(self, f, val.type(getattr(self, f))) + + async def stream_quotes( # These are the symbols not expected by the ws api # they are looked up inside this routine. @@ -133,93 +158,71 @@ async def stream_quotes( for sym in symbols: ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] - async with open_websocket_url( - 'wss://ws.kraken.com', - ) as ws: - # setup subs - # see: https://docs.kraken.com/websockets/#message-subscribe - subs = { - 'pair': list(ws_pairs.values()), - 'event': 'subscribe', - 'subscription': { - 'name': sub_type, - 'interval': 1, # 1 min - # 'name': 'ticker', - # 'name': 'openOrders', - # 'depth': '25', - }, - } - await ws.send_message(json.dumps(subs)) + while True: + try: + async with trio_websocket.open_websocket_url( + 'wss://ws.kraken.com', + ) as ws: + # setup subs + # see: https://docs.kraken.com/websockets/#message-subscribe + subs = { + 'pair': list(ws_pairs.values()), + 'event': 'subscribe', + 'subscription': { + 'name': sub_type, + 'interval': 1, # 1 min + # 'name': 'ticker', + # 'name': 'openOrders', + # 'depth': '25', + }, + } + await ws.send_message(json.dumps(subs)) - async def recv(): - return json.loads(await ws.get_message()) + async def recv(): + return json.loads(await ws.get_message()) - async def recv_ohlc(): - while True: - msg = await recv() - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - chan_id, ohlc_array, chan_name, pair = msg - yield OHLC(chan_id, chan_name, pair, *ohlc_array) + async def recv_ohlc(): + while True: + msg = await recv() + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + chan_id, ohlc_array, chan_name, pair = msg + yield OHLC(chan_id, chan_name, pair, *ohlc_array) - @dataclass - class OHLC: - chan_id: int # internal kraken id - chan_name: str # eg. ohlc-1 (name-interval) - pair: str # fx pair - time: float # Begin time of interval, in seconds since epoch - etime: float # End time of interval, in seconds since epoch - open: float # Open price of interval - high: float # High price within interval - low: float # Low price within interval - close: float # Close price of interval - vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume **within interval** - count: int # Number of trades within interval - # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + ohlc_gen = recv_ohlc() + ohlc_last = await ohlc_gen.__anext__() + yield asdict(ohlc_last) - # XXX: ugh, super hideous.. needs built-in converters. - def __post_init__(self): - for f, val in self.__dataclass_fields__.items(): - if f == 'ticks': - continue - setattr(self, f, val.type(getattr(self, f))) + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime - ohlc_gen = recv_ohlc() - ohlc_last = await ohlc_gen.__anext__() - yield asdict(ohlc_last) + async for ohlc in ohlc_gen: - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + if ohlc.etime > last_interval_start: # new interval + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume - async for ohlc in ohlc_gen: - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - if ohlc.etime > last_interval_start: # new interval - log.debug( - f"New interval last: {ohlc_last.time}, now: {ohlc.time}") - last_interval_start = ohlc.etime - tick_volume = volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': ohlc.close, - 'size': tick_volume, - }) - yield asdict(ohlc) - ohlc_last = ohlc + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': ohlc.close, + 'size': tick_volume, + }) + yield asdict(ohlc) + ohlc_last = ohlc + except trio_websocket._impl.ConnectionClosed: + log.error("Good job kraken...") if __name__ == '__main__':