diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index ff4f57a9..5b654970 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -36,15 +36,20 @@ import tractor import trio from piker.accounting._mktinfo import ( + Asset, MktPair, ) -from piker._cacheables import open_cached_client +from piker._cacheables import ( + open_cached_client, + async_lifo_cache, +) from piker.brokers._util import ( BrokerError, DataThrottle, DataUnavailable, ) from piker.data.types import Struct +from piker.data.validate import FeedInit from piker.data._web_bs import open_autorecon_ws, NoBsWs from . import log from .api import ( @@ -278,6 +283,7 @@ async def open_history_client( yield get_ohlc, {'erlangs': 1, 'rate': 1} +@async_lifo_cache() async def get_mkt_info( fqme: str, @@ -293,9 +299,25 @@ async def get_mkt_info( async with open_cached_client('kraken') as client: # uppercase since kraken bs_mktid is always upper - sym_str = fqme.upper() - pair: Pair = await client.pair_info(sym_str) - mkt: MktPair = await client.mkt_info(sym_str) + bs_fqme, _, broker = fqme.partition('.') + pair_str: str = bs_fqme.upper() + bs_mktid: str = Client.normalize_symbol(pair_str) + pair: Pair = await client.pair_info(pair_str) + + assets = client.assets + dst_asset: Asset = assets[pair.base] + src_asset: Asset = assets[pair.quote] + + mkt = MktPair( + dst=dst_asset, + src=src_asset, + + price_tick=pair.price_tick, + size_tick=pair.size_tick, + bs_mktid=bs_mktid, + + broker='kraken', + ) return mkt, pair @@ -321,30 +343,24 @@ async def stream_quotes( ''' ws_pairs: list[str] = [] - mkt_infos: dict[str, MktPair] = {} + init_msgs: list[FeedInit] = [] async with ( send_chan as send_chan, ): for sym_str in symbols: mkt, pair = await get_mkt_info(sym_str) - mkt_infos[sym_str] = mkt + init_msgs.append( + FeedInit( + mkt_info=mkt, + shm_write_opts={ + 'sum_tick_vml': False, + }, + ) + ) + ws_pairs.append(pair.wsname) - symbol = symbols[0].lower() - - # sync with `.data.feed` caller - # TODO: should we make this init msg a `Struct`? - init_msgs = { - symbol: { - 'fqsn': sym_str, - 'mkt_info': mkt_infos[sym_str], - 'shm_write_opts': { - 'sum_tick_vml': False, - }, - }, - } - @acm async def subscribe(ws: NoBsWs):