From ffe47acf1dbd3adeef5fa7635e49d3b9debf5032 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 08:20:03 -0400 Subject: [PATCH] Add historical bars retreival --- piker/brokers/kraken.py | 129 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 123 insertions(+), 6 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 2afc2d7a..4b553dd8 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -1,29 +1,137 @@ """ Kraken backend. """ +from contextlib import asynccontextmanager from dataclasses import dataclass, asdict -from typing import List +from itertools import starmap +from typing import List, Dict, Any import json -import tractor from trio_websocket import open_websocket_url +import arrow +import asks +import numpy as np +import tractor + +from ._util import resproc, SymbolNotFound, BrokerError +from ..log import get_logger + +log = get_logger(__name__) + + +# // +_url = 'https://api.kraken.com/0' + + +# conversion to numpy worthy types +ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('vwap', float), + ('volume', float), + ('count', int) +] + + +class Client: + + def __init__(self) -> None: + self._sesh = asks.Session(connections=4) + self._sesh.base_location = _url + self._sesh.headers.update({ + 'User-Agent': + 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' + }) + + async def _public( + self, + method: str, + data: dict, + ) -> Dict[str, Any]: + resp = await self._sesh.post( + path=f'/public/{method}', + json=data, + timeout=float('inf') + ) + return resproc(resp, log) + + async def symbol_info( + self, + pair: str = 'all', + ): + resp = await self._public('AssetPairs', {'pair': pair}) + assert not resp['error'] + true_pair_key, data = next(iter(resp['result'].items())) + return data + + async def bars( + self, + symbol: str = 'XBTUSD', + # UTC 2017-07-02 12:53:20 + since: int = None, + count: int = 720, # <- max allowed per query + as_np: bool = True, + ) -> dict: + if since is None: + since = arrow.utcnow().floor('minute').shift( + minutes=-count).timestamp + # UTC 2017-07-02 12:53:20 is oldest seconds value + since = str(max(1499000000, since)) + json = await self._public( + 'OHLC', + data={ + 'pair': symbol, + 'since': since, + }, + ) + try: + res = json['result'] + res.pop('last') + bars = next(iter(res.values())) + + # convert all fields to native types + bars = list(starmap( + lambda i, bar: + (i,) + tuple( + ftype(bar[i]) for i, (name, ftype) + in enumerate(ohlc_dtype[1:]) + ), + enumerate(bars)) + ) + return np.array(bars, dtype=ohlc_dtype) if as_np else bars + except KeyError: + raise SymbolNotFound(json['error'][0] + f': {symbol}') + + +@asynccontextmanager +async def get_client() -> Client: + yield Client() async def stream_quotes( - pairs: List[str] = ['BTC/USD', 'XRP/USD'], + symbols: List[str] = ['BTC/USD', 'XRP/USD'], sub_type: str = 'ohlc', ) -> None: """Subscribe for ohlc stream of quotes for ``pairs``. ``pairs`` must be formatted /. """ + ws_pairs = {} + async with get_client() as client: + for sym in symbols: + ws_pairs[sym] = (await client.symbol_info(sym))['wsname'] + async with open_websocket_url( 'wss://ws.kraken.com', ) as ws: # setup subs # see: https://docs.kraken.com/websockets/#message-subscribe subs = { - 'pair': pairs, + 'pair': list(ws_pairs.values()), 'event': 'subscribe', 'subscription': { 'name': sub_type, @@ -50,18 +158,27 @@ async def stream_quotes( low: float # Low price within interval close: float # Close price of interval vwap: float # Volume weighted average price within interval - volume: int # Accumulated volume within interval + volume: float # Accumulated volume within interval count: int # Number of trades within interval + # XXX: ugh, super hideous.. why doesn't + def __post_init__(self): + for field, val in self.__dataclass_fields__.items(): + setattr(self, field, val.type(getattr(self, field))) + while True: msg = await recv() if isinstance(msg, dict): if msg.get('event') == 'heartbeat': continue + err = msg.get('errorMessage') + if err: + raise BrokerError(err) else: chan_id, ohlc_array, chan_name, pair = msg ohlc = OHLC(chan_id, chan_name, pair, *ohlc_array) - yield ohlc + print(ohlc) + yield asdict(ohlc) if __name__ == '__main__':