From 572badb4d847155007c603cfd71231def6c3a18b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 17 Jun 2023 18:00:23 -0400 Subject: [PATCH] Add full real-time position update support B) There was one trick which was that it seems that binance will often send the account/position update event over the user stream *before* the actual clearing (aka FILLED) order update event, so make sure we put an entry in the `dialogs: OrderDialogs` as soon as an order request comes in such that even if the account update arrives first the `BrokerdPosition` msg can be relayed without delay / order event order considerations. --- piker/brokers/binance/broker.py | 103 +++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 34 deletions(-) diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py index d43f2a99..10248f9f 100644 --- a/piker/brokers/binance/broker.py +++ b/piker/brokers/binance/broker.py @@ -174,21 +174,23 @@ async def handle_order_requests( # TODO: figure out what special params we have to send? # https://binance-docs.github.io/apidocs/futures/en/#modify-order-trade - # lookup the binance-native symbol - # bs_mktid: str = client._pairs[order.symbol.upper()].symbol + # track latest request state such that map + # lookups start at the most recent msg and then + # scan reverse-chronologically. + dialogs.add_msg(oid, msg) + + # XXX: ACK the request **immediately** before sending + # the api side request to ensure the ems maps the oid -> + # reqid correctly! + resp = BrokerdOrderAck( + oid=oid, # ems order request id + reqid=oid, # our custom int mapping + account='binance', # piker account + ) + await ems_order_stream.send(resp) # call our client api to submit the order try: - # XXX: ACK the request **immediately** before sending - # the api side request to ensure the ems maps the oid -> - # reqid correctly! - resp = BrokerdOrderAck( - oid=oid, # ems order request id - reqid=oid, # our custom int mapping - account='binance', # piker account - ) - await ems_order_stream.send(resp) - reqid = await client.submit_limit( symbol=order.symbol, side=order.action, @@ -201,11 +203,6 @@ async def handle_order_requests( # assert reqid == order.oid dids[order.oid] = reqid - # track latest request state such that map - # lookups start at the most recent msg and then - # scan reverse-chronologically. - dialogs.add_msg(oid, msg) - except BrokerError as be: await ems_order_stream.send( BrokerdError( @@ -347,9 +344,9 @@ async def open_trade_dialog( bs_mktid: str = entry['symbol'] entry_size: float = float(entry['positionAmt']) - pair: Pair | None + pair: Pair | None = client._venue2pairs[venue].get(bs_mktid) if ( - pair := client._venue2pairs[venue].get(bs_mktid) + pair and entry_size > 0 ): entry_price: float = float(entry['entryPrice']) @@ -406,6 +403,8 @@ async def open_trade_dialog( ) tn.start_soon( handle_order_updates, + venue, + client, ems_stream, wss, dialogs, @@ -416,21 +415,12 @@ async def open_trade_dialog( async def handle_order_updates( + venue: str, + client: Client, ems_stream: tractor.MsgStream, wss: NoBsWs, dialogs: OrderDialogs, - # apiflows: dict[int, ChainMap[dict[str, dict]]], - # ids: bidict[str, int], - # reqids2txids: bidict[int, str], - - # table: PpTable, - # ledger_trans: dict[str, Transaction], - - # acctid: str, - # acc_name: str, - # token: str, - ) -> None: ''' Main msg handling loop for all things order management. @@ -443,9 +433,6 @@ async def handle_order_updates( log.info(f'Rx USERSTREAM msg:\n{pformat(msg)}') match msg: - # TODO: - # POSITION update - # ORDER update # spot: https://binance-docs.github.io/apidocs/spot/en/#payload-balance-update # futes: https://binance-docs.github.io/apidocs/futures/en/#event-order-update @@ -488,7 +475,6 @@ async def handle_order_updates( 'e': 'ORDER_TRADE_UPDATE', 'T': int(epoch_ms), 'o': { - 'i': reqid, 's': bs_mktid, # XXX NOTE XXX see special ids for market @@ -499,6 +485,7 @@ async def handle_order_updates( # // "settlement_autoclose-": settlement order # for delisting or delivery 'c': oid, + # 'i': reqid, # binance internal int id # prices 'a': submit_price, @@ -579,6 +566,8 @@ async def handle_order_updates( status = 'canceled' del dialogs._dialogs[oid] + # case 'TRADE': + case _: status = status.lower() @@ -599,6 +588,52 @@ async def handle_order_updates( ) await ems_stream.send(resp) + # ACCOUNT and POSITION update B) + # { + # 'E': 1687036749218, + # 'e': 'ACCOUNT_UPDATE' + # 'T': 1687036749215, + # 'a': {'B': [{'a': 'USDT', + # 'bc': '0', + # 'cw': '1267.48920735', + # 'wb': '1410.90245576'}], + # 'P': [{'cr': '-3292.10973007', + # 'ep': '26349.90000', + # 'iw': '143.41324841', + # 'ma': 'USDT', + # 'mt': 'isolated', + # 'pa': '0.038', + # 'ps': 'BOTH', + # 's': 'BTCUSDT', + # 'up': '5.17555453'}], + # 'm': 'ORDER'}, + # } + case { + 'T': int(epoch_ms), + 'e': 'ACCOUNT_UPDATE', + 'a': { + 'P': [{ + 's': bs_mktid, + 'pa': pos_amount, + 'ep': entry_price, + }], + }, + }: + # real-time relay position updates back to EMS + pair: Pair | None = client._venue2pairs[venue].get(bs_mktid) + ppmsg = BrokerdPosition( + broker='binance', + account='binance.usdtm', + + # TODO: maybe we should be passing back + # a `MktPair` here? + symbol=pair.bs_fqme.lower() + '.binance', + + size=float(pos_amount), + avg_price=float(entry_price), + ) + await ems_stream.send(ppmsg) + case _: log.warning( 'Unhandled event:\n'