From a44e926c2f70d220da43f713c645f61ef085a022 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 May 2023 11:59:19 -0400 Subject: [PATCH] kucoin: handle ws welcome, subs-ack and pong msgs Previously the subscription response handling was a bit sloppy what with ignoring the welcome msg; this now correctly expects the correct startup sequence. Also this avoids warn logging on pong messages by expecting them in the msg loop and further drops the `KucoinMsg` struct and instead changes the msg loop to expect `dict`s and only cast to structs on live feed msgs that we actually process/relay. --- piker/brokers/kucoin.py | 49 ++++++++++++++++++----------------------- 1 file changed, 21 insertions(+), 28 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 6da6496a..1e6d2cd0 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -176,17 +176,6 @@ class KucoinL2(Struct, frozen=True): timestamp: float -class KucoinMsg(Struct, frozen=True): - ''' - Generic outer-wrapper for any Kucoin ws msg - - ''' - type: str - topic: str - subject: str - data: list[KucoinTrade | KucoinL2] - - class Currency(Struct, frozen=True): ''' Currency (asset) info: @@ -743,12 +732,14 @@ async def subscribe( 'id': connect_id, 'type': 'subscribe', 'topic': ep, - # 'topic': f'/spotMarket/level2Depth5:{bs_mktid}', 'privateChannel': False, 'response': True, } ) + welcome_msg = await ws.recv_msg() + log.info(f'WS welcome: {welcome_msg}') + for _ in topics: ack_msg = await ws.recv_msg() log.info(f'Sub ACK: {ack_msg}') @@ -782,19 +773,16 @@ async def stream_messages( ''' last_trade_ts: float = 0 + dict_msg: dict[str, Any] async for dict_msg in ws: - if 'subject' not in dict_msg: - log.warn(f'Unhandled message: {dict_msg}') - continue + match dict_msg: + case { + 'subject': 'trade.ticker', + 'data': trade_data_dict, + }: + trade_data = KucoinTrade(**trade_data_dict) - msg = KucoinMsg(**dict_msg) - match msg: - case KucoinMsg( - subject='trade.ticker', - ): - trade_data = KucoinTrade(**msg.data) - - # XXX: Filter for duplicate messages as ws feed will + # XXX: Filter out duplicate messages as ws feed will # send duplicate market state # https://docs.kucoin.com/#level2-5-best-ask-bid-orders if trade_data.time == last_trade_ts: @@ -816,10 +804,11 @@ async def stream_messages( ], } - case KucoinMsg( - subject='level2', - ): - l2_data = KucoinL2(**msg.data) + case { + 'subject': 'level2', + 'data': trade_data_dict, + }: + l2_data = KucoinL2(**trade_data_dict) first_ask = l2_data.asks[0] first_bid = l2_data.bids[0] yield 'l1', { @@ -848,8 +837,12 @@ async def stream_messages( ], } + case {'type': 'pong'}: + # resp to ping task req + continue + case _: - log.warn(f'Unhandled message: {msg}') + log.warn(f'Unhandled message: {dict_msg}') @acm