diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 9f384166..88fa183d 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -42,6 +42,7 @@ from ib_insync.contract import ( from ib_insync.order import ( Trade, OrderStatus, + Order, ) from ib_insync.objects import ( Fill, @@ -439,7 +440,6 @@ async def trades_dialogue( # we might also want to delegate a specific actor for # ledger writing / reading for speed? async with ( - # trio.open_nursery() as nurse, open_client_proxies() as (proxies, aioclients), ): # Open a trade ledgers stack for appending trade records over @@ -468,6 +468,41 @@ async def trades_dialogue( client = aioclients[account] + trades: list[Trade] = client.ib.openTrades() + order_msgs = [] + for trade in trades: + + order = trade.order + quant = trade.order.totalQuantity + size = { + 'SELL': -1, + 'BUY': 1, + }[order.action] * quant + fqsn, _ = con2fqsn(trade.contract) + + # TODO: maybe embed a ``BrokerdOrder`` instead + # since then we can directly load it on the client + # side in the order mode loop? + msg = BrokerdStatus( + reqid=order.orderId, + time_ns=time.time_ns(), + account=accounts_def.inverse[order.account], + status='submitted', + size=size, + price=order.lmtPrice, + filled=0, + reason='Existing live order', + + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=quant, + broker_details={ + 'name': 'ib', + 'fqsn': fqsn, + }, + ) + order_msgs.append(msg) + # process pp value reported from ib's system. we only use these # to cross-check sizing since average pricing on their end uses # the so called (bs) "FIFO" style which more or less results in @@ -523,7 +558,12 @@ async def trades_dialogue( table.update_from_trans(trans) updated = table.update_from_trans(trans) - assert msg.size == pp.size, 'WTF' + if msg.size != pp.size: + log.error( + 'Position mismatch {pp.symbol.front_fqsn()}:\n' + f'ib: {msg.size}\n' + f'piker: {pp.size}\n' + ) active_pps, closed_pps = table.dump_active() @@ -575,6 +615,10 @@ async def trades_dialogue( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): + # relay existing open orders to ems + for msg in order_msgs: + await ems_stream.send(msg) + trade_event_stream = await n.start(open_trade_event_stream) clients.append((client, trade_event_stream))