diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 743a78c2..55e73f5d 100755 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -18,29 +18,45 @@ Kucoin broker backend ''' -from typing import Any, Callable, Literal, AsyncGenerator -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + aclosing, +) from datetime import datetime -import time +from decimal import Decimal import base64 import hmac import hashlib +import time +from functools import partial +from typing import ( + Any, + Callable, + Literal, + AsyncGenerator, +) import wsproto from uuid import uuid4 import asks import tractor import trio -from trio_util import trio_async_generator from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy import pendulum import numpy as np -from piker._cacheables import open_cached_client +from piker.accounting._mktinfo import ( + Asset, + MktPair, +) +from piker import config +from piker._cacheables import ( + open_cached_client, + async_lifo_cache, +) from piker.log import get_logger from ._util import DataUnavailable -from piker.pp import config from ..data.types import Struct from ..data._web_bs import ( open_autorecon_ws, @@ -67,11 +83,20 @@ class KucoinMktPair(Struct, frozen=True): https://docs.kucoin.com/#get-symbols-list ''' - baseCurrency: str baseIncrement: float + + @property + def price_tick(self) -> Decimal: + return Decimal(str(self.self.baseIncrement)) + baseMaxSize: float baseMinSize: float + + @property + def size_tick(self) -> Decimal: + return Decimal(str(self.baseMinSize)) + enableTrading: bool feeCurrency: str isMarginEnabled: bool @@ -84,7 +109,7 @@ class KucoinMktPair(Struct, frozen=True): quoteIncrement: float quoteMaxSize: float quoteMinSize: float - symbol: str + symbol: str # our bs_mktid, kucoin's internal id class AccountTrade(Struct, frozen=True): @@ -293,7 +318,7 @@ class Client: ) -> dict[str, KucoinMktPair]: entries = await self._request('GET', '/symbols') syms = { - kucoin_sym_to_fqsn(item['name']): KucoinMktPair(**item) + item['name'].lower().replace('-', ''): KucoinMktPair(**item) for item in entries } @@ -439,15 +464,15 @@ class Client: return array -def fqsn_to_kucoin_sym(fqsn: str, pairs: dict[str, KucoinMktPair]) -> str: +def fqsn_to_kucoin_sym( + fqsn: str, + pairs: dict[str, KucoinMktPair], + +) -> str: pair_data = pairs[fqsn] return pair_data.baseCurrency + '-' + pair_data.quoteCurrency -def kucoin_sym_to_fqsn(sym: str) -> str: - return sym.lower().replace('-', '') - - @acm async def get_client() -> AsyncGenerator[Client, None]: client = Client() @@ -497,14 +522,51 @@ async def open_ping_task( n.cancel_scope.cancel() +@async_lifo_cache() +async def get_mkt_info( + fqme: str, + +) -> tuple[MktPair, KucoinMktPair]: + ''' + Query for and return a `MktPair` and `KucoinMktPair`. + + ''' + async with open_cached_client('kucoin') as client: + # split off any fqme broker part + bs_fqme, _, broker = fqme.partition('.') + + pairs: dict[str, KucoinMktPair] = await client.cache_pairs() + pair: KucoinMktPair = pairs[bs_fqme] + bs_mktid: str = pair.symbol + + # pair: KucoinMktPair = await client.pair_info(pair_str) + + # assets = client.assets + # dst_asset: Asset = assets[pair.base] + # src_asset: Asset = assets[pair.quote] + + mkt = MktPair( + dst=dst_asset, + src=src_asset, + + price_tick=pair.price_tick, + size_tick=pair.size_tick, + bs_mktid=bs_mktid, + + broker='kucoin', + ) + return mkt, pair + + async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = '', - # startup sync - task_status: TaskStatus[tuple[dict, dict] - ] = trio.TASK_STATUS_IGNORED, + + task_status: TaskStatus[ + tuple[dict, dict] + ] = trio.TASK_STATUS_IGNORED, + ) -> None: ''' Required piker api to stream real-time data. @@ -512,64 +574,71 @@ async def stream_quotes( ''' async with open_cached_client('kucoin') as client: + + log.info('Starting up quote stream') + # loop through symbols and sub to feedz + for sym_str in symbols: + mkt, pair = await get_mkt_info(sym_str) + + init_msgs = { + # pass back token, and bool, signalling if we're the + # writer and that history has been written + sym_str: { + 'symbol_info': { + 'asset_type': 'crypto', + 'price_tick_size': pair.baseIncrement, + 'lot_tick_size': pair.baseMinSize, + }, + 'shm_write_opts': {'sum_tick_vml': False}, + 'fqsn': sym_str, + } + } + token, ping_interval = await client._get_ws_token() connect_id = str(uuid4()) - pairs = await client.cache_pairs() - ws_url = ( - f'wss://ws-api-spot.kucoin.com/?' - f'token={token}&[connectId={connect_id}]' - ) - # open ping task async with ( - open_autorecon_ws(ws_url) as ws, + open_autorecon_ws( + ( + f'wss://ws-api-spot.kucoin.com/?' + f'token={token}&[connectId={connect_id}]' + ), + fixture=partial( + subscribe, + connect_id=connect_id, + kucoin_sym=pair.sym, + ), + ) as ws, open_ping_task(ws, ping_interval, connect_id), + # subscribe(ws, connect_id, kucoin_sym), + aclosing(stream_messages(ws, sym_str)) as msg_gen, ): - log.info('Starting up quote stream') - # loop through symbols and sub to feedz - for sym in symbols: - pair: KucoinMktPair = pairs[sym] - kucoin_sym = pair.symbol + typ, quote = await anext(msg_gen) + while typ != 'trade': + # take care to not unblock here until we get a real + # trade quote + typ, quote = await anext(msg_gen) - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': { - 'asset_type': 'crypto', - 'price_tick_size': float(pair.baseIncrement), - 'lot_tick_size': float(pair.baseMinSize), - }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - } - } + task_status.started((init_msgs, quote)) + feed_is_live.set() - async with ( - subscribe(ws, connect_id, kucoin_sym), - stream_messages(ws, sym) as msg_gen, - ): - typ, quote = await anext(msg_gen) - while typ != 'trade': - # take care to not unblock here until we get a real - # trade quote - typ, quote = await anext(msg_gen) - - task_status.started((init_msgs, quote)) - feed_is_live.set() - - async for typ, msg in msg_gen: - await send_chan.send({sym: msg}) + async for typ, msg in msg_gen: + await send_chan.send({sym_str: msg}) @acm -async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator[None, None]: +async def subscribe( + ws: NoBsWs, + connect_id, + bs_mktid, + +) -> AsyncGenerator[None, None]: # level 2 sub await ws.send_msg( { 'id': connect_id, 'type': 'subscribe', - 'topic': f'/spotMarket/level2Depth5:{sym}', + 'topic': f'/spotMarket/level2Depth5:{bs_mktid}', 'privateChannel': False, 'response': True, } @@ -580,7 +649,7 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator { 'id': connect_id, 'type': 'subscribe', - 'topic': f'/market/ticker:{sym}', + 'topic': f'/market/ticker:{bs_mktid}', 'privateChannel': False, 'response': True, } @@ -590,21 +659,22 @@ async def subscribe(ws: wsproto.WSConnection, connect_id, sym) -> AsyncGenerator # unsub if ws.connected(): - log.info(f'Unsubscribing to {sym} feed') + log.info(f'Unsubscribing to {bs_mktid} feed') await ws.send_msg( { 'id': connect_id, 'type': 'unsubscribe', - 'topic': f'/market/ticker:{sym}', + 'topic': f'/market/ticker:{bs_mktid}', 'privateChannel': False, 'response': True, } ) -@trio_async_generator async def stream_messages( - ws: NoBsWs, sym: str + ws: NoBsWs, + sym: str, + ) -> AsyncGenerator[tuple[str, dict], None]: timeouts = 0 last_trade_ts = 0