From 2738b548519ad66a1f34f06a37822787dc5c559f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 4 Jul 2020 18:59:02 -0400 Subject: [PATCH 01/15] Start kraken backend --- piker/brokers/kraken.py | 44 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 44 insertions(+) create mode 100644 piker/brokers/kraken.py diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py new file mode 100644 index 00000000..a55080d9 --- /dev/null +++ b/piker/brokers/kraken.py @@ -0,0 +1,44 @@ +""" +Kraken backend. +""" +from typing import List +import json + +import trio +import tractor +from trio_websocket import open_websocket_url + + +if __name__ == '__main__': + + async def stream_quotes( + pairs: List[str] = ['BTC/USD'], + ) -> None: + """Subscribe ohlc quotes for ``pairs``. + + ``pairs`` must be formatted like `crypto/fiat`. + """ + async with open_websocket_url( + 'wss://ws.kraken.com', + ) as ws: + # setup subs + subs = { + 'event': 'subscribe', + 'pair': pairs, + 'subscription': { + 'name': 'ohlc', + # 'name': 'ticker', + # 'name': 'openOrders', + # 'depth': '25', + }, + } + await ws.send_message(json.dumps(subs)) + + while True: + msg = json.loads(await ws.get_message()) + if isinstance(msg, dict) and msg.get('event') == 'heartbeat': + continue + + print(msg) + + trio.run(stream_quotes) From 7bccfc7b1036ef80cfdafbf14179cea7e2a25476 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Jul 2020 11:43:58 -0400 Subject: [PATCH 02/15] Convert to stream, parse into dataclass --- piker/brokers/kraken.py | 91 +++++++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 31 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index a55080d9..2afc2d7a 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -1,44 +1,73 @@ """ Kraken backend. """ +from dataclasses import dataclass, asdict from typing import List import json -import trio import tractor from trio_websocket import open_websocket_url +async def stream_quotes( + pairs: List[str] = ['BTC/USD', 'XRP/USD'], + sub_type: str = 'ohlc', +) -> None: + """Subscribe for ohlc stream of quotes for ``pairs``. + + ``pairs`` must be formatted /. + """ + async with open_websocket_url( + 'wss://ws.kraken.com', + ) as ws: + # setup subs + # see: https://docs.kraken.com/websockets/#message-subscribe + subs = { + 'pair': pairs, + 'event': 'subscribe', + 'subscription': { + 'name': sub_type, + 'interval': 1, # 1 min + # 'name': 'ticker', + # 'name': 'openOrders', + # 'depth': '25', + }, + } + await ws.send_message(json.dumps(subs)) + + async def recv(): + return json.loads(await ws.get_message()) + + @dataclass + class OHLC: + chan_id: int # internal kraken id + chan_name: str # eg. ohlc-1 (name-interval) + pair: str # fx pair + time: float # Begin time of interval, in seconds since epoch + etime: float # End time of interval, in seconds since epoch + open: float # Open price of interval + high: float # High price within interval + low: float # Low price within interval + close: float # Close price of interval + vwap: float # Volume weighted average price within interval + volume: int # Accumulated volume within interval + count: int # Number of trades within interval + + while True: + msg = await recv() + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + continue + else: + chan_id, ohlc_array, chan_name, pair = msg + ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array) + yield ohlc + + if __name__ == '__main__': - async def stream_quotes( - pairs: List[str] = ['BTC/USD'], - ) -> None: - """Subscribe ohlc quotes for ``pairs``. + async def stream_ohlc(): + async for msg in stream_quotes(): + print(asdict(msg)) - ``pairs`` must be formatted like `crypto/fiat`. - """ - async with open_websocket_url( - 'wss://ws.kraken.com', - ) as ws: - # setup subs - subs = { - 'event': 'subscribe', - 'pair': pairs, - 'subscription': { - 'name': 'ohlc', - # 'name': 'ticker', - # 'name': 'openOrders', - # 'depth': '25', - }, - } - await ws.send_message(json.dumps(subs)) - - while True: - msg = json.loads(await ws.get_message()) - if isinstance(msg, dict) and msg.get('event') == 'heartbeat': - continue - - print(msg) - - trio.run(stream_quotes) + tractor.run(stream_ohlc) From ffe47acf1dbd3adeef5fa7635e49d3b9debf5032 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 08:20:03 -0400 Subject: [PATCH 03/15] Add historical bars retreival --- piker/brokers/kraken.py | 129 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 123 insertions(+), 6 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 2afc2d7a..4b553dd8 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -1,29 +1,137 @@ """ Kraken backend. """ +from contextlib import asynccontextmanager from dataclasses import dataclass, asdict -from typing import List +from itertools import starmap +from typing import List, Dict, Any import json -import tractor from trio_websocket import open_websocket_url +import arrow +import asks +import numpy as np +import tractor + +from ._util import resproc, SymbolNotFound, BrokerError +from ..log import get_logger + +log = get_logger(__name__) + + +# // +_url = 'https://api.kraken.com/0' + + +# conversion to numpy worthy types +ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('vwap', float), + ('volume', float), + ('count', int) +] + + +class Client: + + def __init__(self) -> None: + self._sesh = asks.Session(connections=4) + self._sesh.base_location = _url + self._sesh.headers.update({ + 'User-Agent': + 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' + }) + + async def _public( + self, + method: str, + data: dict, + ) -> Dict[str, Any]: + resp = await self._sesh.post( + path=f'/public/{method}', + json=data, + timeout=float('inf') + ) + return resproc(resp, log) + + async def symbol_info( + self, + pair: str = 'all', + ): + resp = await self._public('AssetPairs', {'pair': pair}) + assert not resp['error'] + true_pair_key, data = next(iter(resp['result'].items())) + return data + + async def bars( + self, + symbol: str = 'XBTUSD', + # UTC 2017-07-02 12:53:20 + since: int = None, + count: int = 720, # <- max allowed per query + as_np: bool = True, + ) -> dict: + if since is None: + since = arrow.utcnow().floor('minute').shift( + minutes=-count).timestamp + # UTC 2017-07-02 12:53:20 is oldest seconds value + since = str(max(1499000000, since)) + json = await self._public( + 'OHLC', + data={ + 'pair': symbol, + 'since': since, + }, + ) + try: + res = json['result'] + res.pop('last') + bars = next(iter(res.values())) + + # convert all fields to native types + bars = list(starmap( + lambda i, bar: + (i,) + tuple( + ftype(bar[i]) for i, (name, ftype) + in enumerate(ohlc_dtype[1:]) + ), + enumerate(bars)) + ) + return np.array(bars, dtype=ohlc_dtype) if as_np else bars + except KeyError: + raise SymbolNotFound(json['error'][0] + f': {symbol}') + + +@asynccontextmanager +async def get_client() -> Client: + yield Client() async def stream_quotes( - pairs: List[str] = ['BTC/USD', 'XRP/USD'], + symbols: List[str] = ['BTC/USD', 'XRP/USD'], sub_type: str = 'ohlc', ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. """ + ws_pairs = {} + async with get_client() as client: + for sym in symbols: + ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] + async with open_websocket_url( 'wss://ws.kraken.com', ) as ws: # setup subs # see: https://docs.kraken.com/websockets/#message-subscribe subs = { - 'pair': pairs, + 'pair': list(ws_pairs.values()), 'event': 'subscribe', 'subscription': { 'name': sub_type, @@ -50,18 +158,27 @@ async def stream_quotes( low: float # Low price within interval close: float # Close price of interval vwap: float # Volume weighted average price within interval - volume: int # Accumulated volume within interval + volume: float # Accumulated volume within interval count: int # Number of trades within interval + # XXX: ugh, super hideous.. why doesn't + def __post_init__(self): + for field, val in self.__dataclass_fields__.items(): + setattr(self, field, val.type(getattr(self, field))) + while True: msg = await recv() if isinstance(msg, dict): if msg.get('event') == 'heartbeat': continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) else: chan_id, ohlc_array, chan_name, pair = msg ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array) - yield ohlc + print(ohlc) + yield asdict(ohlc) if __name__ == '__main__': From 3655e449d65abfa54a750bf419ce5c7ec4dece1a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 13:26:48 -0400 Subject: [PATCH 04/15] Raise errors, fix module script entry --- piker/brokers/kraken.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 4b553dd8..cbfa3cbc 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -64,7 +64,9 @@ class Client: pair: str = 'all', ): resp = await self._public('AssetPairs', {'pair': pair}) - assert not resp['error'] + err = resp['error'] + if err: + raise BrokerError(err) true_pair_key, data = next(iter(resp['result'].items())) return data @@ -113,7 +115,9 @@ async def get_client() -> Client: async def stream_quotes( - symbols: List[str] = ['BTC/USD', 'XRP/USD'], + # These are the symbols not expected by the ws api + # they are looked up inside this routine. + symbols: List[str] = ['XBTUSD', 'XMRUSD'], sub_type: str = 'ohlc', ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. @@ -185,6 +189,6 @@ if __name__ == '__main__': async def stream_ohlc(): async for msg in stream_quotes(): - print(asdict(msg)) + print(msg) tractor.run(stream_ohlc) From 9976bc3a3b32e42f569d21f97d2cfb4ccc5f4837 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Jul 2020 14:45:18 -0400 Subject: [PATCH 05/15] Fix typo --- piker/brokers/kraken.py | 2 +- piker/brokers/questrade.py | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index cbfa3cbc..61658669 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -165,7 +165,7 @@ async def stream_quotes( volume: float # Accumulated volume within interval count: int # Number of trades within interval - # XXX: ugh, super hideous.. why doesn't + # XXX: ugh, super hideous.. needs built-in converters. def __post_init__(self): for field, val in self.__dataclass_fields__.items(): setattr(self, field, val.type(getattr(self, field))) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index de0d1e7a..58b31fcf 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -1164,10 +1164,10 @@ async def stream_quotes( packetizer=partial( packetizer, formatter=formatter, - symboal_data=sd, + symbol_data=sd, ), - # actual func args + # actual target "streaming func" args get_quotes=get_quotes, diff_cached=diff_cached, rate=rate, From ad921887036f89ab591e79917ae40d9918615ce1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jul 2020 00:11:17 -0400 Subject: [PATCH 06/15] Support new normalized ticks format with kraken Generate tick datums in a list under a `ticks` field in each quote kinda like how IB does it. --- piker/brokers/kraken.py | 53 ++++++++++++++++++++++++++++------------- 1 file changed, 37 insertions(+), 16 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 61658669..0ec14b40 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -2,7 +2,7 @@ Kraken backend. """ from contextlib import asynccontextmanager -from dataclasses import dataclass, asdict +from dataclasses import dataclass, asdict, field from itertools import starmap from typing import List, Dict, Any import json @@ -150,6 +150,19 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) + async def recv_ohlc(): + while True: + msg = await recv() + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + chan_id, ohlc_array, chan_name, pair = msg + yield OHLC(chan_id, chan_name, pair, *ohlc_array) + @dataclass class OHLC: chan_id: int # internal kraken id @@ -164,25 +177,33 @@ async def stream_quotes( vwap: float # Volume weighted average price within interval volume: float # Accumulated volume within interval count: int # Number of trades within interval + # (sampled) generated tick data + ticks: List[Any] = field(default_factory=list) # XXX: ugh, super hideous.. needs built-in converters. def __post_init__(self): - for field, val in self.__dataclass_fields__.items(): - setattr(self, field, val.type(getattr(self, field))) + for f, val in self.__dataclass_fields__.items(): + if f == 'ticks': + continue + setattr(self, f, val.type(getattr(self, f))) - while True: - msg = await recv() - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - chan_id, ohlc_array, chan_name, pair = msg - ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array) - print(ohlc) - yield asdict(ohlc) + ohlc_gen = recv_ohlc() + ohlc_last = await ohlc_gen.__anext__() + yield asdict(ohlc_last) + + async for ohlc in ohlc_gen: + # debug + print(ohlc) + volume = ohlc.volume + vol_diff = volume - ohlc_last.volume + if vol_diff: + ohlc.ticks.append({ + 'type': 'trade', + 'price': ohlc.close, + 'size': vol_diff, + }) + yield asdict(ohlc) + ohlc_last = ohlc if __name__ == '__main__': From d976f3d074d684f8821cdd9ce3d61a33628faa6a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Aug 2020 16:52:51 -0400 Subject: [PATCH 07/15] Generate tick data correctly using .etime --- piker/brokers/kraken.py | 30 +++++++++++++++++++++++------- 1 file changed, 23 insertions(+), 7 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 0ec14b40..a0b3767c 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -14,7 +14,7 @@ import numpy as np import tractor from ._util import resproc, SymbolNotFound, BrokerError -from ..log import get_logger +from ..log import get_logger, get_console_log log = get_logger(__name__) @@ -119,11 +119,15 @@ async def stream_quotes( # they are looked up inside this routine. symbols: List[str] = ['XBTUSD', 'XMRUSD'], sub_type: str = 'ohlc', + loglevel: str = None, ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. """ + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + ws_pairs = {} async with get_client() as client: for sym in symbols: @@ -175,7 +179,7 @@ async def stream_quotes( low: float # Low price within interval close: float # Close price of interval vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume within interval + volume: float # Accumulated volume **within interval** count: int # Number of trades within interval # (sampled) generated tick data ticks: List[Any] = field(default_factory=list) @@ -191,16 +195,28 @@ async def stream_quotes( ohlc_last = await ohlc_gen.__anext__() yield asdict(ohlc_last) + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime + async for ohlc in ohlc_gen: - # debug - print(ohlc) + + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m volume = ohlc.volume - vol_diff = volume - ohlc_last.volume - if vol_diff: + if ohlc.etime > last_interval_start: # new interval + log.debug( + f"New interval last: {ohlc_last.time}, now: {ohlc.time}") + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume + + if tick_volume: ohlc.ticks.append({ 'type': 'trade', 'price': ohlc.close, - 'size': vol_diff, + 'size': tick_volume, }) yield asdict(ohlc) ohlc_last = ohlc From bf9a0136df21070e815c8d53f184c20370fc5623 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 1 Aug 2020 22:12:26 -0400 Subject: [PATCH 08/15] Make ws loop restart on connection failures --- piker/brokers/kraken.py | 165 ++++++++++++++++++++-------------------- 1 file changed, 84 insertions(+), 81 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index a0b3767c..448097e8 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -7,7 +7,7 @@ from itertools import starmap from typing import List, Dict, Any import json -from trio_websocket import open_websocket_url +import trio_websocket import arrow import asks import numpy as np @@ -114,6 +114,31 @@ async def get_client() -> Client: yield Client() +@dataclass +class OHLC: + chan_id: int # internal kraken id + chan_name: str # eg. ohlc-1 (name-interval) + pair: str # fx pair + time: float # Begin time of interval, in seconds since epoch + etime: float # End time of interval, in seconds since epoch + open: float # Open price of interval + high: float # High price within interval + low: float # Low price within interval + close: float # Close price of interval + vwap: float # Volume weighted average price within interval + volume: float # Accumulated volume **within interval** + count: int # Number of trades within interval + # (sampled) generated tick data + ticks: List[Any] = field(default_factory=list) + + # XXX: ugh, super hideous.. needs built-in converters. + def __post_init__(self): + for f, val in self.__dataclass_fields__.items(): + if f == 'ticks': + continue + setattr(self, f, val.type(getattr(self, f))) + + async def stream_quotes( # These are the symbols not expected by the ws api # they are looked up inside this routine. @@ -133,93 +158,71 @@ async def stream_quotes( for sym in symbols: ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] - async with open_websocket_url( - 'wss://ws.kraken.com', - ) as ws: - # setup subs - # see: https://docs.kraken.com/websockets/#message-subscribe - subs = { - 'pair': list(ws_pairs.values()), - 'event': 'subscribe', - 'subscription': { - 'name': sub_type, - 'interval': 1, # 1 min - # 'name': 'ticker', - # 'name': 'openOrders', - # 'depth': '25', - }, - } - await ws.send_message(json.dumps(subs)) + while True: + try: + async with trio_websocket.open_websocket_url( + 'wss://ws.kraken.com', + ) as ws: + # setup subs + # see: https://docs.kraken.com/websockets/#message-subscribe + subs = { + 'pair': list(ws_pairs.values()), + 'event': 'subscribe', + 'subscription': { + 'name': sub_type, + 'interval': 1, # 1 min + # 'name': 'ticker', + # 'name': 'openOrders', + # 'depth': '25', + }, + } + await ws.send_message(json.dumps(subs)) - async def recv(): - return json.loads(await ws.get_message()) + async def recv(): + return json.loads(await ws.get_message()) - async def recv_ohlc(): - while True: - msg = await recv() - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - chan_id, ohlc_array, chan_name, pair = msg - yield OHLC(chan_id, chan_name, pair, *ohlc_array) + async def recv_ohlc(): + while True: + msg = await recv() + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + chan_id, ohlc_array, chan_name, pair = msg + yield OHLC(chan_id, chan_name, pair, *ohlc_array) - @dataclass - class OHLC: - chan_id: int # internal kraken id - chan_name: str # eg. ohlc-1 (name-interval) - pair: str # fx pair - time: float # Begin time of interval, in seconds since epoch - etime: float # End time of interval, in seconds since epoch - open: float # Open price of interval - high: float # High price within interval - low: float # Low price within interval - close: float # Close price of interval - vwap: float # Volume weighted average price within interval - volume: float # Accumulated volume **within interval** - count: int # Number of trades within interval - # (sampled) generated tick data - ticks: List[Any] = field(default_factory=list) + ohlc_gen = recv_ohlc() + ohlc_last = await ohlc_gen.__anext__() + yield asdict(ohlc_last) - # XXX: ugh, super hideous.. needs built-in converters. - def __post_init__(self): - for f, val in self.__dataclass_fields__.items(): - if f == 'ticks': - continue - setattr(self, f, val.type(getattr(self, f))) + # keep start of last interval for volume tracking + last_interval_start = ohlc_last.etime - ohlc_gen = recv_ohlc() - ohlc_last = await ohlc_gen.__anext__() - yield asdict(ohlc_last) + async for ohlc in ohlc_gen: - # keep start of last interval for volume tracking - last_interval_start = ohlc_last.etime + # generate tick values to match time & sales pane: + # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m + volume = ohlc.volume + if ohlc.etime > last_interval_start: # new interval + last_interval_start = ohlc.etime + tick_volume = volume + else: + # this is the tick volume *within the interval* + tick_volume = volume - ohlc_last.volume - async for ohlc in ohlc_gen: - - # generate tick values to match time & sales pane: - # https://trade.kraken.com/charts/KRAKEN:BTC-USD?period=1m - volume = ohlc.volume - if ohlc.etime > last_interval_start: # new interval - log.debug( - f"New interval last: {ohlc_last.time}, now: {ohlc.time}") - last_interval_start = ohlc.etime - tick_volume = volume - else: - # this is the tick volume *within the interval* - tick_volume = volume - ohlc_last.volume - - if tick_volume: - ohlc.ticks.append({ - 'type': 'trade', - 'price': ohlc.close, - 'size': tick_volume, - }) - yield asdict(ohlc) - ohlc_last = ohlc + if tick_volume: + ohlc.ticks.append({ + 'type': 'trade', + 'price': ohlc.close, + 'size': tick_volume, + }) + yield asdict(ohlc) + ohlc_last = ohlc + except trio_websocket._impl.ConnectionClosed: + log.error("Good job kraken...") if __name__ == '__main__': From e92abd376ab982539523089bd3df9c841271a27c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 Aug 2020 01:35:29 -0400 Subject: [PATCH 09/15] Trace log the heartbeat --- piker/brokers/kraken.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 448097e8..5fd9216b 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -181,11 +181,20 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) + import time + async def recv_ohlc(): + last_hb = 0 while True: msg = await recv() if isinstance(msg, dict): if msg.get('event') == 'heartbeat': + log.trace( + f"Heartbeat after {time.time() - last_hb}") + last_hb = time.time() + # TODO: hmm i guess we should use this + # for determining when to do connection + # resets eh? continue err = msg.get('errorMessage') if err: From 03c5c7d2ba74da696c364c7f63e903bcef3e3ef3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 2 Aug 2020 12:17:03 -0400 Subject: [PATCH 10/15] Trigger connection reset on slowed heartbeat --- piker/brokers/kraken.py | 33 +++++++++++++++++++++++++-------- 1 file changed, 25 insertions(+), 8 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 5fd9216b..dd048d45 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -6,11 +6,13 @@ from dataclasses import dataclass, asdict, field from itertools import starmap from typing import List, Dict, Any import json +import time import trio_websocket import arrow import asks import numpy as np +import trio import tractor from ._util import resproc, SymbolNotFound, BrokerError @@ -116,6 +118,8 @@ async def get_client() -> Client: @dataclass class OHLC: + """Description of the flattened OHLC quote format. + """ chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) pair: str # fx pair @@ -181,17 +185,28 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) - import time - async def recv_ohlc(): - last_hb = 0 + too_slow_count = last_hb = 0 while True: - msg = await recv() + with trio.move_on_after(1.5) as cs: + msg = await recv() + + # trigger reconnection logic if too slow + if cs.cancelled_caught: + too_slow_count += 1 + if too_slow_count > 2: + log.warning( + "Heartbeat is to slow, " + "resetting ws connection") + raise trio_websocket._impl.ConnectionClosed( + "Reset Connection") + if isinstance(msg, dict): if msg.get('event') == 'heartbeat': - log.trace( - f"Heartbeat after {time.time() - last_hb}") - last_hb = time.time() + now = time.time() + delay = now - last_hb + last_hb = now + log.trace(f"Heartbeat after {delay}") # TODO: hmm i guess we should use this # for determining when to do connection # resets eh? @@ -203,6 +218,7 @@ async def stream_quotes( chan_id, ohlc_array, chan_name, pair = msg yield OHLC(chan_id, chan_name, pair, *ohlc_array) + # pull a first quote and deliver ohlc_gen = recv_ohlc() ohlc_last = await ohlc_gen.__anext__() yield asdict(ohlc_last) @@ -210,6 +226,7 @@ async def stream_quotes( # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime + # start streaming async for ohlc in ohlc_gen: # generate tick values to match time & sales pane: @@ -231,7 +248,7 @@ async def stream_quotes( yield asdict(ohlc) ohlc_last = ohlc except trio_websocket._impl.ConnectionClosed: - log.error("Good job kraken...") + log.exception("Good job kraken...reconnecting") if __name__ == '__main__': From 44010abf4dbf19c64873747278d2b2ebb7c43b8f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Aug 2020 00:01:40 -0400 Subject: [PATCH 11/15] Handle (far end forced) disconnects --- piker/brokers/kraken.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index dd048d45..605c5177 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -9,6 +9,7 @@ import json import time import trio_websocket +from trio_websocket._impl import ConnectionClosed, DisconnectionTimeout import arrow import asks import numpy as np @@ -247,7 +248,7 @@ async def stream_quotes( }) yield asdict(ohlc) ohlc_last = ohlc - except trio_websocket._impl.ConnectionClosed: + except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting") From ea8205968ce9e457e60a59ab34b8b18cb1437f7f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 Aug 2020 07:42:49 -0400 Subject: [PATCH 12/15] Begin to use `@tractor.msg.pub` throughout streaming API Since the new FSP system will require time aligned data amongst actors, it makes sense to share broker data feeds as much as possible on a local system. There doesn't seem to be downside to this approach either since if not fanning-out in our code, the broker (server) has to do it anyway (and who knows how junk their implementation is) though with more clients, sockets etc. in memory on our end. It also preps the code for introducing a more "serious" pub-sub systems like zeromq/nanomessage. --- piker/brokers/kraken.py | 27 ++++++++++++++++++++++++--- 1 file changed, 24 insertions(+), 3 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 605c5177..ea3850fd 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -4,7 +4,7 @@ Kraken backend. from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field from itertools import starmap -from typing import List, Dict, Any +from typing import List, Dict, Any, Callable import json import time @@ -144,7 +144,9 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, # These are the symbols not expected by the ws api # they are looked up inside this routine. symbols: List[str] = ['XBTUSD', 'XMRUSD'], @@ -181,6 +183,10 @@ async def stream_quotes( # 'depth': '25', }, } + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. await ws.send_message(json.dumps(subs)) async def recv(): @@ -222,7 +228,14 @@ async def stream_quotes( # pull a first quote and deliver ohlc_gen = recv_ohlc() ohlc_last = await ohlc_gen.__anext__() - yield asdict(ohlc_last) + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + quote = asdict(ohlc_last) + topic = quote['pair'].replace('/', '') + + # packetize as {topic: quote} + yield {topic: quote} # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -246,8 +259,16 @@ async def stream_quotes( 'price': ohlc.close, 'size': tick_volume, }) - yield asdict(ohlc) + + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + quote = asdict(ohlc) + print(quote) + topic = quote['pair'].replace('/', '') + yield {topic: quote} + ohlc_last = ohlc + except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting") From 0b42ac14201d57814b512ee55068e2af17446390 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Aug 2020 21:44:03 -0400 Subject: [PATCH 13/15] Normalize kraken quotes for latency tracking --- piker/brokers/kraken.py | 98 ++++++++++++++++++++++++----------------- 1 file changed, 57 insertions(+), 41 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index ea3850fd..8029af04 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -120,6 +120,9 @@ async def get_client() -> Client: @dataclass class OHLC: """Description of the flattened OHLC quote format. + + For schema details see: + https://docs.kraken.com/websockets/#message-ohlc """ chan_id: int # internal kraken id chan_name: str # eg. ohlc-1 (name-interval) @@ -144,6 +147,56 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) +async def recv_ohlc(recv): + too_slow_count = last_hb = 0 + while True: + with trio.move_on_after(1.5) as cs: + msg = await recv() + + # trigger reconnection logic if too slow + if cs.cancelled_caught: + too_slow_count += 1 + if too_slow_count > 2: + log.warning( + "Heartbeat is to slow, " + "resetting ws connection") + raise trio_websocket._impl.ConnectionClosed( + "Reset Connection") + + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + now = time.time() + delay = now - last_hb + last_hb = now + log.trace(f"Heartbeat after {delay}") + # TODO: hmm i guess we should use this + # for determining when to do connection + # resets eh? + continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + chan_id, ohlc_array, chan_name, pair = msg + yield OHLC(chan_id, chan_name, pair, *ohlc_array) + + +def normalize( + ohlc: OHLC, +) -> dict: + quote = asdict(ohlc) + quote['broker_ts'] = quote['time'] + quote['brokerd_ts'] = time.time() + quote['pair'] = quote['pair'].replace('/', '') + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + topic = quote['pair'].replace('/', '') + + print(quote) + return topic, quote + + @tractor.msg.pub async def stream_quotes( get_topics: Callable, @@ -192,47 +245,11 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) - async def recv_ohlc(): - too_slow_count = last_hb = 0 - while True: - with trio.move_on_after(1.5) as cs: - msg = await recv() - - # trigger reconnection logic if too slow - if cs.cancelled_caught: - too_slow_count += 1 - if too_slow_count > 2: - log.warning( - "Heartbeat is to slow, " - "resetting ws connection") - raise trio_websocket._impl.ConnectionClosed( - "Reset Connection") - - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - now = time.time() - delay = now - last_hb - last_hb = now - log.trace(f"Heartbeat after {delay}") - # TODO: hmm i guess we should use this - # for determining when to do connection - # resets eh? - continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - chan_id, ohlc_array, chan_name, pair = msg - yield OHLC(chan_id, chan_name, pair, *ohlc_array) - # pull a first quote and deliver - ohlc_gen = recv_ohlc() + ohlc_gen = recv_ohlc(recv) ohlc_last = await ohlc_gen.__anext__() - # seriously eh? what's with this non-symmetry everywhere - # in subscription systems... - quote = asdict(ohlc_last) - topic = quote['pair'].replace('/', '') + topic, quote = normalize(ohlc_last) # packetize as {topic: quote} yield {topic: quote} @@ -260,11 +277,10 @@ async def stream_quotes( 'size': tick_volume, }) + topic, quote = normalize(ohlc) + # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` - quote = asdict(ohlc) - print(quote) - topic = quote['pair'].replace('/', '') yield {topic: quote} ohlc_last = ohlc From b13da849d04d64d62df9976f023e8fb9401f13a6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Sep 2020 21:20:07 -0400 Subject: [PATCH 14/15] Include vwap in kraken historical bars --- piker/brokers/kraken.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 8029af04..f10cce72 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -27,7 +27,7 @@ _url = 'https://api.kraken.com/0' # conversion to numpy worthy types -ohlc_dtype = [ +_ohlc_dtype = [ ('index', int), ('time', int), ('open', float), @@ -39,6 +39,10 @@ ohlc_dtype = [ ('count', int) ] +# UI components allow this to be declared such that additional +# (historical) fields can be exposed. +ohlc_dtype = np.dtype(_ohlc_dtype) + class Client: @@ -103,11 +107,11 @@ class Client: lambda i, bar: (i,) + tuple( ftype(bar[i]) for i, (name, ftype) - in enumerate(ohlc_dtype[1:]) + in enumerate(_ohlc_dtype[1:]) ), enumerate(bars)) ) - return np.array(bars, dtype=ohlc_dtype) if as_np else bars + return np.array(bars, dtype=_ohlc_dtype) if as_np else bars except KeyError: raise SymbolNotFound(json['error'][0] + f': {symbol}') @@ -193,7 +197,7 @@ def normalize( # in subscription systems... topic = quote['pair'].replace('/', '') - print(quote) + # print(quote) return topic, quote From 14a5d047c3a72a3e9e57c3fef495678d2fadbe06 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Sep 2020 16:16:03 -0400 Subject: [PATCH 15/15] Copy forward stupid kraken zeroed vwaps --- piker/brokers/kraken.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index f10cce72..75333b9d 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -103,15 +103,22 @@ class Client: bars = next(iter(res.values())) # convert all fields to native types - bars = list(starmap( - lambda i, bar: + new_bars = [] + last_nz_vwap = None + for i, bar in enumerate(bars): + # normalize weird zero-ed vwap values..cmon kraken.. + vwap = float(bar[-3]) + if vwap != 0: + last_nz_vwap = vwap + if vwap == 0: + bar[-3] = last_nz_vwap + + new_bars.append( (i,) + tuple( - ftype(bar[i]) for i, (name, ftype) - in enumerate(_ohlc_dtype[1:]) - ), - enumerate(bars)) - ) - return np.array(bars, dtype=_ohlc_dtype) if as_np else bars + ftype(bar[j]) for j, (name, ftype) in enumerate(_ohlc_dtype[1:]) + ) + ) + return np.array(new_bars, dtype=_ohlc_dtype) if as_np else bars except KeyError: raise SymbolNotFound(json['error'][0] + f': {symbol}')