diff --git a/piker/brokers/binance/__init__.py b/piker/brokers/binance/__init__.py
index c840f071..cfdbd3a5 100644
--- a/piker/brokers/binance/__init__.py
+++ b/piker/brokers/binance/__init__.py
@@ -23,16 +23,15 @@ binancial secs on the floor, in the office, behind the dumpster.
"""
from .api import (
get_client,
-# )
-# from .feed import (
+)
+from .feed import (
get_mkt_info,
open_history_client,
open_symbol_search,
stream_quotes,
-# )
-# from .broker import (
+)
+from .broker import (
trades_dialogue,
- # norm_trade_records,
)
@@ -43,13 +42,12 @@ __all__ = [
'open_history_client',
'open_symbol_search',
'stream_quotes',
- # 'norm_trade_records',
]
-# tractor RPC enable arg
+# `brokerd` modules
__enable_modules__: list[str] = [
'api',
- # 'feed',
- # 'broker',
+ 'feed',
+ 'broker',
]
diff --git a/piker/brokers/binance/api.py b/piker/brokers/binance/api.py
index 7b847bf8..9ab9f835 100644
--- a/piker/brokers/binance/api.py
+++ b/piker/brokers/binance/api.py
@@ -25,66 +25,32 @@ from __future__ import annotations
from collections import OrderedDict
from contextlib import (
asynccontextmanager as acm,
- aclosing,
)
from datetime import datetime
from decimal import Decimal
-import itertools
from typing import (
Any,
Union,
- AsyncIterator,
- AsyncGenerator,
- Callable,
)
import hmac
-import time
import hashlib
from pathlib import Path
import trio
-from trio_typing import TaskStatus
from pendulum import (
now,
- from_timestamp,
)
import asks
from fuzzywuzzy import process as fuzzy
import numpy as np
-import tractor
from piker import config
-from piker._cacheables import (
- async_lifo_cache,
- open_cached_client,
-)
-from piker.accounting._mktinfo import (
- Asset,
- MktPair,
- digits_to_dec,
-)
from piker.data.types import Struct
-from piker.data.validate import FeedInit
from piker.data import def_iohlcv_fields
-from piker.data._web_bs import (
- open_autorecon_ws,
- NoBsWs,
-)
-from piker.clearing._messages import (
- BrokerdOrder,
- BrokerdOrderAck,
- BrokerdStatus,
- BrokerdPosition,
- BrokerdFill,
- BrokerdCancel,
- # BrokerdError,
-)
from piker.brokers._util import (
resproc,
SymbolNotFound,
- DataUnavailable,
get_logger,
- get_console_log,
)
log = get_logger('piker.brokers.binance')
@@ -211,18 +177,6 @@ class OHLC(Struct):
bar_wap: float = 0.0
-class L1(Struct):
- # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
-
- update_id: int
- sym: str
-
- bid: float
- bsize: float
- ask: float
- asize: float
-
-
# convert datetime obj timestamp to unixtime in milliseconds
def binance_timestamp(
when: datetime
@@ -644,468 +598,3 @@ async def get_client() -> Client:
log.info('Caching exchange infos..')
await client.exch_info()
yield client
-
-
-# validation type
-class AggTrade(Struct, frozen=True):
- e: str # Event type
- E: int # Event time
- s: str # Symbol
- a: int # Aggregate trade ID
- p: float # Price
- q: float # Quantity
- f: int # First trade ID
- l: int # noqa Last trade ID
- T: int # Trade time
- m: bool # Is the buyer the market maker?
- M: bool # Ignore
-
-
-async def stream_messages(
- ws: NoBsWs,
-) -> AsyncGenerator[NoBsWs, dict]:
-
- # TODO: match syntax here!
- msg: dict[str, Any]
- async for msg in ws:
- match msg:
- # for l1 streams binance doesn't add an event type field so
- # identify those messages by matching keys
- # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
- case {
- # NOTE: this is never an old value it seems, so
- # they are always sending real L1 spread updates.
- 'u': upid, # update id
- 's': sym,
- 'b': bid,
- 'B': bsize,
- 'a': ask,
- 'A': asize,
- }:
- # TODO: it would be super nice to have a `L1` piker type
- # which "renders" incremental tick updates from a packed
- # msg-struct:
- # - backend msgs after packed into the type such that we
- # can reduce IPC usage but without each backend having
- # to do that incremental update logic manually B)
- # - would it maybe be more efficient to use this instead?
- # https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream
- l1 = L1(
- update_id=upid,
- sym=sym,
- bid=bid,
- bsize=bsize,
- ask=ask,
- asize=asize,
- )
- l1.typecast()
-
- # repack into piker's tick-quote format
- yield 'l1', {
- 'symbol': l1.sym,
- 'ticks': [
- {
- 'type': 'bid',
- 'price': l1.bid,
- 'size': l1.bsize,
- },
- {
- 'type': 'bsize',
- 'price': l1.bid,
- 'size': l1.bsize,
- },
- {
- 'type': 'ask',
- 'price': l1.ask,
- 'size': l1.asize,
- },
- {
- 'type': 'asize',
- 'price': l1.ask,
- 'size': l1.asize,
- }
- ]
- }
-
- # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
- case {
- 'e': 'aggTrade',
- }:
- # NOTE: this is purely for a definition,
- # ``msgspec.Struct`` does not runtime-validate until you
- # decode/encode, see:
- # https://jcristharif.com/msgspec/structs.html#type-validation
- msg = AggTrade(**msg) # TODO: should we .copy() ?
- piker_quote: dict = {
- 'symbol': msg.s,
- 'last': float(msg.p),
- 'brokerd_ts': time.time(),
- 'ticks': [{
- 'type': 'trade',
- 'price': float(msg.p),
- 'size': float(msg.q),
- 'broker_ts': msg.T,
- }],
- }
- yield 'trade', piker_quote
-
-
-def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
- '''
- Create a request subscription packet dict.
-
- - spot:
- https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
-
- - futes:
- https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
-
- '''
- return {
- 'method': 'SUBSCRIBE',
- 'params': [
- f'{pair.lower()}@{sub_name}'
- for pair in pairs
- ],
- 'id': uid
- }
-
-
-@acm
-async def open_history_client(
- mkt: MktPair,
-
-) -> tuple[Callable, int]:
-
- symbol: str = mkt.bs_fqme
-
- # TODO implement history getter for the new storage layer.
- async with open_cached_client('binance') as client:
-
- async def get_ohlc(
- timeframe: float,
- end_dt: datetime | None = None,
- start_dt: datetime | None = None,
-
- ) -> tuple[
- np.ndarray,
- datetime, # start
- datetime, # end
- ]:
- if timeframe != 60:
- raise DataUnavailable('Only 1m bars are supported')
-
- array = await client.bars(
- symbol,
- start_dt=start_dt,
- end_dt=end_dt,
- )
- times = array['time']
- if (
- end_dt is None
- ):
- inow = round(time.time())
- if (inow - times[-1]) > 60:
- await tractor.breakpoint()
-
- start_dt = from_timestamp(times[0])
- end_dt = from_timestamp(times[-1])
-
- return array, start_dt, end_dt
-
- yield get_ohlc, {'erlangs': 3, 'rate': 3}
-
-
-@async_lifo_cache()
-async def get_mkt_info(
- fqme: str,
-
-) -> tuple[MktPair, Pair]:
-
- async with open_cached_client('binance') as client:
-
- pair: Pair = await client.exch_info(fqme.upper())
- mkt = MktPair(
- dst=Asset(
- name=pair.baseAsset,
- atype='crypto',
- tx_tick=digits_to_dec(pair.baseAssetPrecision),
- ),
- src=Asset(
- name=pair.quoteAsset,
- atype='crypto',
- tx_tick=digits_to_dec(pair.quoteAssetPrecision),
- ),
- price_tick=pair.price_tick,
- size_tick=pair.size_tick,
- bs_mktid=pair.symbol,
- broker='binance',
- )
- both = mkt, pair
- return both
-
-
-async def stream_quotes(
-
- send_chan: trio.abc.SendChannel,
- symbols: list[str],
- feed_is_live: trio.Event,
- loglevel: str = None,
-
- # startup sync
- task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
-
-) -> None:
- # XXX: required to propagate ``tractor`` loglevel to piker logging
- get_console_log(loglevel or tractor.current_actor().loglevel)
-
- async with (
- send_chan as send_chan,
- ):
- init_msgs: list[FeedInit] = []
- for sym in symbols:
- mkt, pair = await get_mkt_info(sym)
-
- # build out init msgs according to latest spec
- init_msgs.append(
- FeedInit(mkt_info=mkt)
- )
-
- iter_subids = itertools.count()
-
- @acm
- async def subscribe(ws: NoBsWs):
- # setup subs
-
- subid: int = next(iter_subids)
-
- # trade data (aka L1)
- # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
- l1_sub = make_sub(symbols, 'bookTicker', subid)
- await ws.send_msg(l1_sub)
-
- # aggregate (each order clear by taker **not** by maker)
- # trades data:
- # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
- agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
- await ws.send_msg(agg_trades_sub)
-
- # might get ack from ws server, or maybe some
- # other msg still in transit..
- res = await ws.recv_msg()
- subid: str | None = res.get('id')
- if subid:
- assert res['id'] == subid
-
- yield
-
- subs = []
- for sym in symbols:
- subs.append("{sym}@aggTrade")
- subs.append("{sym}@bookTicker")
-
- # unsub from all pairs on teardown
- if ws.connected():
- await ws.send_msg({
- "method": "UNSUBSCRIBE",
- "params": subs,
- "id": subid,
- })
-
- # XXX: do we need to ack the unsub?
- # await ws.recv_msg()
-
- async with (
- open_autorecon_ws(
- # XXX: see api docs which show diff addr?
- # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
- # 'wss://ws-api.binance.com:443/ws-api/v3',
- 'wss://stream.binance.com/ws',
- fixture=subscribe,
- ) as ws,
-
- # avoid stream-gen closure from breaking trio..
- aclosing(stream_messages(ws)) as msg_gen,
- ):
- typ, quote = await anext(msg_gen)
-
- # pull a first quote and deliver
- while typ != 'trade':
- typ, quote = await anext(msg_gen)
-
- task_status.started((init_msgs, quote))
-
- # signal to caller feed is ready for consumption
- feed_is_live.set()
-
- # import time
- # last = time.time()
-
- # start streaming
- async for typ, msg in msg_gen:
-
- # period = time.time() - last
- # hz = 1/period if period else float('inf')
- # if hz > 60:
- # log.info(f'Binance quotez : {hz}')
- topic = msg['symbol'].lower()
- await send_chan.send({topic: msg})
- # last = time.time()
-
-
-async def handle_order_requests(
- ems_order_stream: tractor.MsgStream
-) -> None:
- async with open_cached_client('binance') as client:
- async for request_msg in ems_order_stream:
- log.info(f'Received order request {request_msg}')
-
- action = request_msg['action']
-
- if action in {'buy', 'sell'}:
- # validate
- order = BrokerdOrder(**request_msg)
-
- # call our client api to submit the order
- reqid = await client.submit_limit(
- order.symbol,
- order.action,
- order.size,
- order.price,
- oid=order.oid
- )
-
- # deliver ack that order has been submitted to broker routing
- await ems_order_stream.send(
- BrokerdOrderAck(
- # ems order request id
- oid=order.oid,
- # broker specific request id
- reqid=reqid,
- time_ns=time.time_ns(),
- ).dict()
- )
-
- elif action == 'cancel':
- msg = BrokerdCancel(**request_msg)
-
- await client.submit_cancel(msg.symbol, msg.reqid)
-
- else:
- log.error(f'Unknown order command: {request_msg}')
-
-
-@tractor.context
-async def trades_dialogue(
- ctx: tractor.Context,
- loglevel: str = None
-
-) -> AsyncIterator[dict[str, Any]]:
-
- async with open_cached_client('binance') as client:
- if not client.api_key:
- await ctx.started('paper')
- return
-
- # table: PpTable
- # ledger: TransactionLedger
-
- # TODO: load pps and accounts using accounting apis!
- positions: list[BrokerdPosition] = []
- accounts: list[str] = ['binance.default']
- await ctx.started((positions, accounts))
-
- async with (
- ctx.open_stream() as ems_stream,
- trio.open_nursery() as n,
- open_cached_client('binance') as client,
- client.manage_listen_key() as listen_key,
- ):
- n.start_soon(handle_order_requests, ems_stream)
- # await trio.sleep_forever()
-
- async with open_autorecon_ws(
- f'wss://stream.binance.com:9443/ws/{listen_key}',
- ) as ws:
- event = await ws.recv_msg()
-
- # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
- if event.get('e') == 'executionReport':
-
- oid: str = event.get('c')
- side: str = event.get('S').lower()
- status: str = event.get('X')
- order_qty: float = float(event.get('q'))
- filled_qty: float = float(event.get('z'))
- cum_transacted_qty: float = float(event.get('Z'))
- price_avg: float = cum_transacted_qty / filled_qty
- broker_time: float = float(event.get('T'))
- commission_amount: float = float(event.get('n'))
- commission_asset: float = event.get('N')
-
- if status == 'TRADE':
- if order_qty == filled_qty:
- msg = BrokerdFill(
- reqid=oid,
- time_ns=time.time_ns(),
- action=side,
- price=price_avg,
- broker_details={
- 'name': 'binance',
- 'commissions': {
- 'amount': commission_amount,
- 'asset': commission_asset
- },
- 'broker_time': broker_time
- },
- broker_time=broker_time
- )
-
- else:
- if status == 'NEW':
- status = 'submitted'
-
- elif status == 'CANCELED':
- status = 'cancelled'
-
- msg = BrokerdStatus(
- reqid=oid,
- time_ns=time.time_ns(),
- status=status,
- filled=filled_qty,
- remaining=order_qty - filled_qty,
- broker_details={'name': 'binance'}
- )
-
- else:
- # XXX: temporary, to catch unhandled msgs
- breakpoint()
-
- await ems_stream.send(msg.dict())
-
-
-@tractor.context
-async def open_symbol_search(
- ctx: tractor.Context,
-) -> Client:
- async with open_cached_client('binance') as client:
-
- # load all symbols locally for fast search
- cache = await client.exch_info()
- await ctx.started()
-
- async with ctx.open_stream() as stream:
-
- async for pattern in stream:
- # results = await client.exch_info(sym=pattern.upper())
-
- matches = fuzzy.extractBests(
- pattern,
- cache,
- score_cutoff=50,
- )
- # repack in dict form
- await stream.send({
- item[0].symbol: item[0]
- for item in matches
- })
diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py
new file mode 100644
index 00000000..53dd7a64
--- /dev/null
+++ b/piker/brokers/binance/broker.py
@@ -0,0 +1,188 @@
+# piker: trading gear for hackers
+# Copyright (C)
+# Guillermo Rodriguez (aka ze jefe)
+# Tyler Goodlet
+# (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+'''
+Live order control B)
+
+'''
+from __future__ import annotations
+from typing import (
+ Any,
+ AsyncIterator,
+)
+import time
+
+import tractor
+import trio
+
+from piker.brokers._util import (
+ get_logger,
+)
+from piker.data._web_bs import (
+ open_autorecon_ws,
+ NoBsWs,
+)
+from piker._cacheables import (
+ open_cached_client,
+)
+from piker.clearing._messages import (
+ BrokerdOrder,
+ BrokerdOrderAck,
+ BrokerdStatus,
+ BrokerdPosition,
+ BrokerdFill,
+ BrokerdCancel,
+ # BrokerdError,
+)
+
+log = get_logger('piker.brokers.binance')
+
+
+async def handle_order_requests(
+ ems_order_stream: tractor.MsgStream
+) -> None:
+ async with open_cached_client('binance') as client:
+ async for request_msg in ems_order_stream:
+ log.info(f'Received order request {request_msg}')
+
+ action = request_msg['action']
+
+ if action in {'buy', 'sell'}:
+ # validate
+ order = BrokerdOrder(**request_msg)
+
+ # call our client api to submit the order
+ reqid = await client.submit_limit(
+ order.symbol,
+ order.action,
+ order.size,
+ order.price,
+ oid=order.oid
+ )
+
+ # deliver ack that order has been submitted to broker routing
+ await ems_order_stream.send(
+ BrokerdOrderAck(
+ # ems order request id
+ oid=order.oid,
+ # broker specific request id
+ reqid=reqid,
+ time_ns=time.time_ns(),
+ ).dict()
+ )
+
+ elif action == 'cancel':
+ msg = BrokerdCancel(**request_msg)
+
+ await client.submit_cancel(msg.symbol, msg.reqid)
+
+ else:
+ log.error(f'Unknown order command: {request_msg}')
+
+
+@tractor.context
+async def trades_dialogue(
+ ctx: tractor.Context,
+ loglevel: str = None
+
+) -> AsyncIterator[dict[str, Any]]:
+
+ async with open_cached_client('binance') as client:
+ if not client.api_key:
+ await ctx.started('paper')
+ return
+
+ # table: PpTable
+ # ledger: TransactionLedger
+
+ # TODO: load pps and accounts using accounting apis!
+ positions: list[BrokerdPosition] = []
+ accounts: list[str] = ['binance.default']
+ await ctx.started((positions, accounts))
+
+ async with (
+ ctx.open_stream() as ems_stream,
+ trio.open_nursery() as n,
+ open_cached_client('binance') as client,
+ client.manage_listen_key() as listen_key,
+ ):
+ n.start_soon(handle_order_requests, ems_stream)
+ # await trio.sleep_forever()
+
+ ws: NoBsWs
+ async with open_autorecon_ws(
+ f'wss://stream.binance.com:9443/ws/{listen_key}',
+ ) as ws:
+ event = await ws.recv_msg()
+
+ # https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update
+ if event.get('e') == 'executionReport':
+
+ oid: str = event.get('c')
+ side: str = event.get('S').lower()
+ status: str = event.get('X')
+ order_qty: float = float(event.get('q'))
+ filled_qty: float = float(event.get('z'))
+ cum_transacted_qty: float = float(event.get('Z'))
+ price_avg: float = cum_transacted_qty / filled_qty
+ broker_time: float = float(event.get('T'))
+ commission_amount: float = float(event.get('n'))
+ commission_asset: float = event.get('N')
+
+ if status == 'TRADE':
+ if order_qty == filled_qty:
+ msg = BrokerdFill(
+ reqid=oid,
+ time_ns=time.time_ns(),
+ action=side,
+ price=price_avg,
+ broker_details={
+ 'name': 'binance',
+ 'commissions': {
+ 'amount': commission_amount,
+ 'asset': commission_asset
+ },
+ 'broker_time': broker_time
+ },
+ broker_time=broker_time
+ )
+
+ else:
+ if status == 'NEW':
+ status = 'submitted'
+
+ elif status == 'CANCELED':
+ status = 'cancelled'
+
+ msg = BrokerdStatus(
+ reqid=oid,
+ time_ns=time.time_ns(),
+ status=status,
+ filled=filled_qty,
+ remaining=order_qty - filled_qty,
+ broker_details={'name': 'binance'}
+ )
+
+ else:
+ # XXX: temporary, to catch unhandled msgs
+ breakpoint()
+
+ await ems_stream.send(msg.dict())
+
+
diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py
new file mode 100644
index 00000000..9ecda184
--- /dev/null
+++ b/piker/brokers/binance/feed.py
@@ -0,0 +1,414 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+'''
+Real-time and historical data feed endpoints.
+
+'''
+from __future__ import annotations
+from contextlib import (
+ asynccontextmanager as acm,
+ aclosing,
+)
+from datetime import datetime
+import itertools
+from typing import (
+ Any,
+ AsyncGenerator,
+ Callable,
+)
+import time
+
+import trio
+from trio_typing import TaskStatus
+from pendulum import (
+ from_timestamp,
+)
+from fuzzywuzzy import process as fuzzy
+import numpy as np
+import tractor
+
+from piker._cacheables import (
+ async_lifo_cache,
+ open_cached_client,
+)
+from piker.accounting._mktinfo import (
+ Asset,
+ MktPair,
+ digits_to_dec,
+)
+from piker.data.types import Struct
+from piker.data.validate import FeedInit
+from piker.data._web_bs import (
+ open_autorecon_ws,
+ NoBsWs,
+)
+from piker.brokers._util import (
+ DataUnavailable,
+ get_logger,
+ get_console_log,
+)
+
+from .api import (
+ Client,
+ Pair,
+)
+
+log = get_logger('piker.brokers.binance')
+
+
+class L1(Struct):
+ # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
+
+ update_id: int
+ sym: str
+
+ bid: float
+ bsize: float
+ ask: float
+ asize: float
+
+
+# validation type
+class AggTrade(Struct, frozen=True):
+ e: str # Event type
+ E: int # Event time
+ s: str # Symbol
+ a: int # Aggregate trade ID
+ p: float # Price
+ q: float # Quantity
+ f: int # First trade ID
+ l: int # noqa Last trade ID
+ T: int # Trade time
+ m: bool # Is the buyer the market maker?
+ M: bool # Ignore
+
+
+async def stream_messages(
+ ws: NoBsWs,
+) -> AsyncGenerator[NoBsWs, dict]:
+
+ # TODO: match syntax here!
+ msg: dict[str, Any]
+ async for msg in ws:
+ match msg:
+ # for l1 streams binance doesn't add an event type field so
+ # identify those messages by matching keys
+ # https://binance-docs.github.io/apidocs/spot/en/#individual-symbol-book-ticker-streams
+ case {
+ # NOTE: this is never an old value it seems, so
+ # they are always sending real L1 spread updates.
+ 'u': upid, # update id
+ 's': sym,
+ 'b': bid,
+ 'B': bsize,
+ 'a': ask,
+ 'A': asize,
+ }:
+ # TODO: it would be super nice to have a `L1` piker type
+ # which "renders" incremental tick updates from a packed
+ # msg-struct:
+ # - backend msgs after packed into the type such that we
+ # can reduce IPC usage but without each backend having
+ # to do that incremental update logic manually B)
+ # - would it maybe be more efficient to use this instead?
+ # https://binance-docs.github.io/apidocs/spot/en/#diff-depth-stream
+ l1 = L1(
+ update_id=upid,
+ sym=sym,
+ bid=bid,
+ bsize=bsize,
+ ask=ask,
+ asize=asize,
+ )
+ l1.typecast()
+
+ # repack into piker's tick-quote format
+ yield 'l1', {
+ 'symbol': l1.sym,
+ 'ticks': [
+ {
+ 'type': 'bid',
+ 'price': l1.bid,
+ 'size': l1.bsize,
+ },
+ {
+ 'type': 'bsize',
+ 'price': l1.bid,
+ 'size': l1.bsize,
+ },
+ {
+ 'type': 'ask',
+ 'price': l1.ask,
+ 'size': l1.asize,
+ },
+ {
+ 'type': 'asize',
+ 'price': l1.ask,
+ 'size': l1.asize,
+ }
+ ]
+ }
+
+ # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
+ case {
+ 'e': 'aggTrade',
+ }:
+ # NOTE: this is purely for a definition,
+ # ``msgspec.Struct`` does not runtime-validate until you
+ # decode/encode, see:
+ # https://jcristharif.com/msgspec/structs.html#type-validation
+ msg = AggTrade(**msg) # TODO: should we .copy() ?
+ piker_quote: dict = {
+ 'symbol': msg.s,
+ 'last': float(msg.p),
+ 'brokerd_ts': time.time(),
+ 'ticks': [{
+ 'type': 'trade',
+ 'price': float(msg.p),
+ 'size': float(msg.q),
+ 'broker_ts': msg.T,
+ }],
+ }
+ yield 'trade', piker_quote
+
+
+def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]:
+ '''
+ Create a request subscription packet dict.
+
+ - spot:
+ https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
+
+ - futes:
+ https://binance-docs.github.io/apidocs/futures/en/#websocket-market-streams
+
+ '''
+ return {
+ 'method': 'SUBSCRIBE',
+ 'params': [
+ f'{pair.lower()}@{sub_name}'
+ for pair in pairs
+ ],
+ 'id': uid
+ }
+
+
+@acm
+async def open_history_client(
+ mkt: MktPair,
+
+) -> tuple[Callable, int]:
+
+ symbol: str = mkt.bs_fqme
+
+ # TODO implement history getter for the new storage layer.
+ async with open_cached_client('binance') as client:
+
+ async def get_ohlc(
+ timeframe: float,
+ end_dt: datetime | None = None,
+ start_dt: datetime | None = None,
+
+ ) -> tuple[
+ np.ndarray,
+ datetime, # start
+ datetime, # end
+ ]:
+ if timeframe != 60:
+ raise DataUnavailable('Only 1m bars are supported')
+
+ array = await client.bars(
+ symbol,
+ start_dt=start_dt,
+ end_dt=end_dt,
+ )
+ times = array['time']
+ if (
+ end_dt is None
+ ):
+ inow = round(time.time())
+ if (inow - times[-1]) > 60:
+ await tractor.breakpoint()
+
+ start_dt = from_timestamp(times[0])
+ end_dt = from_timestamp(times[-1])
+
+ return array, start_dt, end_dt
+
+ yield get_ohlc, {'erlangs': 3, 'rate': 3}
+
+
+@async_lifo_cache()
+async def get_mkt_info(
+ fqme: str,
+
+) -> tuple[MktPair, Pair]:
+
+ async with open_cached_client('binance') as client:
+
+ pair: Pair = await client.exch_info(fqme.upper())
+ mkt = MktPair(
+ dst=Asset(
+ name=pair.baseAsset,
+ atype='crypto',
+ tx_tick=digits_to_dec(pair.baseAssetPrecision),
+ ),
+ src=Asset(
+ name=pair.quoteAsset,
+ atype='crypto',
+ tx_tick=digits_to_dec(pair.quoteAssetPrecision),
+ ),
+ price_tick=pair.price_tick,
+ size_tick=pair.size_tick,
+ bs_mktid=pair.symbol,
+ broker='binance',
+ )
+ both = mkt, pair
+ return both
+
+
+async def stream_quotes(
+
+ send_chan: trio.abc.SendChannel,
+ symbols: list[str],
+ feed_is_live: trio.Event,
+ loglevel: str = None,
+
+ # startup sync
+ task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED,
+
+) -> None:
+ # XXX: required to propagate ``tractor`` loglevel to piker logging
+ get_console_log(loglevel or tractor.current_actor().loglevel)
+
+ async with (
+ send_chan as send_chan,
+ ):
+ init_msgs: list[FeedInit] = []
+ for sym in symbols:
+ mkt, pair = await get_mkt_info(sym)
+
+ # build out init msgs according to latest spec
+ init_msgs.append(
+ FeedInit(mkt_info=mkt)
+ )
+
+ iter_subids = itertools.count()
+
+ @acm
+ async def subscribe(ws: NoBsWs):
+ # setup subs
+
+ subid: int = next(iter_subids)
+
+ # trade data (aka L1)
+ # https://binance-docs.github.io/apidocs/spot/en/#symbol-order-book-ticker
+ l1_sub = make_sub(symbols, 'bookTicker', subid)
+ await ws.send_msg(l1_sub)
+
+ # aggregate (each order clear by taker **not** by maker)
+ # trades data:
+ # https://binance-docs.github.io/apidocs/spot/en/#aggregate-trade-streams
+ agg_trades_sub = make_sub(symbols, 'aggTrade', subid)
+ await ws.send_msg(agg_trades_sub)
+
+ # might get ack from ws server, or maybe some
+ # other msg still in transit..
+ res = await ws.recv_msg()
+ subid: str | None = res.get('id')
+ if subid:
+ assert res['id'] == subid
+
+ yield
+
+ subs = []
+ for sym in symbols:
+ subs.append("{sym}@aggTrade")
+ subs.append("{sym}@bookTicker")
+
+ # unsub from all pairs on teardown
+ if ws.connected():
+ await ws.send_msg({
+ "method": "UNSUBSCRIBE",
+ "params": subs,
+ "id": subid,
+ })
+
+ # XXX: do we need to ack the unsub?
+ # await ws.recv_msg()
+
+ async with (
+ open_autorecon_ws(
+ # XXX: see api docs which show diff addr?
+ # https://developers.binance.com/docs/binance-trading-api/websocket_api#general-api-information
+ # 'wss://ws-api.binance.com:443/ws-api/v3',
+ 'wss://stream.binance.com/ws',
+ fixture=subscribe,
+ ) as ws,
+
+ # avoid stream-gen closure from breaking trio..
+ aclosing(stream_messages(ws)) as msg_gen,
+ ):
+ typ, quote = await anext(msg_gen)
+
+ # pull a first quote and deliver
+ while typ != 'trade':
+ typ, quote = await anext(msg_gen)
+
+ task_status.started((init_msgs, quote))
+
+ # signal to caller feed is ready for consumption
+ feed_is_live.set()
+
+ # import time
+ # last = time.time()
+
+ # start streaming
+ async for typ, msg in msg_gen:
+
+ # period = time.time() - last
+ # hz = 1/period if period else float('inf')
+ # if hz > 60:
+ # log.info(f'Binance quotez : {hz}')
+ topic = msg['symbol'].lower()
+ await send_chan.send({topic: msg})
+ # last = time.time()
+@tractor.context
+async def open_symbol_search(
+ ctx: tractor.Context,
+) -> Client:
+ async with open_cached_client('binance') as client:
+
+ # load all symbols locally for fast search
+ cache = await client.exch_info()
+ await ctx.started()
+
+ async with ctx.open_stream() as stream:
+
+ async for pattern in stream:
+ # results = await client.exch_info(sym=pattern.upper())
+
+ matches = fuzzy.extractBests(
+ pattern,
+ cache,
+ score_cutoff=50,
+ )
+ # repack in dict form
+ await stream.send({
+ item[0].symbol: item[0]
+ for item in matches
+ })