From ae1c5a0db071ab838addc83b2bbe296dc8bbfcf9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 9 Jun 2023 16:45:02 -0400 Subject: [PATCH] binance: breakout into `feed` and `broker` mods like other backends --- piker/brokers/binance/__init__.py | 16 +- piker/brokers/binance/api.py | 511 ------------------------------ piker/brokers/binance/broker.py | 188 +++++++++++ piker/brokers/binance/feed.py | 414 ++++++++++++++++++++++++ 4 files changed, 609 insertions(+), 520 deletions(-) create mode 100644 piker/brokers/binance/broker.py create mode 100644 piker/brokers/binance/feed.py diff --git a/piker/brokers/binance/__init__.py b/piker/brokers/binance/__init__.py index c840f071..cfdbd3a5 100644 --- a/piker/brokers/binance/__init__.py +++ b/piker/brokers/binance/__init__.py @@ -23,16 +23,15 @@ binancial secs on the floor, in the office, behind the dumpster. """ from .api import ( get_client, -# ) -# from .feed import ( +) +from .feed import ( get_mkt_info, open_history_client, open_symbol_search, stream_quotes, -# ) -# from .broker import ( +) +from .broker import ( trades_dialogue, - # norm_trade_records, ) @@ -43,13 +42,12 @@ __all__ = [ 'open_history_client', 'open_symbol_search', 'stream_quotes', - # 'norm_trade_records', ] -# tractor RPC enable arg +# `brokerd` modules __enable_modules__: list[str] = [ 'api', - # 'feed', - # 'broker', + 'feed', + 'broker', ] diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py index 7b847bf8..9ab9f835 100644 --- a/piker/brokers/binance/api.py +++ b/piker/brokers/binance/api.py @@ -25,66 +25,32 @@ from __future__ import annotations from collections import OrderedDict from contextlib import ( asynccontextmanager as acm, - aclosing, ) from datetime import datetime from decimal import Decimal -import itertools from typing import ( Any, Union, - AsyncIterator, - AsyncGenerator, - Callable, ) import hmac -import time import hashlib from pathlib import Path import trio -from trio_typing import TaskStatus from pendulum import ( now, - from_timestamp, ) import asks from fuzzywuzzy import process as fuzzy import numpy as np -import tractor from piker import config -from piker._cacheables import ( - async_lifo_cache, - open_cached_client, -) -from piker.accounting._mktinfo import ( - Asset, - MktPair, - digits_to_dec, -) from piker.data.types import Struct -from piker.data.validate import FeedInit from piker.data import def_iohlcv_fields -from piker.data._web_bs import ( - open_autorecon_ws, - NoBsWs, -) -from piker.clearing._messages import ( - BrokerdOrder, - BrokerdOrderAck, - BrokerdStatus, - BrokerdPosition, - BrokerdFill, - BrokerdCancel, - # BrokerdError, -) from piker.brokers._util import ( resproc, SymbolNotFound, - DataUnavailable, get_logger, - get_console_log, ) log = get_logger('piker.brokers.binance') @@ -211,18 +177,6 @@ class OHLC(Struct): bar_wap: float = 0.0 -class L1(Struct): - # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams - - update_id: int - sym: str - - bid: float - bsize: float - ask: float - asize: float - - # convert datetime obj timestamp to unixtime in milliseconds def binance_timestamp( when: datetime @@ -644,468 +598,3 @@ async def get_client() -> Client: log.info('Caching exchange infos..') await client.exch_info() yield client - - -# validation type -class AggTrade(Struct, frozen=True): - e: str # Event type - E: int # Event time - s: str # Symbol - a: int # Aggregate trade ID - p: float # Price - q: float # Quantity - f: int # First trade ID - l: int # noqa Last trade ID - T: int # Trade time - m: bool # Is the buyer the market maker? - M: bool # Ignore - - -async def stream_messages( - ws: NoBsWs, -) -> AsyncGenerator[NoBsWs, dict]: - - # TODO: match syntax here! - msg: dict[str, Any] - async for msg in ws: - match msg: - # for l1 streams binance doesn't add an event type field so - # identify those messages by matching keys - # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams - case { - # NOTE: this is never an old value it seems, so - # they are always sending real L1 spread updates. - 'u': upid, # update id - 's': sym, - 'b': bid, - 'B': bsize, - 'a': ask, - 'A': asize, - }: - # TODO: it would be super nice to have a `L1` piker type - # which "renders" incremental tick updates from a packed - # msg-struct: - # - backend msgs after packed into the type such that we - # can reduce IPC usage but without each backend having - # to do that incremental update logic manually B) - # - would it maybe be more efficient to use this instead? - # https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream - l1 = L1( - update_id=upid, - sym=sym, - bid=bid, - bsize=bsize, - ask=ask, - asize=asize, - ) - l1.typecast() - - # repack into piker's tick-quote format - yield 'l1', { - 'symbol': l1.sym, - 'ticks': [ - { - 'type': 'bid', - 'price': l1.bid, - 'size': l1.bsize, - }, - { - 'type': 'bsize', - 'price': l1.bid, - 'size': l1.bsize, - }, - { - 'type': 'ask', - 'price': l1.ask, - 'size': l1.asize, - }, - { - 'type': 'asize', - 'price': l1.ask, - 'size': l1.asize, - } - ] - } - - # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams - case { - 'e': 'aggTrade', - }: - # NOTE: this is purely for a definition, - # ``msgspec.Struct`` does not runtime-validate until you - # decode/encode, see: - # https://jcristharif.com/msgspec/structs.html#type-validation - msg = AggTrade(**msg) # TODO: should we .copy() ? - piker_quote: dict = { - 'symbol': msg.s, - 'last': float(msg.p), - 'brokerd_ts': time.time(), - 'ticks': [{ - 'type': 'trade', - 'price': float(msg.p), - 'size': float(msg.q), - 'broker_ts': msg.T, - }], - } - yield 'trade', piker_quote - - -def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: - ''' - Create a request subscription packet dict. - - - spot: - https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams - - - futes: - https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams - - ''' - return { - 'method': 'SUBSCRIBE', - 'params': [ - f'{pair.lower()}@{sub_name}' - for pair in pairs - ], - 'id': uid - } - - -@acm -async def open_history_client( - mkt: MktPair, - -) -> tuple[Callable, int]: - - symbol: str = mkt.bs_fqme - - # TODO implement history getter for the new storage layer. - async with open_cached_client('binance') as client: - - async def get_ohlc( - timeframe: float, - end_dt: datetime | None = None, - start_dt: datetime | None = None, - - ) -> tuple[ - np.ndarray, - datetime, # start - datetime, # end - ]: - if timeframe != 60: - raise DataUnavailable('Only 1m bars are supported') - - array = await client.bars( - symbol, - start_dt=start_dt, - end_dt=end_dt, - ) - times = array['time'] - if ( - end_dt is None - ): - inow = round(time.time()) - if (inow - times[-1]) > 60: - await tractor.breakpoint() - - start_dt = from_timestamp(times[0]) - end_dt = from_timestamp(times[-1]) - - return array, start_dt, end_dt - - yield get_ohlc, {'erlangs': 3, 'rate': 3} - - -@async_lifo_cache() -async def get_mkt_info( - fqme: str, - -) -> tuple[MktPair, Pair]: - - async with open_cached_client('binance') as client: - - pair: Pair = await client.exch_info(fqme.upper()) - mkt = MktPair( - dst=Asset( - name=pair.baseAsset, - atype='crypto', - tx_tick=digits_to_dec(pair.baseAssetPrecision), - ), - src=Asset( - name=pair.quoteAsset, - atype='crypto', - tx_tick=digits_to_dec(pair.quoteAssetPrecision), - ), - price_tick=pair.price_tick, - size_tick=pair.size_tick, - bs_mktid=pair.symbol, - broker='binance', - ) - both = mkt, pair - return both - - -async def stream_quotes( - - send_chan: trio.abc.SendChannel, - symbols: list[str], - feed_is_live: trio.Event, - loglevel: str = None, - - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, - -) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - async with ( - send_chan as send_chan, - ): - init_msgs: list[FeedInit] = [] - for sym in symbols: - mkt, pair = await get_mkt_info(sym) - - # build out init msgs according to latest spec - init_msgs.append( - FeedInit(mkt_info=mkt) - ) - - iter_subids = itertools.count() - - @acm - async def subscribe(ws: NoBsWs): - # setup subs - - subid: int = next(iter_subids) - - # trade data (aka L1) - # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker - l1_sub = make_sub(symbols, 'bookTicker', subid) - await ws.send_msg(l1_sub) - - # aggregate (each order clear by taker **not** by maker) - # trades data: - # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams - agg_trades_sub = make_sub(symbols, 'aggTrade', subid) - await ws.send_msg(agg_trades_sub) - - # might get ack from ws server, or maybe some - # other msg still in transit.. - res = await ws.recv_msg() - subid: str | None = res.get('id') - if subid: - assert res['id'] == subid - - yield - - subs = [] - for sym in symbols: - subs.append("{sym}@aggTrade") - subs.append("{sym}@bookTicker") - - # unsub from all pairs on teardown - if ws.connected(): - await ws.send_msg({ - "method": "UNSUBSCRIBE", - "params": subs, - "id": subid, - }) - - # XXX: do we need to ack the unsub? - # await ws.recv_msg() - - async with ( - open_autorecon_ws( - # XXX: see api docs which show diff addr? - # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information - # 'wss://ws-api.binance.com:443/ws-api/v3', - 'wss://stream.binance.com/ws', - fixture=subscribe, - ) as ws, - - # avoid stream-gen closure from breaking trio.. - aclosing(stream_messages(ws)) as msg_gen, - ): - typ, quote = await anext(msg_gen) - - # pull a first quote and deliver - while typ != 'trade': - typ, quote = await anext(msg_gen) - - task_status.started((init_msgs, quote)) - - # signal to caller feed is ready for consumption - feed_is_live.set() - - # import time - # last = time.time() - - # start streaming - async for typ, msg in msg_gen: - - # period = time.time() - last - # hz = 1/period if period else float('inf') - # if hz > 60: - # log.info(f'Binance quotez : {hz}') - topic = msg['symbol'].lower() - await send_chan.send({topic: msg}) - # last = time.time() - - -async def handle_order_requests( - ems_order_stream: tractor.MsgStream -) -> None: - async with open_cached_client('binance') as client: - async for request_msg in ems_order_stream: - log.info(f'Received order request {request_msg}') - - action = request_msg['action'] - - if action in {'buy', 'sell'}: - # validate - order = BrokerdOrder(**request_msg) - - # call our client api to submit the order - reqid = await client.submit_limit( - order.symbol, - order.action, - order.size, - order.price, - oid=order.oid - ) - - # deliver ack that order has been submitted to broker routing - await ems_order_stream.send( - BrokerdOrderAck( - # ems order request id - oid=order.oid, - # broker specific request id - reqid=reqid, - time_ns=time.time_ns(), - ).dict() - ) - - elif action == 'cancel': - msg = BrokerdCancel(**request_msg) - - await client.submit_cancel(msg.symbol, msg.reqid) - - else: - log.error(f'Unknown order command: {request_msg}') - - -@tractor.context -async def trades_dialogue( - ctx: tractor.Context, - loglevel: str = None - -) -> AsyncIterator[dict[str, Any]]: - - async with open_cached_client('binance') as client: - if not client.api_key: - await ctx.started('paper') - return - - # table: PpTable - # ledger: TransactionLedger - - # TODO: load pps and accounts using accounting apis! - positions: list[BrokerdPosition] = [] - accounts: list[str] = ['binance.default'] - await ctx.started((positions, accounts)) - - async with ( - ctx.open_stream() as ems_stream, - trio.open_nursery() as n, - open_cached_client('binance') as client, - client.manage_listen_key() as listen_key, - ): - n.start_soon(handle_order_requests, ems_stream) - # await trio.sleep_forever() - - async with open_autorecon_ws( - f'wss://stream.binance.com:9443/ws/{listen_key}', - ) as ws: - event = await ws.recv_msg() - - # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update - if event.get('e') == 'executionReport': - - oid: str = event.get('c') - side: str = event.get('S').lower() - status: str = event.get('X') - order_qty: float = float(event.get('q')) - filled_qty: float = float(event.get('z')) - cum_transacted_qty: float = float(event.get('Z')) - price_avg: float = cum_transacted_qty / filled_qty - broker_time: float = float(event.get('T')) - commission_amount: float = float(event.get('n')) - commission_asset: float = event.get('N') - - if status == 'TRADE': - if order_qty == filled_qty: - msg = BrokerdFill( - reqid=oid, - time_ns=time.time_ns(), - action=side, - price=price_avg, - broker_details={ - 'name': 'binance', - 'commissions': { - 'amount': commission_amount, - 'asset': commission_asset - }, - 'broker_time': broker_time - }, - broker_time=broker_time - ) - - else: - if status == 'NEW': - status = 'submitted' - - elif status == 'CANCELED': - status = 'cancelled' - - msg = BrokerdStatus( - reqid=oid, - time_ns=time.time_ns(), - status=status, - filled=filled_qty, - remaining=order_qty - filled_qty, - broker_details={'name': 'binance'} - ) - - else: - # XXX: temporary, to catch unhandled msgs - breakpoint() - - await ems_stream.send(msg.dict()) - - -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, -) -> Client: - async with open_cached_client('binance') as client: - - # load all symbols locally for fast search - cache = await client.exch_info() - await ctx.started() - - async with ctx.open_stream() as stream: - - async for pattern in stream: - # results = await client.exch_info(sym=pattern.upper()) - - matches = fuzzy.extractBests( - pattern, - cache, - score_cutoff=50, - ) - # repack in dict form - await stream.send({ - item[0].symbol: item[0] - for item in matches - }) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py new file mode 100644 index 00000000..53dd7a64 --- /dev/null +++ b/piker/brokers/binance/broker.py @@ -0,0 +1,188 @@ +# piker: trading gear for hackers +# Copyright (C) +# Guillermo Rodriguez (aka ze jefe) +# Tyler Goodlet +# (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Live order control B) + +''' +from __future__ import annotations +from typing import ( + Any, + AsyncIterator, +) +import time + +import tractor +import trio + +from piker.brokers._util import ( + get_logger, +) +from piker.data._web_bs import ( + open_autorecon_ws, + NoBsWs, +) +from piker._cacheables import ( + open_cached_client, +) +from piker.clearing._messages import ( + BrokerdOrder, + BrokerdOrderAck, + BrokerdStatus, + BrokerdPosition, + BrokerdFill, + BrokerdCancel, + # BrokerdError, +) + +log = get_logger('piker.brokers.binance') + + +async def handle_order_requests( + ems_order_stream: tractor.MsgStream +) -> None: + async with open_cached_client('binance') as client: + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = await client.submit_limit( + order.symbol, + order.action, + order.size, + order.price, + oid=order.oid + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + # ems order request id + oid=order.oid, + # broker specific request id + reqid=reqid, + time_ns=time.time_ns(), + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await client.submit_cancel(msg.symbol, msg.reqid) + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + ctx: tractor.Context, + loglevel: str = None + +) -> AsyncIterator[dict[str, Any]]: + + async with open_cached_client('binance') as client: + if not client.api_key: + await ctx.started('paper') + return + + # table: PpTable + # ledger: TransactionLedger + + # TODO: load pps and accounts using accounting apis! + positions: list[BrokerdPosition] = [] + accounts: list[str] = ['binance.default'] + await ctx.started((positions, accounts)) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + open_cached_client('binance') as client, + client.manage_listen_key() as listen_key, + ): + n.start_soon(handle_order_requests, ems_stream) + # await trio.sleep_forever() + + ws: NoBsWs + async with open_autorecon_ws( + f'wss://stream.binance.com:9443/ws/{listen_key}', + ) as ws: + event = await ws.recv_msg() + + # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update + if event.get('e') == 'executionReport': + + oid: str = event.get('c') + side: str = event.get('S').lower() + status: str = event.get('X') + order_qty: float = float(event.get('q')) + filled_qty: float = float(event.get('z')) + cum_transacted_qty: float = float(event.get('Z')) + price_avg: float = cum_transacted_qty / filled_qty + broker_time: float = float(event.get('T')) + commission_amount: float = float(event.get('n')) + commission_asset: float = event.get('N') + + if status == 'TRADE': + if order_qty == filled_qty: + msg = BrokerdFill( + reqid=oid, + time_ns=time.time_ns(), + action=side, + price=price_avg, + broker_details={ + 'name': 'binance', + 'commissions': { + 'amount': commission_amount, + 'asset': commission_asset + }, + 'broker_time': broker_time + }, + broker_time=broker_time + ) + + else: + if status == 'NEW': + status = 'submitted' + + elif status == 'CANCELED': + status = 'cancelled' + + msg = BrokerdStatus( + reqid=oid, + time_ns=time.time_ns(), + status=status, + filled=filled_qty, + remaining=order_qty - filled_qty, + broker_details={'name': 'binance'} + ) + + else: + # XXX: temporary, to catch unhandled msgs + breakpoint() + + await ems_stream.send(msg.dict()) + + diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py new file mode 100644 index 00000000..9ecda184 --- /dev/null +++ b/piker/brokers/binance/feed.py @@ -0,0 +1,414 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Real-time and historical data feed endpoints. + +''' +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, + aclosing, +) +from datetime import datetime +import itertools +from typing import ( + Any, + AsyncGenerator, + Callable, +) +import time + +import trio +from trio_typing import TaskStatus +from pendulum import ( + from_timestamp, +) +from fuzzywuzzy import process as fuzzy +import numpy as np +import tractor + +from piker._cacheables import ( + async_lifo_cache, + open_cached_client, +) +from piker.accounting._mktinfo import ( + Asset, + MktPair, + digits_to_dec, +) +from piker.data.types import Struct +from piker.data.validate import FeedInit +from piker.data._web_bs import ( + open_autorecon_ws, + NoBsWs, +) +from piker.brokers._util import ( + DataUnavailable, + get_logger, + get_console_log, +) + +from .api import ( + Client, + Pair, +) + +log = get_logger('piker.brokers.binance') + + +class L1(Struct): + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + + update_id: int + sym: str + + bid: float + bsize: float + ask: float + asize: float + + +# validation type +class AggTrade(Struct, frozen=True): + e: str # Event type + E: int # Event time + s: str # Symbol + a: int # Aggregate trade ID + p: float # Price + q: float # Quantity + f: int # First trade ID + l: int # noqa Last trade ID + T: int # Trade time + m: bool # Is the buyer the market maker? + M: bool # Ignore + + +async def stream_messages( + ws: NoBsWs, +) -> AsyncGenerator[NoBsWs, dict]: + + # TODO: match syntax here! + msg: dict[str, Any] + async for msg in ws: + match msg: + # for l1 streams binance doesn't add an event type field so + # identify those messages by matching keys + # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams + case { + # NOTE: this is never an old value it seems, so + # they are always sending real L1 spread updates. + 'u': upid, # update id + 's': sym, + 'b': bid, + 'B': bsize, + 'a': ask, + 'A': asize, + }: + # TODO: it would be super nice to have a `L1` piker type + # which "renders" incremental tick updates from a packed + # msg-struct: + # - backend msgs after packed into the type such that we + # can reduce IPC usage but without each backend having + # to do that incremental update logic manually B) + # - would it maybe be more efficient to use this instead? + # https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream + l1 = L1( + update_id=upid, + sym=sym, + bid=bid, + bsize=bsize, + ask=ask, + asize=asize, + ) + l1.typecast() + + # repack into piker's tick-quote format + yield 'l1', { + 'symbol': l1.sym, + 'ticks': [ + { + 'type': 'bid', + 'price': l1.bid, + 'size': l1.bsize, + }, + { + 'type': 'bsize', + 'price': l1.bid, + 'size': l1.bsize, + }, + { + 'type': 'ask', + 'price': l1.ask, + 'size': l1.asize, + }, + { + 'type': 'asize', + 'price': l1.ask, + 'size': l1.asize, + } + ] + } + + # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams + case { + 'e': 'aggTrade', + }: + # NOTE: this is purely for a definition, + # ``msgspec.Struct`` does not runtime-validate until you + # decode/encode, see: + # https://jcristharif.com/msgspec/structs.html#type-validation + msg = AggTrade(**msg) # TODO: should we .copy() ? + piker_quote: dict = { + 'symbol': msg.s, + 'last': float(msg.p), + 'brokerd_ts': time.time(), + 'ticks': [{ + 'type': 'trade', + 'price': float(msg.p), + 'size': float(msg.q), + 'broker_ts': msg.T, + }], + } + yield 'trade', piker_quote + + +def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: + ''' + Create a request subscription packet dict. + + - spot: + https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams + + - futes: + https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams + + ''' + return { + 'method': 'SUBSCRIBE', + 'params': [ + f'{pair.lower()}@{sub_name}' + for pair in pairs + ], + 'id': uid + } + + +@acm +async def open_history_client( + mkt: MktPair, + +) -> tuple[Callable, int]: + + symbol: str = mkt.bs_fqme + + # TODO implement history getter for the new storage layer. + async with open_cached_client('binance') as client: + + async def get_ohlc( + timeframe: float, + end_dt: datetime | None = None, + start_dt: datetime | None = None, + + ) -> tuple[ + np.ndarray, + datetime, # start + datetime, # end + ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') + + array = await client.bars( + symbol, + start_dt=start_dt, + end_dt=end_dt, + ) + times = array['time'] + if ( + end_dt is None + ): + inow = round(time.time()) + if (inow - times[-1]) > 60: + await tractor.breakpoint() + + start_dt = from_timestamp(times[0]) + end_dt = from_timestamp(times[-1]) + + return array, start_dt, end_dt + + yield get_ohlc, {'erlangs': 3, 'rate': 3} + + +@async_lifo_cache() +async def get_mkt_info( + fqme: str, + +) -> tuple[MktPair, Pair]: + + async with open_cached_client('binance') as client: + + pair: Pair = await client.exch_info(fqme.upper()) + mkt = MktPair( + dst=Asset( + name=pair.baseAsset, + atype='crypto', + tx_tick=digits_to_dec(pair.baseAssetPrecision), + ), + src=Asset( + name=pair.quoteAsset, + atype='crypto', + tx_tick=digits_to_dec(pair.quoteAssetPrecision), + ), + price_tick=pair.price_tick, + size_tick=pair.size_tick, + bs_mktid=pair.symbol, + broker='binance', + ) + both = mkt, pair + return both + + +async def stream_quotes( + + send_chan: trio.abc.SendChannel, + symbols: list[str], + feed_is_live: trio.Event, + loglevel: str = None, + + # startup sync + task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, + +) -> None: + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + async with ( + send_chan as send_chan, + ): + init_msgs: list[FeedInit] = [] + for sym in symbols: + mkt, pair = await get_mkt_info(sym) + + # build out init msgs according to latest spec + init_msgs.append( + FeedInit(mkt_info=mkt) + ) + + iter_subids = itertools.count() + + @acm + async def subscribe(ws: NoBsWs): + # setup subs + + subid: int = next(iter_subids) + + # trade data (aka L1) + # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker + l1_sub = make_sub(symbols, 'bookTicker', subid) + await ws.send_msg(l1_sub) + + # aggregate (each order clear by taker **not** by maker) + # trades data: + # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams + agg_trades_sub = make_sub(symbols, 'aggTrade', subid) + await ws.send_msg(agg_trades_sub) + + # might get ack from ws server, or maybe some + # other msg still in transit.. + res = await ws.recv_msg() + subid: str | None = res.get('id') + if subid: + assert res['id'] == subid + + yield + + subs = [] + for sym in symbols: + subs.append("{sym}@aggTrade") + subs.append("{sym}@bookTicker") + + # unsub from all pairs on teardown + if ws.connected(): + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": subid, + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + async with ( + open_autorecon_ws( + # XXX: see api docs which show diff addr? + # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information + # 'wss://ws-api.binance.com:443/ws-api/v3', + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws, + + # avoid stream-gen closure from breaking trio.. + aclosing(stream_messages(ws)) as msg_gen, + ): + typ, quote = await anext(msg_gen) + + # pull a first quote and deliver + while typ != 'trade': + typ, quote = await anext(msg_gen) + + task_status.started((init_msgs, quote)) + + # signal to caller feed is ready for consumption + feed_is_live.set() + + # import time + # last = time.time() + + # start streaming + async for typ, msg in msg_gen: + + # period = time.time() - last + # hz = 1/period if period else float('inf') + # if hz > 60: + # log.info(f'Binance quotez : {hz}') + topic = msg['symbol'].lower() + await send_chan.send({topic: msg}) + # last = time.time() +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +) -> Client: + async with open_cached_client('binance') as client: + + # load all symbols locally for fast search + cache = await client.exch_info() + await ctx.started() + + async with ctx.open_stream() as stream: + + async for pattern in stream: + # results = await client.exch_info(sym=pattern.upper()) + + matches = fuzzy.extractBests( + pattern, + cache, + score_cutoff=50, + ) + # repack in dict form + await stream.send({ + item[0].symbol: item[0] + for item in matches + })