diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 8f00c97e..b1e94020 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -32,7 +32,6 @@ import hmac import hashlib import wsproto from uuid import uuid4 -from functools import partial import asks import tractor @@ -281,7 +280,10 @@ class Client: self, ) -> dict[str, KucoinMktPair]: entries = await self._request('GET', '/symbols') - syms = {kucoin_sym_to_fqsn(item['name']): KucoinMktPair(**item) for item in entries} + syms = { + kucoin_sym_to_fqsn( + item['name']): KucoinMktPair( + **item) for item in entries} log.info(f' {len(syms)} Kucoin market pairs fetched') return syms @@ -445,6 +447,7 @@ async def open_symbol_search( await stream.send(await client.search_symbols(pattern)) log.info('Kucoin symbol search opened') + @acm async def open_ping_task(ws: wsproto.WSConnection, ping_interval, connect_id): ''' @@ -513,14 +516,14 @@ async def stream_quotes( } } - async with ( subscribe(ws, connect_id, kucoin_sym), stream_messages(ws, sym) as msg_gen, ): typ, quote = await anext(msg_gen) while typ != 'trade': - # take care to not unblock here until we get a real trade quote + # take care to not unblock here until we get a real + # trade quote typ, quote = await anext(msg_gen) task_status.started((init_msgs, quote)) @@ -529,6 +532,7 @@ async def stream_quotes( async for typ, msg in msg_gen: await send_chan.send({sym: msg}) + @acm async def subscribe(ws: wsproto.WSConnection, connect_id, sym): # level 2 sub