diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 6da6496a..1e6d2cd0 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -176,17 +176,6 @@ class KucoinL2(Struct, frozen=True): timestamp: float -class KucoinMsg(Struct, frozen=True): - ''' - Generic outer-wrapper for any Kucoin ws msg - - ''' - type: str - topic: str - subject: str - data: list[KucoinTrade | KucoinL2] - - class Currency(Struct, frozen=True): ''' Currency (asset) info: @@ -743,12 +732,14 @@ async def subscribe( 'id': connect_id, 'type': 'subscribe', 'topic': ep, - # 'topic': f'/spotMarket/level2Depth5:{bs_mktid}', 'privateChannel': False, 'response': True, } ) + welcome_msg = await ws.recv_msg() + log.info(f'WS welcome: {welcome_msg}') + for _ in topics: ack_msg = await ws.recv_msg() log.info(f'Sub ACK: {ack_msg}') @@ -782,19 +773,16 @@ async def stream_messages( ''' last_trade_ts: float = 0 + dict_msg: dict[str, Any] async for dict_msg in ws: - if 'subject' not in dict_msg: - log.warn(f'Unhandled message: {dict_msg}') - continue + match dict_msg: + case { + 'subject': 'trade.ticker', + 'data': trade_data_dict, + }: + trade_data = KucoinTrade(**trade_data_dict) - msg = KucoinMsg(**dict_msg) - match msg: - case KucoinMsg( - subject='trade.ticker', - ): - trade_data = KucoinTrade(**msg.data) - - # XXX: Filter for duplicate messages as ws feed will + # XXX: Filter out duplicate messages as ws feed will # send duplicate market state # https://docs.kucoin.com/#level2-5-best-ask-bid-orders if trade_data.time == last_trade_ts: @@ -816,10 +804,11 @@ async def stream_messages( ], } - case KucoinMsg( - subject='level2', - ): - l2_data = KucoinL2(**msg.data) + case { + 'subject': 'level2', + 'data': trade_data_dict, + }: + l2_data = KucoinL2(**trade_data_dict) first_ask = l2_data.asks[0] first_bid = l2_data.bids[0] yield 'l1', { @@ -848,8 +837,12 @@ async def stream_messages( ], } + case {'type': 'pong'}: + # resp to ping task req + continue + case _: - log.warn(f'Unhandled message: {msg}') + log.warn(f'Unhandled message: {dict_msg}') @acm