From e06f9dc5c0a05378aaa0e2841f30930497f6eb29 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 May 2023 16:22:09 -0400 Subject: [PATCH] kucoin: port to new `NoBsWs` api semantics No longer need to implement connection timeout logic in the streaming code, instead we just `async for` that bby B) Further refining: - better `KucoinTrade` msg parsing and handling with object cases. - make `subscribe()` do sub request in a loop wand wair for acks. --- piker/brokers/kucoin.py | 203 +++++++++++++++++++++------------------- 1 file changed, 105 insertions(+), 98 deletions(-) diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index aaa35f34..015b248c 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -1,4 +1,6 @@ -# Copyright (C) Jared Goldman (in stewardship for pikers) +# Copyright (C) (in stewardship for pikers) +# - Jared Goldman +# - Tyler Goodlet # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -693,7 +695,6 @@ async def stream_quotes( ), ) as ws, open_ping_task(ws, ping_interval, connect_id), - # subscribe(ws, connect_id, kucoin_sym), aclosing(stream_messages(ws, sym_str)) as msg_gen, ): typ, quote = await anext(msg_gen) @@ -716,43 +717,48 @@ async def subscribe( connect_id, bs_mktid, -) -> AsyncGenerator[None, None]: - # level 2 sub - await ws.send_msg( - { - 'id': connect_id, - 'type': 'subscribe', - 'topic': f'/spotMarket/level2Depth5:{bs_mktid}', - 'privateChannel': False, - 'response': True, - } - ) + # subs are filled in with `bs_mktid` from avbove + topics: list[str] = [ + '/market/ticker:{bs_mktid}', # clearing events + '/spotMarket/level2Depth5:{bs_mktid}', # level 2 + ], - # watch trades - await ws.send_msg( - { - 'id': connect_id, - 'type': 'subscribe', - 'topic': f'/market/ticker:{bs_mktid}', - 'privateChannel': False, - 'response': True, - } - ) +) -> AsyncGenerator[None, None]: + + eps: list[str] = [] + for topic in topics: + ep: str = topic.format(bs_mktid=bs_mktid) + eps.append(ep) + await ws.send_msg( + { + 'id': connect_id, + 'type': 'subscribe', + 'topic': ep, + # 'topic': f'/spotMarket/level2Depth5:{bs_mktid}', + 'privateChannel': False, + 'response': True, + } + ) + + for _ in topics: + ack_msg = await ws.recv_msg() + log.info(f'Sub ACK: {ack_msg}') yield # unsub if ws.connected(): log.info(f'Unsubscribing to {bs_mktid} feed') - await ws.send_msg( - { - 'id': connect_id, - 'type': 'unsubscribe', - 'topic': f'/market/ticker:{bs_mktid}', - 'privateChannel': False, - 'response': True, - } - ) + for ep in eps: + await ws.send_msg( + { + 'id': connect_id, + 'type': 'unsubscribe', + 'topic': ep, + 'privateChannel': False, + 'response': True, + } + ) async def stream_messages( @@ -760,80 +766,81 @@ async def stream_messages( sym: str, ) -> AsyncGenerator[tuple[str, dict], None]: - timeouts = 0 - last_trade_ts = 0 + ''' + Core (live) feed msg handler: relay market events + to the piker-ized tick-stream format. - while True: - with trio.move_on_after(3) as cs: - msg = await ws.recv_msg() - if cs.cancelled_caught: - timeouts += 1 - if timeouts > 2: - log.error( - 'kucoin feed is sh**ing the bed... rebooting...') - await ws._connect() + ''' + last_trade_ts: float = 0 + async for dict_msg in ws: + if 'subject' not in dict_msg: + log.warn(f'Unhandled message: {dict_msg}') continue - if msg.get('subject'): - msg = KucoinMsg(**msg) - match msg.subject: - case 'trade.ticker': - trade_data = KucoinTrade(**msg.data) - # XXX: Filter for duplicate messages as ws feed will - # send duplicate market state - # https://docs.kucoin.com/#level2-5-best-ask-bid-orders - if trade_data.time == last_trade_ts: - continue + msg = KucoinMsg(**dict_msg) + match msg: + case KucoinMsg( + subject='trade.ticker', + ): + trade_data = KucoinTrade(**msg.data) - last_trade_ts = trade_data.time + # XXX: Filter for duplicate messages as ws feed will + # send duplicate market state + # https://docs.kucoin.com/#level2-5-best-ask-bid-orders + if trade_data.time == last_trade_ts: + continue - yield 'trade', { - 'symbol': sym, - 'last': trade_data.price, - 'brokerd_ts': last_trade_ts, - 'ticks': [ - { - 'type': 'trade', - 'price': float(trade_data.price), - 'size': float(trade_data.size), - 'broker_ts': last_trade_ts, - } - ], - } + last_trade_ts = trade_data.time - case 'level2': - l2_data = KucoinL2(**msg.data) - first_ask = l2_data.asks[0] - first_bid = l2_data.bids[0] - yield 'l1', { - 'symbol': sym, - 'ticks': [ - { - 'type': 'bid', - 'price': float(first_bid[0]), - 'size': float(first_bid[1]), - }, - { - 'type': 'bsize', - 'price': float(first_bid[0]), - 'size': float(first_bid[1]), - }, - { - 'type': 'ask', - 'price': float(first_ask[0]), - 'size': float(first_ask[1]), - }, - { - 'type': 'asize', - 'price': float(first_ask[0]), - 'size': float(first_ask[1]), - }, - ], - } + yield 'trade', { + 'symbol': sym, + 'last': trade_data.price, + 'brokerd_ts': last_trade_ts, + 'ticks': [ + { + 'type': 'trade', + 'price': float(trade_data.price), + 'size': float(trade_data.size), + 'broker_ts': last_trade_ts, + } + ], + } - case _: - log.warn(f'Unhandled message: {msg}') + case KucoinMsg( + subject='level2', + ): + l2_data = KucoinL2(**msg.data) + first_ask = l2_data.asks[0] + first_bid = l2_data.bids[0] + yield 'l1', { + 'symbol': sym, + 'ticks': [ + { + 'type': 'bid', + 'price': float(first_bid[0]), + 'size': float(first_bid[1]), + }, + { + 'type': 'bsize', + 'price': float(first_bid[0]), + 'size': float(first_bid[1]), + }, + { + 'type': 'ask', + 'price': float(first_ask[0]), + 'size': float(first_ask[1]), + }, + { + 'type': 'asize', + 'price': float(first_ask[0]), + 'size': float(first_ask[1]), + }, + ], + } + + case _: + log.warn(f'Unhandled message: {msg}') @acm