diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 61658669..0ec14b40 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -2,7 +2,7 @@ Kraken backend. """ from contextlib import asynccontextmanager -from dataclasses import dataclass, asdict +from dataclasses import dataclass, asdict, field from itertools import starmap from typing import List, Dict, Any import json @@ -150,6 +150,19 @@ async def stream_quotes( async def recv(): return json.loads(await ws.get_message()) + async def recv_ohlc(): + while True: + msg = await recv() + if isinstance(msg, dict): + if msg.get('event') == 'heartbeat': + continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) + else: + chan_id, ohlc_array, chan_name, pair = msg + yield OHLC(chan_id, chan_name, pair, *ohlc_array) + @dataclass class OHLC: chan_id: int # internal kraken id @@ -164,25 +177,33 @@ async def stream_quotes( vwap: float # Volume weighted average price within interval volume: float # Accumulated volume within interval count: int # Number of trades within interval + # (sampled) generated tick data + ticks: List[Any] = field(default_factory=list) # XXX: ugh, super hideous.. needs built-in converters. def __post_init__(self): - for field, val in self.__dataclass_fields__.items(): - setattr(self, field, val.type(getattr(self, field))) + for f, val in self.__dataclass_fields__.items(): + if f == 'ticks': + continue + setattr(self, f, val.type(getattr(self, f))) - while True: - msg = await recv() - if isinstance(msg, dict): - if msg.get('event') == 'heartbeat': - continue - err = msg.get('errorMessage') - if err: - raise BrokerError(err) - else: - chan_id, ohlc_array, chan_name, pair = msg - ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array) - print(ohlc) - yield asdict(ohlc) + ohlc_gen = recv_ohlc() + ohlc_last = await ohlc_gen.__anext__() + yield asdict(ohlc_last) + + async for ohlc in ohlc_gen: + # debug + print(ohlc) + volume = ohlc.volume + vol_diff = volume - ohlc_last.volume + if vol_diff: + ohlc.ticks.append({ + 'type': 'trade', + 'price': ohlc.close, + 'size': vol_diff, + }) + yield asdict(ohlc) + ohlc_last = ohlc if __name__ == '__main__':