diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 503833db..c1dd568b 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -1015,11 +1015,10 @@ async def handle_order_updates( fill_msg = BrokerdFill( time_ns=time.time_ns(), reqid=reqid, - - # action=action, # just use size value - # for now? - size=vlm, - price=price, + # just use size value for now? + # action=action, + size=float(vlm), + price=float(price), # TODO: maybe capture more msg data # i.e fees? diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index e67d204c..6cc1cb7f 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -61,6 +61,7 @@ class Pair(Struct): quote: str # asset id of quote component lot: str # volume lot size + cost_decimals: int pair_decimals: int # scaling decimal places for pair lot_decimals: int # scaling decimal places for volume @@ -342,8 +343,8 @@ async def stream_quotes( # transform to upper since piker style is always lower sym = sym.upper() - - si = Pair(**await client.symbol_info(sym)) # validation + sym_info = await client.symbol_info(sym) + si = Pair(**sym_info) # validation syminfo = si.to_dict() syminfo['price_tick_size'] = 1 / 10**si.pair_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 045134bc..23b50ddf 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,7 +18,11 @@ In da suit parlances: "Execution management systems" """ -from collections import defaultdict, ChainMap +from __future__ import annotations +from collections import ( + defaultdict, + # ChainMap, +) from contextlib import asynccontextmanager from math import isnan from pprint import pformat @@ -134,12 +138,6 @@ class _DarkBook(Struct): # _ems_entries: dict[str, str] = {} _active: dict = {} - # mapping of ems dialog ids to msg flow history - _msgflows: defaultdict[ - int, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) - _ems2brokerd_ids: dict[str, str] = bidict() @@ -152,8 +150,8 @@ _DEFAULT_SIZE: float = 1.0 async def clear_dark_triggers( + router: Router, brokerd_orders_stream: tractor.MsgStream, - ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, fqsn: str, @@ -288,15 +286,10 @@ async def clear_dark_triggers( book._active[oid] = status # send response to client-side - try: - await ems_client_order_stream.send(status) - except ( - trio.ClosedResourceError, - ): - log.warning( - f'{ems_client_order_stream} stream broke?' - ) - break + await router.client_broadcast( + fqsn, + status, + ) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -310,7 +303,7 @@ class TradesRelay(Struct): # for now we keep only a single connection open with # each ``brokerd`` for simplicity. - brokerd_dialogue: tractor.MsgStream + brokerd_stream: tractor.MsgStream # map of symbols to dicts of accounts to pp msgs positions: dict[ @@ -342,13 +335,31 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - dialogues: dict[ - str, - list[tractor.MsgStream] - ] = {} - # brokername to trades-dialogues streams with ``brokerd`` actors - relays: dict[str, TradesRelay] = {} + # sets of clients mapped from subscription keys + subscribers: defaultdict[ + str, # sub key, default fqsn + set[tractor.MsgStream], # unique client streams + ] = defaultdict(set) + + # sets of clients dynamically registered for specific + # order flows based on subscription config. + dialogs: defaultdict[ + str, # ems uuid (oid) + set[tractor.MsgStream] # client side msg stream + ] = defaultdict(set) + + # TODO: mapping of ems dialog ids to msg flow history + # msgflows: defaultdict[ + # str, + # ChainMap[dict[str, dict]], + # ] = defaultdict(ChainMap) + + # brokername to trades-dialogs streams with ``brokerd`` actors + relays: dict[ + str, # broker name + TradesRelay, + ] = {} def get_dark_book( self, @@ -358,6 +369,20 @@ class Router(Struct): return self.books.setdefault(brokername, _DarkBook(brokername)) + def get_subs( + self, + oid: str, + + ) -> set[tractor.MsgStream]: + ''' + Deliver list of non-closed subscriber client msg streams. + + ''' + return set( + stream for stream in self.dialogs[oid] + if not stream._closed + ) + @asynccontextmanager async def maybe_open_brokerd_trades_dialogue( self, @@ -373,7 +398,8 @@ class Router(Struct): none already exists. ''' - relay: TradesRelay = self.relays.get(feed.mod.name) + broker = feed.mod.name + relay: TradesRelay = self.relays.get(broker) if ( relay is None @@ -387,7 +413,7 @@ class Router(Struct): ): relay = await self.nursery.start( - open_brokerd_trades_dialogue, + open_brokerd_trades_dialog, self, feed, symbol, @@ -395,41 +421,53 @@ class Router(Struct): loglevel, ) + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) + relay.consumers += 1 # TODO: get updated positions here? - assert relay.brokerd_dialogue + assert relay.brokerd_stream try: yield relay - finally: # TODO: what exactly needs to be torn down here or # are we just consumer tracking? - relay.consumers -= 1 async def client_broadcast( self, + sub_key: str, msg: dict, ) -> None: - for client_stream in self.clients.copy(): + to_remove: set[tractor.MsgStream] = set() + subs = self.subscribers[sub_key] + for client_stream in subs: try: await client_stream.send(msg) - except( + except ( trio.ClosedResourceError, trio.BrokenResourceError, ): + to_remove.add(client_stream) self.clients.remove(client_stream) log.warning( f'client for {client_stream} was already closed?') + if to_remove: + subs.difference_update(to_remove) + _router: Router = None -async def open_brokerd_trades_dialogue( +async def open_brokerd_trades_dialog( router: Router, feed: Feed, @@ -505,7 +543,7 @@ async def open_brokerd_trades_dialogue( # we cache the relay task and instead of running multiple # tasks (which will result in multiples of the same msg being # relayed for each EMS client) we just register each client - # stream to this single relay loop using _router.dialogues + # stream to this single relay loop in the dialog table. # begin processing order events from the target brokerd backend # by receiving order submission response messages, @@ -532,13 +570,13 @@ async def open_brokerd_trades_dialogue( ).append(msg) relay = TradesRelay( - brokerd_dialogue=brokerd_trades_stream, + brokerd_stream=brokerd_trades_stream, positions=pps, accounts=accounts, consumers=1, ) - _router.relays[broker] = relay + router.relays[broker] = relay # the ems scan loop may be cancelled by the client but we # want to keep the ``brokerd`` dialogue up regardless @@ -550,9 +588,9 @@ async def open_brokerd_trades_dialogue( await trio.sleep_forever() finally: - # parent context must have been closed - # remove from cache so next client will respawn if needed - relay = _router.relays.pop(broker, None) + # parent context must have been closed remove from cache so + # next client will respawn if needed + relay = router.relays.pop(broker, None) if not relay: log.warning(f'Relay for {broker} was already removed!?') @@ -607,8 +645,7 @@ async def translate_and_relay_brokerd_events( ''' book: _DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] - - assert relay.brokerd_dialogue == brokerd_trades_stream + assert relay.brokerd_stream == brokerd_trades_stream brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: @@ -640,7 +677,7 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(pos_msg) + await router.client_broadcast(sym, pos_msg) continue # BrokerdOrderAck @@ -707,18 +744,18 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - ems_client_order_stream = router.dialogues[oid] + status_msg.resp = 'error' status_msg.brokerd_msg = msg book._active[oid] = status_msg - await ems_client_order_stream.send(status_msg) + + await router.client_broadcast(sym, status_msg) # BrokerdStatus case { 'name': 'status', 'status': status, 'reqid': reqid, # brokerd generated order-request id - } if ( (oid := book._ems2brokerd_ids.inverse.get(reqid)) and status in ( @@ -732,16 +769,16 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_stream = router.dialogues.get(oid) + ems_client_order_streams = router.get_subs(oid) status_msg = book._active.get(oid) if ( - not ems_client_order_stream + not ems_client_order_streams or not status_msg ): log.warning( - 'Received status for unknown dialog {oid}:\n' - '{fmsg}' + f'Received status for untracked dialog {oid}:\n' + f'{fmsg}' ) continue @@ -759,7 +796,11 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid # THIS LINE IS CRITICAL! status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await ems_client_order_stream.send(status_msg) + + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') @@ -793,8 +834,6 @@ async def translate_and_relay_brokerd_events( msg = BrokerdFill(**brokerd_msg) log.info(f'Fill for {oid} cleared with:\n{fmsg}') - ems_client_order_stream = router.dialogues[oid] - # XXX: bleh, a fill can come after 'closed' from `ib`? # only send a late fill event we haven't already closed # out the dialog status locally. @@ -803,7 +842,11 @@ async def translate_and_relay_brokerd_events( status_msg.resp = 'fill' status_msg.reqid = reqid status_msg.brokerd_msg = msg - await ems_client_order_stream.send(status_msg) + + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) # ``Status`` containing an embedded order msg which # should be loaded as a "pre-existing open order" from the @@ -855,7 +898,10 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(status_msg) + await router.client_broadcast( + order.symbol, + status_msg, + ) # don't fall through continue @@ -884,11 +930,15 @@ async def translate_and_relay_brokerd_events( oid = book._ems2brokerd_ids.inverse.get(reqid) msg = f'Unhandled broker status for dialog {reqid}:\n' if oid: - status_msg = book._active[oid] - msg += ( - f'last status msg: {pformat(status_msg)}\n\n' - f'this msg:{fmsg}\n' - ) + status_msg = book._active.get(oid) + # status msg may not have been set yet or popped? + # NOTE: have seen a key error here on kraken + # clearable limits.. + if status_msg: + msg += ( + f'last status msg: {pformat(status_msg)}\n\n' + f'this msg:{fmsg}\n' + ) log.warning(msg) @@ -899,26 +949,27 @@ async def translate_and_relay_brokerd_events( # if status_msg is not None: # del status_msg - # TODO: do we want this to keep things cleaned up? - # it might require a special status from brokerd to affirm the - # flow is complete? - # router.dialogues.pop(oid) - async def process_client_order_cmds( client_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream, - symbol: str, + fqsn: str, feed: Feed, dark_book: _DarkBook, router: Router, ) -> None: + ''' + Client-dialog request loop: accept order requests and deliver + initial status msg responses to subscribed clients. - client_dialogues = router.dialogues + This task-loop handles both management of dark triggered orders and + alerts by inserting them into the "dark book"-table as well as + submitting live orders immediately if requested by the client. + ''' # cmd: dict async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') @@ -926,11 +977,17 @@ async def process_client_order_cmds( # CAWT DAMN we need struct support! oid = str(cmd['oid']) - # register this stream as an active dialogue for this order id - # such that translated message from the brokerd backend can be - # routed (relayed) to **just** that client stream (and in theory - # others who are registered for such order affiliated msgs). - client_dialogues[oid] = client_order_stream + # register this stream as an active order dialog (msg flow) for + # this order id such that translated message from the brokerd + # backend can be routed and relayed to subscribed clients. + subs = router.dialogs[oid] + + # add all subscribed clients for this fqsn (should eventually be + # a more generalize subscription system) to received order msg + # updates (and thus show stuff in the UI). + subs.add(client_order_stream) + subs.update(router.subscribers[fqsn]) + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) # any dark/live status which is current @@ -942,7 +999,7 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if ( - (status := dark_book._active.get(oid)) + status and status.resp in ('open', 'pending') ): reqid = status.reqid @@ -978,11 +1035,11 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if ( - status and status.resp == 'dark_open' - # or status and status.req + status + and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.orders[symbol].pop(oid, None) + entry = dark_book.orders[fqsn].pop(oid, None) if entry: ( pred, @@ -997,13 +1054,18 @@ async def process_client_order_cmds( status.resp = 'canceled' status.req = cmd - await client_order_stream.send(status) - # de-register this client dialogue - router.dialogues.pop(oid) + await router.client_broadcast( + fqsn, + status, + ) + + # de-register this order dialogue from all clients + router.dialogs[oid].clear() + router.dialogs.pop(oid) dark_book._active.pop(oid) else: - log.exception(f'No dark order for {symbol}?') + log.exception(f'No dark order for {fqsn}?') # TODO: eventually we should be receiving # this struct on the wire unpacked in a scoped protocol @@ -1030,8 +1092,8 @@ async def process_client_order_cmds( if status is not None: # if we already had a broker order id then # this is likely an order update commmand. - log.info(f"Modifying live {broker} order: {reqid}") reqid = status.reqid + log.info(f"Modifying live {broker} order: {reqid}") status.req = req status.resp = 'pending' @@ -1162,7 +1224,12 @@ async def process_client_order_cmds( src='dark', ) dark_book._active[oid] = status - await client_order_stream.send(status) + + # broadcast status to all subscribed clients + await router.client_broadcast( + fqsn, + status, + ) @tractor.context @@ -1188,20 +1255,26 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The primary ``emsd`` task tree is: + The primary ``emsd`` task trees are: - - ``_emsd_main()``: - sets up brokerd feed, order feed with ems client, trades dialogue with - brokderd trading api. - | - - ``clear_dark_triggers()``: - run (dark order) conditions on inputs and trigger brokerd "live" - order submissions. + - ``_setup_persistent_emsd()``: + is the ``emsd`` actor's primary root task which sets up an + actor-global ``Router`` instance and starts a relay loop task + which lives until the backend broker is shutdown or the ems is + terminated. | - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. + + - ``_emsd_main()``: + attaches a brokerd real-time quote feed and trades dialogue with + brokderd trading api for every connecting client. + | + - ``clear_dark_triggers()``: + run (dark order) conditions on inputs and trigger brokerd "live" + order submissions. | - ``process_client_order_cmds()``: accepts order cmds from requesting clients, registers dark orders and @@ -1248,11 +1321,10 @@ async def _emsd_main( loglevel, ) as relay, - trio.open_nursery() as n, ): - brokerd_stream = relay.brokerd_dialogue # .clone() + brokerd_stream = relay.brokerd_stream # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. @@ -1264,26 +1336,24 @@ async def _emsd_main( # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as ems_client_order_stream: + async with ems_ctx.open_stream() as client_stream: - # register the client side before startingn the + # register the client side before starting the # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. - _router.clients.add(ems_client_order_stream) + _router.clients.add(client_stream) - n.start_soon( - translate_and_relay_brokerd_events, - broker, - brokerd_stream, - _router, - ) + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) # trigger scan and exec loop n.start_soon( clear_dark_triggers, - + _router, brokerd_stream, - ems_client_order_stream, quote_stream, broker, fqsn, # form: ... @@ -1291,16 +1361,11 @@ async def _emsd_main( ) # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. try: - - # main entrypoint, run here until cancelled. await process_client_order_cmds( - - ems_client_order_stream, - - # relay.brokerd_dialogue, + client_stream, brokerd_stream, - fqsn, feed, dark_book, @@ -1310,28 +1375,27 @@ async def _emsd_main( finally: # try to remove client from "registry" try: - _router.clients.remove(ems_client_order_stream) + _router.clients.remove(client_stream) except KeyError: log.warning( - f'Stream {ems_client_order_stream._ctx.chan.uid}' + f'Stream {client_stream._ctx.chan.uid}' ' was already dropped?' ) - dialogues = _router.dialogues + _router.subscribers[fqsn].remove(client_stream) + dialogs = _router.dialogs + for oid, client_streams in dialogs.items(): + if client_stream in client_streams: + client_streams.remove(client_stream) - for oid, client_stream in dialogues.copy().items(): - - if client_stream == ems_client_order_stream: - - log.warning( - f'client dialogue is being abandoned:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) - dialogues.pop(oid) - - # TODO: for order dialogues left "alive" in + # TODO: for order dialogs left "alive" in # the ems this is where we should allow some # system to take over management. Likely we # want to allow the user to choose what kind # of policy to use (eg. cancel all orders # from client, run some algo, etc.) + if not client_streams: + log.warning( + f'Order dialog is being unmonitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index f8fd6937..af666f5a 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -261,10 +261,10 @@ class BrokerdFill(Struct): time_ns: int # order exeuction related - action: str size: float price: float + action: Optional[str] = None broker_details: dict = {} # meta-data (eg. commisions etc.) # brokerd timestamp required for order mode arrow placement on x-axis diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index b74780f5..211a29fc 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -61,13 +61,13 @@ log = get_logger(__name__) class PaperBoi(Struct): - """ - Emulates a broker order client providing the same API and - delivering an order-event response stream but with methods for + ''' + Emulates a broker order client providing approximately the same API + and delivering an order-event response stream but with methods for triggering desired events based on forward testing engine - requirements. + requirements (eg open, closed, fill msgs). - """ + ''' broker: str ems_trades_stream: tractor.MsgStream @@ -207,9 +207,10 @@ class PaperBoi(Struct): remaining: float = 0, ) -> None: - """Pretend to fill a broker order @ price and size. + ''' + Pretend to fill a broker order @ price and size. - """ + ''' # TODO: net latency model await trio.sleep(0.05) fill_time_ns = time.time_ns() @@ -230,6 +231,7 @@ class PaperBoi(Struct): 'name': self.broker + '_paper', }, ) + log.info(f'Fake filling order:\n{fill_msg}') await self.ems_trades_stream.send(fill_msg) self._trade_ledger.update(fill_msg.to_dict()) @@ -336,9 +338,10 @@ async def simulate_fills( return tick_price >= our_price match tick: + + # on an ask queue tick, only clear buy entries case { 'price': tick_price, - # 'type': ('ask' | 'trade' | 'last'), 'type': 'ask', }: client.last_ask = ( @@ -351,9 +354,9 @@ async def simulate_fills( itertools.repeat(buy_on_ask) ) + # on a bid queue tick, only clear sell entries case { 'price': tick_price, - # 'type': ('bid' | 'trade' | 'last'), 'type': 'bid', }: client.last_bid = ( @@ -366,6 +369,10 @@ async def simulate_fills( itertools.repeat(sell_on_bid) ) + # TODO: fix this block, though it definitely + # costs a lot more CPU-wise + # - doesn't seem like clears are happening still on + # "resting" limit orders? case { 'price': tick_price, 'type': ('trade' | 'last'), @@ -390,11 +397,19 @@ async def simulate_fills( iter_entries = interleave() + # NOTE: all other (non-clearable) tick event types + # - we don't want to sping the simulated clear loop + # below unecessarily and further don't want to pop + # simulated live orders prematurely. + case _: + continue + # iterate all potentially clearable book prices # in FIFO order per side. for order_info, pred in iter_entries: (our_price, size, reqid, action) = order_info + # print(order_info) clearable = pred(our_price) if clearable: # pop and retreive order info @@ -469,8 +484,8 @@ async def handle_order_requests( # counter - collision prone..) reqid=reqid, ) + log.info(f'Submitted paper LIMIT {reqid}:\n{order}') - # elif action == 'cancel': case {'action': 'cancel'}: msg = BrokerdCancel(**request_msg) await client.submit_cancel( @@ -532,7 +547,10 @@ async def trades_dialogue( # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` - await ctx.started((pp_msgs, ['paper'])) + await ctx.started(( + pp_msgs, + ['paper'], + )) async with ( ctx.open_stream() as ems_stream, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index d2196b69..59b07758 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -152,11 +152,11 @@ class OrderMode: A callback applied for each level change to the line which will recompute the order size based on allocator settings. this is assigned inside - ``OrderMode.line_from_order()`` + ``OrderMode.new_line_from_order()`` ''' # NOTE: the ``Order.account`` is set at order stage time inside - # ``OrderMode.line_from_order()`` or is inside ``Order`` msg + # ``OrderMode.new_line_from_order()`` or is inside ``Order`` msg # field for loaded orders. order_info = tracker.alloc.next_order_info( startup_pp=tracker.startup_pp, @@ -174,7 +174,7 @@ class OrderMode: # reflect the corresponding account and pos info. self.pane.on_ui_settings_change('account', order.account) - def line_from_order( + def new_line_from_order( self, order: Order, chart: Optional[ChartPlotWidget] = None, @@ -240,7 +240,7 @@ class OrderMode: (self.hist_chart, {'only_show_markers_on_hover': True}), ]: kwargs.update(line_kwargs) - line = self.line_from_order( + line = self.new_line_from_order( order=order, chart=chart, **kwargs, @@ -300,7 +300,7 @@ class OrderMode: # `LineEditor.unstage_line()` on all staged lines.. # lines = self.lines_from_order( - line = self.line_from_order( + line = self.new_line_from_order( order, chart=chart, show_markers=True, @@ -428,10 +428,11 @@ class OrderMode: ) -> None: level = line.value() - # updated by level change callback set in ``.line_from_order()`` + # updated by level change callback set in ``.new_line_from_order()`` dialog = line.dialog size = dialog.order.size + # NOTE: sends modified order msg to EMS self.book.update( uuid=line.dialog.uuid, price=level, @@ -447,7 +448,8 @@ class OrderMode: # EMS response msg handlers def on_submit( self, - uuid: str + uuid: str, + order: Optional[Order] = None, ) -> Dialog: ''' @@ -464,6 +466,20 @@ class OrderMode: dialog.last_status_close() for line in lines: + + # if an order msg is provided update the line + # **from** that msg. + if order: + line.set_level(order.price) + self.on_level_change_update_next_order_info( + level=order.price, + line=line, + order=order, + # use the corresponding position tracker for the + # order's account. + tracker=self.trackers[order.account], + ) + # hide any lines not currently moused-over if not line.get_cursor(): line.hide_labels() @@ -980,9 +996,11 @@ async def process_trade_msg( match msg: case Status(resp='dark_open' | 'open'): + order = Order(**msg.req) + if dialog is not None: # show line label once order is live - mode.on_submit(oid) + mode.on_submit(oid, order=order) else: log.warning( @@ -992,7 +1010,6 @@ async def process_trade_msg( sym = mode.chart.linked.symbol fqsn = sym.front_fqsn() - order = Order(**msg.req) if ( ((order.symbol + f'.{msg.src}') == fqsn) @@ -1009,7 +1026,6 @@ async def process_trade_msg( msg.req = order dialog = mode.load_unknown_dialog_from_msg(msg) mode.on_submit(oid) - # return dialog, msg case Status(resp='error'): # delete level line from view