From 47f81b31af507503f5d97a5de754961a10ff6bd7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 16 Sep 2022 16:41:26 -0400 Subject: [PATCH 01/10] Kraken can cause status msg key error!? --- piker/clearing/_ems.py | 14 +++++++++----- 1 file changed, 9 insertions(+), 5 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 045134bc..fe60a551 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -884,11 +884,15 @@ async def translate_and_relay_brokerd_events( oid = book._ems2brokerd_ids.inverse.get(reqid) msg = f'Unhandled broker status for dialog {reqid}:\n' if oid: - status_msg = book._active[oid] - msg += ( - f'last status msg: {pformat(status_msg)}\n\n' - f'this msg:{fmsg}\n' - ) + status_msg = book._active.get(oid) + # status msg may not have been set yet or popped? + # NOTE: have seen a key error here on kraken + # clearable limits.. + if status_msg: + msg += ( + f'last status msg: {pformat(status_msg)}\n\n' + f'this msg:{fmsg}\n' + ) log.warning(msg) From 91397b85a4554f3ad51748b1f9ef9338f2821ee6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Sep 2022 16:09:59 -0400 Subject: [PATCH 02/10] Fix missing f-str in ems msg sender err block --- piker/clearing/_ems.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index fe60a551..9e6be589 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -417,7 +417,7 @@ class Router(Struct): for client_stream in self.clients.copy(): try: await client_stream.send(msg) - except( + except ( trio.ClosedResourceError, trio.BrokenResourceError, ): @@ -740,8 +740,8 @@ async def translate_and_relay_brokerd_events( or not status_msg ): log.warning( - 'Received status for unknown dialog {oid}:\n' - '{fmsg}' + f'Received status for unknown dialog {oid}:\n' + f'{fmsg}' ) continue From 45b97bf6c31f77d128b23d559fef445e359a96b9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Sep 2022 16:10:37 -0400 Subject: [PATCH 03/10] Make fill msg `.action: str` optional for `kraken` --- piker/clearing/_messages.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index f8fd6937..af666f5a 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -261,10 +261,10 @@ class BrokerdFill(Struct): time_ns: int # order exeuction related - action: str size: float price: float + action: Optional[str] = None broker_details: dict = {} # meta-data (eg. commisions etc.) # brokerd timestamp required for order mode arrow placement on x-axis From 887583d27f9cbad861f9eb098550f1a73dff5c23 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Sep 2022 16:12:25 -0400 Subject: [PATCH 04/10] Bleh, convert fill data to `float`s in kraken broker.. --- piker/brokers/kraken/broker.py | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 503833db..c1dd568b 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -1015,11 +1015,10 @@ async def handle_order_updates( fill_msg = BrokerdFill( time_ns=time.time_ns(), reqid=reqid, - - # action=action, # just use size value - # for now? - size=vlm, - price=price, + # just use size value for now? + # action=action, + size=float(vlm), + price=float(price), # TODO: maybe capture more msg data # i.e fees? From 48ff4859e664b74ce6a3d9495acc37dc4f2c574f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 23 Sep 2022 15:23:37 -0400 Subject: [PATCH 05/10] Update to new pair schema, adds `.cost_decimals` field --- piker/brokers/kraken/feed.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index e67d204c..6cc1cb7f 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -61,6 +61,7 @@ class Pair(Struct): quote: str # asset id of quote component lot: str # volume lot size + cost_decimals: int pair_decimals: int # scaling decimal places for pair lot_decimals: int # scaling decimal places for volume @@ -342,8 +343,8 @@ async def stream_quotes( # transform to upper since piker style is always lower sym = sym.upper() - - si = Pair(**await client.symbol_info(sym)) # validation + sym_info = await client.symbol_info(sym) + si = Pair(**sym_info) # validation syminfo = si.to_dict() syminfo['price_tick_size'] = 1 / 10**si.pair_decimals syminfo['lot_tick_size'] = 1 / 10**si.lot_decimals From 30bce42c0b6a1572fab9957b50e1c961859dfdbd Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 19 Sep 2022 16:11:41 -0400 Subject: [PATCH 06/10] Don't spin paper clear loop on non-clearing ticks Not sure what exactly happened but it seemed clears weren't working in some cases without this, also there's no point in spinning the simulated clearing loop if we're handling a non-clearing tick type. --- piker/clearing/_paper_engine.py | 31 +++++++++++++++++++++++-------- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index b74780f5..a40cf7c6 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -61,13 +61,13 @@ log = get_logger(__name__) class PaperBoi(Struct): - """ - Emulates a broker order client providing the same API and - delivering an order-event response stream but with methods for + ''' + Emulates a broker order client providing approximately the same API + and delivering an order-event response stream but with methods for triggering desired events based on forward testing engine - requirements. + requirements (eg open, closed, fill msgs). - """ + ''' broker: str ems_trades_stream: tractor.MsgStream @@ -336,9 +336,10 @@ async def simulate_fills( return tick_price >= our_price match tick: + + # on an ask queue tick, only clear buy entries case { 'price': tick_price, - # 'type': ('ask' | 'trade' | 'last'), 'type': 'ask', }: client.last_ask = ( @@ -351,9 +352,9 @@ async def simulate_fills( itertools.repeat(buy_on_ask) ) + # on a bid queue tick, only clear sell entries case { 'price': tick_price, - # 'type': ('bid' | 'trade' | 'last'), 'type': 'bid', }: client.last_bid = ( @@ -366,6 +367,10 @@ async def simulate_fills( itertools.repeat(sell_on_bid) ) + # TODO: fix this block, though it definitely + # costs a lot more CPU-wise + # - doesn't seem like clears are happening still on + # "resting" limit orders? case { 'price': tick_price, 'type': ('trade' | 'last'), @@ -390,6 +395,13 @@ async def simulate_fills( iter_entries = interleave() + # NOTE: all other (non-clearable) tick event types + # - we don't want to sping the simulated clear loop + # below unecessarily and further don't want to pop + # simulated live orders prematurely. + case _: + continue + # iterate all potentially clearable book prices # in FIFO order per side. for order_info, pred in iter_entries: @@ -532,7 +544,10 @@ async def trades_dialogue( # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` - await ctx.started((pp_msgs, ['paper'])) + await ctx.started(( + pp_msgs, + ['paper'], + )) async with ( ctx.open_stream() as ems_stream, From cf835b97cac61b1b4aff6db504d50e94b67b7352 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Sep 2022 14:26:07 -0400 Subject: [PATCH 07/10] Add some info logs around paper fills --- piker/clearing/_paper_engine.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index a40cf7c6..211a29fc 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -207,9 +207,10 @@ class PaperBoi(Struct): remaining: float = 0, ) -> None: - """Pretend to fill a broker order @ price and size. + ''' + Pretend to fill a broker order @ price and size. - """ + ''' # TODO: net latency model await trio.sleep(0.05) fill_time_ns = time.time_ns() @@ -230,6 +231,7 @@ class PaperBoi(Struct): 'name': self.broker + '_paper', }, ) + log.info(f'Fake filling order:\n{fill_msg}') await self.ems_trades_stream.send(fill_msg) self._trade_ledger.update(fill_msg.to_dict()) @@ -407,6 +409,7 @@ async def simulate_fills( for order_info, pred in iter_entries: (our_price, size, reqid, action) = order_info + # print(order_info) clearable = pred(our_price) if clearable: # pop and retreive order info @@ -481,8 +484,8 @@ async def handle_order_requests( # counter - collision prone..) reqid=reqid, ) + log.info(f'Submitted paper LIMIT {reqid}:\n{order}') - # elif action == 'cancel': case {'action': 'cancel'}: msg = BrokerdCancel(**request_msg) await client.submit_cancel( From 909e06812143acdac3c1761833957ab72631e8a2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 30 Sep 2022 16:28:50 -0400 Subject: [PATCH 08/10] Support multi-client order-dialog management This patch was originally to fix a bug where new clients who re-connected to an `emsd` that was running a paper engine were not getting updates from new fills and/or cancels. It turns out the solution is more general: now, any client that creates a order dialog will be subscribing to receive updates on the order flow set mapped for that symbol/instrument as long as the client has registered for that particular fqsn with the EMS. This means re-connecting clients as well as "monitoring" clients can see the same orders, alerts, fills and clears. Impl details: - change all var names spelled as `dialogues` -> `dialogs` to be murican. - make `Router.dialogs: dict[str, defaultdict[str, list]]` so that each dialog id (oid) maps to a set of potential subscribing ems clients. - add `Router.fqsn2dialogs: dict[str, list[str]]` a map of fqsn entries to sets of oids. - adjust all core task code to make appropriate lookups into these 2 new tables instead of being handed specific client streams as input. - start the `translate_and_relay_brokerd_events` task as a daemon task that lives with the particular `TradesRelay` such that dialogs cleared while no client is connected are still processed. - rename `TradesRelay.brokerd_dialogue` -> `.brokerd_stream` - broadcast all status msgs to all subscribed clients in the relay loop. - always de-reg each client stream from the `Router.dialogs` table on close. --- piker/clearing/_ems.py | 190 ++++++++++++++++++++++------------------- 1 file changed, 102 insertions(+), 88 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 9e6be589..70eece72 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -18,7 +18,11 @@ In da suit parlances: "Execution management systems" """ -from collections import defaultdict, ChainMap +from __future__ import annotations +from collections import ( + defaultdict, + ChainMap, +) from contextlib import asynccontextmanager from math import isnan from pprint import pformat @@ -134,12 +138,6 @@ class _DarkBook(Struct): # _ems_entries: dict[str, str] = {} _active: dict = {} - # mapping of ems dialog ids to msg flow history - _msgflows: defaultdict[ - int, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) - _ems2brokerd_ids: dict[str, str] = bidict() @@ -152,8 +150,8 @@ _DEFAULT_SIZE: float = 1.0 async def clear_dark_triggers( + router: Router, brokerd_orders_stream: tractor.MsgStream, - ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, fqsn: str, @@ -288,15 +286,16 @@ async def clear_dark_triggers( book._active[oid] = status # send response to client-side - try: - await ems_client_order_stream.send(status) - except ( - trio.ClosedResourceError, - ): - log.warning( - f'{ems_client_order_stream} stream broke?' - ) - break + for client_stream in router.dialogs[oid]: + try: + await client_stream.send(status) + except ( + trio.ClosedResourceError, + ): + log.warning( + f'{client_stream} stream broke?' + ) + break else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -310,7 +309,7 @@ class TradesRelay(Struct): # for now we keep only a single connection open with # each ``brokerd`` for simplicity. - brokerd_dialogue: tractor.MsgStream + brokerd_stream: tractor.MsgStream # map of symbols to dicts of accounts to pp msgs positions: dict[ @@ -342,13 +341,28 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - dialogues: dict[ - str, - list[tractor.MsgStream] - ] = {} - # brokername to trades-dialogues streams with ``brokerd`` actors - relays: dict[str, TradesRelay] = {} + fqsn2dialogs: defaultdict[ + str, # fqsn + list[str], # oids + ] = defaultdict(list) + + dialogs: defaultdict[ + str, # ems uuid (oid) + list[tractor.MsgStream] # client side msg stream + ] = defaultdict(list) + + # mapping of ems dialog ids to msg flow history + msgflows: defaultdict[ + str, + ChainMap[dict[str, dict]], + ] = defaultdict(ChainMap) + + # brokername to trades-dialogs streams with ``brokerd`` actors + relays: dict[ + str, # broker name + TradesRelay, + ] = {} def get_dark_book( self, @@ -373,7 +387,8 @@ class Router(Struct): none already exists. ''' - relay: TradesRelay = self.relays.get(feed.mod.name) + broker = feed.mod.name + relay: TradesRelay = self.relays.get(broker) if ( relay is None @@ -387,7 +402,7 @@ class Router(Struct): ): relay = await self.nursery.start( - open_brokerd_trades_dialogue, + open_brokerd_trades_dialog, self, feed, symbol, @@ -395,18 +410,23 @@ class Router(Struct): loglevel, ) + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) + relay.consumers += 1 # TODO: get updated positions here? - assert relay.brokerd_dialogue + assert relay.brokerd_stream try: yield relay - finally: # TODO: what exactly needs to be torn down here or # are we just consumer tracking? - relay.consumers -= 1 async def client_broadcast( @@ -429,7 +449,7 @@ class Router(Struct): _router: Router = None -async def open_brokerd_trades_dialogue( +async def open_brokerd_trades_dialog( router: Router, feed: Feed, @@ -505,7 +525,7 @@ async def open_brokerd_trades_dialogue( # we cache the relay task and instead of running multiple # tasks (which will result in multiples of the same msg being # relayed for each EMS client) we just register each client - # stream to this single relay loop using _router.dialogues + # stream to this single relay loop in the dialog table. # begin processing order events from the target brokerd backend # by receiving order submission response messages, @@ -532,7 +552,7 @@ async def open_brokerd_trades_dialogue( ).append(msg) relay = TradesRelay( - brokerd_dialogue=brokerd_trades_stream, + brokerd_stream=brokerd_trades_stream, positions=pps, accounts=accounts, consumers=1, @@ -550,8 +570,8 @@ async def open_brokerd_trades_dialogue( await trio.sleep_forever() finally: - # parent context must have been closed - # remove from cache so next client will respawn if needed + # parent context must have been closed remove from cache so + # next client will respawn if needed relay = _router.relays.pop(broker, None) if not relay: log.warning(f'Relay for {broker} was already removed!?') @@ -608,7 +628,7 @@ async def translate_and_relay_brokerd_events( book: _DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] - assert relay.brokerd_dialogue == brokerd_trades_stream + assert relay.brokerd_stream == brokerd_trades_stream brokerd_msg: dict[str, Any] async for brokerd_msg in brokerd_trades_stream: @@ -707,11 +727,14 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - ems_client_order_stream = router.dialogues[oid] + ems_client_order_streams = router.dialogs[oid] + status_msg.resp = 'error' status_msg.brokerd_msg = msg book._active[oid] = status_msg - await ems_client_order_stream.send(status_msg) + + for stream in ems_client_order_streams: + await stream.send(status_msg) # BrokerdStatus case { @@ -732,15 +755,15 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_stream = router.dialogues.get(oid) + ems_client_order_streams = router.dialogs[oid] status_msg = book._active.get(oid) if ( - not ems_client_order_stream + not ems_client_order_streams or not status_msg ): log.warning( - f'Received status for unknown dialog {oid}:\n' + f'Received status for untracked dialog {oid}:\n' f'{fmsg}' ) continue @@ -759,7 +782,9 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid # THIS LINE IS CRITICAL! status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - await ems_client_order_stream.send(status_msg) + + for stream in ems_client_order_streams: + await stream.send(status_msg) if status == 'closed': log.info(f'Execution for {oid} is complete!') @@ -793,7 +818,7 @@ async def translate_and_relay_brokerd_events( msg = BrokerdFill(**brokerd_msg) log.info(f'Fill for {oid} cleared with:\n{fmsg}') - ems_client_order_stream = router.dialogues[oid] + ems_client_order_streams = router.dialogs[oid] # XXX: bleh, a fill can come after 'closed' from `ib`? # only send a late fill event we haven't already closed @@ -803,7 +828,10 @@ async def translate_and_relay_brokerd_events( status_msg.resp = 'fill' status_msg.reqid = reqid status_msg.brokerd_msg = msg - await ems_client_order_stream.send(status_msg) + + for stream in ems_client_order_streams: + await stream.send(status_msg) + # await ems_client_order_stream.send(status_msg) # ``Status`` containing an embedded order msg which # should be loaded as a "pre-existing open order" from the @@ -903,11 +931,6 @@ async def translate_and_relay_brokerd_events( # if status_msg is not None: # del status_msg - # TODO: do we want this to keep things cleaned up? - # it might require a special status from brokerd to affirm the - # flow is complete? - # router.dialogues.pop(oid) - async def process_client_order_cmds( @@ -921,7 +944,7 @@ async def process_client_order_cmds( ) -> None: - client_dialogues = router.dialogues + client_dialogs = router.dialogs # cmd: dict async for cmd in client_order_stream: @@ -934,7 +957,11 @@ async def process_client_order_cmds( # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). - client_dialogues[oid] = client_order_stream + subs = client_dialogs[oid] + if client_order_stream not in subs: + subs.append(client_order_stream) + + router.fqsn2dialogs[symbol].append(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid) # any dark/live status which is current @@ -1002,8 +1029,9 @@ async def process_client_order_cmds( status.req = cmd await client_order_stream.send(status) - # de-register this client dialogue - router.dialogues.pop(oid) + # de-register this order dialogue from all clients + router.dialogs[oid].clear() + router.dialogs.pop(oid) dark_book._active.pop(oid) else: @@ -1034,8 +1062,8 @@ async def process_client_order_cmds( if status is not None: # if we already had a broker order id then # this is likely an order update commmand. - log.info(f"Modifying live {broker} order: {reqid}") reqid = status.reqid + log.info(f"Modifying live {broker} order: {reqid}") status.req = req status.resp = 'pending' @@ -1252,11 +1280,10 @@ async def _emsd_main( loglevel, ) as relay, - trio.open_nursery() as n, ): - brokerd_stream = relay.brokerd_dialogue # .clone() + brokerd_stream = relay.brokerd_stream # signal to client that we're started and deliver # all known pps and accounts for this ``brokerd``. @@ -1268,26 +1295,20 @@ async def _emsd_main( # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates - async with ems_ctx.open_stream() as ems_client_order_stream: + async with ems_ctx.open_stream() as client_stream: - # register the client side before startingn the + # register the client side before starting the # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. - _router.clients.add(ems_client_order_stream) - - n.start_soon( - translate_and_relay_brokerd_events, - broker, - brokerd_stream, - _router, - ) + _router.clients.add(client_stream) + for oid in _router.fqsn2dialogs[fqsn]: + _router.dialogs[oid].append(client_stream) # trigger scan and exec loop n.start_soon( clear_dark_triggers, - + _router, brokerd_stream, - ems_client_order_stream, quote_stream, broker, fqsn, # form: ... @@ -1295,16 +1316,11 @@ async def _emsd_main( ) # start inbound (from attached client) order request processing + # main entrypoint, run here until cancelled. try: - - # main entrypoint, run here until cancelled. await process_client_order_cmds( - - ems_client_order_stream, - - # relay.brokerd_dialogue, + client_stream, brokerd_stream, - fqsn, feed, dark_book, @@ -1314,28 +1330,26 @@ async def _emsd_main( finally: # try to remove client from "registry" try: - _router.clients.remove(ems_client_order_stream) + _router.clients.remove(client_stream) except KeyError: log.warning( - f'Stream {ems_client_order_stream._ctx.chan.uid}' + f'Stream {client_stream._ctx.chan.uid}' ' was already dropped?' ) - dialogues = _router.dialogues + dialogs = _router.dialogs + for oid, client_streams in dialogs.items(): + if client_stream in client_streams: + client_streams.remove(client_stream) - for oid, client_stream in dialogues.copy().items(): - - if client_stream == ems_client_order_stream: - - log.warning( - f'client dialogue is being abandoned:\n' - f'{oid} ->\n{client_stream._ctx.chan.uid}' - ) - dialogues.pop(oid) - - # TODO: for order dialogues left "alive" in + # TODO: for order dialogs left "alive" in # the ems this is where we should allow some # system to take over management. Likely we # want to allow the user to choose what kind # of policy to use (eg. cancel all orders # from client, run some algo, etc.) + if not client_streams: + log.warning( + f'Order dialog is being unmonitored:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) From 4877af9bc30352aee4d8671a5ce3cd8900977e5e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Oct 2022 12:54:10 -0400 Subject: [PATCH 09/10] Add pub-sub broadcasting Establishes a more formalized subscription based fan out pattern to ems clients who subscribe for order flow for a particular symbol (the fqsn is the default subscription key for now). Make `Router.client_broadcast()` take a `sub_key: str` value which determines the set of clients to forward a message to and drop all such manually defined broadcast loops from task (func) code. Also add `.get_subs()` which (hackily) allows getting the set of clients for a given sub key where any stream that is detected as "closed" is discarded in the output. Further we simplify to `Router.dialogs: defaultdict[str, set[tractor.MsgStream]]` and `.subscriptions` as maps to sets of streams for much easier broadcast management/logic using set operations inside `.client_broadcast()`. --- piker/clearing/_ems.py | 180 ++++++++++++++++++++++++++--------------- 1 file changed, 113 insertions(+), 67 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 70eece72..23b50ddf 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -21,7 +21,7 @@ In da suit parlances: "Execution management systems" from __future__ import annotations from collections import ( defaultdict, - ChainMap, + # ChainMap, ) from contextlib import asynccontextmanager from math import isnan @@ -286,16 +286,10 @@ async def clear_dark_triggers( book._active[oid] = status # send response to client-side - for client_stream in router.dialogs[oid]: - try: - await client_stream.send(status) - except ( - trio.ClosedResourceError, - ): - log.warning( - f'{client_stream} stream broke?' - ) - break + await router.client_broadcast( + fqsn, + status, + ) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -342,21 +336,24 @@ class Router(Struct): # order id to client stream map clients: set[tractor.MsgStream] = set() - fqsn2dialogs: defaultdict[ - str, # fqsn - list[str], # oids - ] = defaultdict(list) + # sets of clients mapped from subscription keys + subscribers: defaultdict[ + str, # sub key, default fqsn + set[tractor.MsgStream], # unique client streams + ] = defaultdict(set) + # sets of clients dynamically registered for specific + # order flows based on subscription config. dialogs: defaultdict[ str, # ems uuid (oid) - list[tractor.MsgStream] # client side msg stream - ] = defaultdict(list) + set[tractor.MsgStream] # client side msg stream + ] = defaultdict(set) - # mapping of ems dialog ids to msg flow history - msgflows: defaultdict[ - str, - ChainMap[dict[str, dict]], - ] = defaultdict(ChainMap) + # TODO: mapping of ems dialog ids to msg flow history + # msgflows: defaultdict[ + # str, + # ChainMap[dict[str, dict]], + # ] = defaultdict(ChainMap) # brokername to trades-dialogs streams with ``brokerd`` actors relays: dict[ @@ -372,6 +369,20 @@ class Router(Struct): return self.books.setdefault(brokername, _DarkBook(brokername)) + def get_subs( + self, + oid: str, + + ) -> set[tractor.MsgStream]: + ''' + Deliver list of non-closed subscriber client msg streams. + + ''' + return set( + stream for stream in self.dialogs[oid] + if not stream._closed + ) + @asynccontextmanager async def maybe_open_brokerd_trades_dialogue( self, @@ -431,20 +442,27 @@ class Router(Struct): async def client_broadcast( self, + sub_key: str, msg: dict, ) -> None: - for client_stream in self.clients.copy(): + to_remove: set[tractor.MsgStream] = set() + subs = self.subscribers[sub_key] + for client_stream in subs: try: await client_stream.send(msg) except ( trio.ClosedResourceError, trio.BrokenResourceError, ): + to_remove.add(client_stream) self.clients.remove(client_stream) log.warning( f'client for {client_stream} was already closed?') + if to_remove: + subs.difference_update(to_remove) + _router: Router = None @@ -558,7 +576,7 @@ async def open_brokerd_trades_dialog( consumers=1, ) - _router.relays[broker] = relay + router.relays[broker] = relay # the ems scan loop may be cancelled by the client but we # want to keep the ``brokerd`` dialogue up regardless @@ -572,7 +590,7 @@ async def open_brokerd_trades_dialog( finally: # parent context must have been closed remove from cache so # next client will respawn if needed - relay = _router.relays.pop(broker, None) + relay = router.relays.pop(broker, None) if not relay: log.warning(f'Relay for {broker} was already removed!?') @@ -627,7 +645,6 @@ async def translate_and_relay_brokerd_events( ''' book: _DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] - assert relay.brokerd_stream == brokerd_trades_stream brokerd_msg: dict[str, Any] @@ -660,7 +677,7 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(pos_msg) + await router.client_broadcast(sym, pos_msg) continue # BrokerdOrderAck @@ -727,21 +744,18 @@ async def translate_and_relay_brokerd_events( # some unexpected failure - something we need to think more # about. In most default situations, with composed orders # (ex. brackets), most brokers seem to use a oca policy. - ems_client_order_streams = router.dialogs[oid] status_msg.resp = 'error' status_msg.brokerd_msg = msg book._active[oid] = status_msg - for stream in ems_client_order_streams: - await stream.send(status_msg) + await router.client_broadcast(sym, status_msg) # BrokerdStatus case { 'name': 'status', 'status': status, 'reqid': reqid, # brokerd generated order-request id - } if ( (oid := book._ems2brokerd_ids.inverse.get(reqid)) and status in ( @@ -755,7 +769,7 @@ async def translate_and_relay_brokerd_events( # TODO: maybe pack this into a composite type that # contains both the IPC stream as well the # msg-chain/dialog. - ems_client_order_streams = router.dialogs[oid] + ems_client_order_streams = router.get_subs(oid) status_msg = book._active.get(oid) if ( @@ -783,8 +797,10 @@ async def translate_and_relay_brokerd_events( status_msg.brokerd_msg = msg status_msg.src = msg.broker_details['name'] - for stream in ems_client_order_streams: - await stream.send(status_msg) + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) if status == 'closed': log.info(f'Execution for {oid} is complete!') @@ -818,8 +834,6 @@ async def translate_and_relay_brokerd_events( msg = BrokerdFill(**brokerd_msg) log.info(f'Fill for {oid} cleared with:\n{fmsg}') - ems_client_order_streams = router.dialogs[oid] - # XXX: bleh, a fill can come after 'closed' from `ib`? # only send a late fill event we haven't already closed # out the dialog status locally. @@ -829,9 +843,10 @@ async def translate_and_relay_brokerd_events( status_msg.reqid = reqid status_msg.brokerd_msg = msg - for stream in ems_client_order_streams: - await stream.send(status_msg) - # await ems_client_order_stream.send(status_msg) + await router.client_broadcast( + status_msg.req.symbol, + status_msg, + ) # ``Status`` containing an embedded order msg which # should be loaded as a "pre-existing open order" from the @@ -883,7 +898,10 @@ async def translate_and_relay_brokerd_events( # fan-out-relay position msgs immediately by # broadcasting updates on all client streams - await router.client_broadcast(status_msg) + await router.client_broadcast( + order.symbol, + status_msg, + ) # don't fall through continue @@ -937,15 +955,21 @@ async def process_client_order_cmds( client_order_stream: tractor.MsgStream, brokerd_order_stream: tractor.MsgStream, - symbol: str, + fqsn: str, feed: Feed, dark_book: _DarkBook, router: Router, ) -> None: + ''' + Client-dialog request loop: accept order requests and deliver + initial status msg responses to subscribed clients. - client_dialogs = router.dialogs + This task-loop handles both management of dark triggered orders and + alerts by inserting them into the "dark book"-table as well as + submitting live orders immediately if requested by the client. + ''' # cmd: dict async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') @@ -953,15 +977,17 @@ async def process_client_order_cmds( # CAWT DAMN we need struct support! oid = str(cmd['oid']) - # register this stream as an active dialogue for this order id - # such that translated message from the brokerd backend can be - # routed (relayed) to **just** that client stream (and in theory - # others who are registered for such order affiliated msgs). - subs = client_dialogs[oid] - if client_order_stream not in subs: - subs.append(client_order_stream) + # register this stream as an active order dialog (msg flow) for + # this order id such that translated message from the brokerd + # backend can be routed and relayed to subscribed clients. + subs = router.dialogs[oid] + + # add all subscribed clients for this fqsn (should eventually be + # a more generalize subscription system) to received order msg + # updates (and thus show stuff in the UI). + subs.add(client_order_stream) + subs.update(router.subscribers[fqsn]) - router.fqsn2dialogs[symbol].append(oid) reqid = dark_book._ems2brokerd_ids.inverse.get(oid) # any dark/live status which is current @@ -973,7 +999,7 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if ( - (status := dark_book._active.get(oid)) + status and status.resp in ('open', 'pending') ): reqid = status.reqid @@ -1009,11 +1035,11 @@ async def process_client_order_cmds( 'action': 'cancel', 'oid': oid, } if ( - status and status.resp == 'dark_open' - # or status and status.req + status + and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.orders[symbol].pop(oid, None) + entry = dark_book.orders[fqsn].pop(oid, None) if entry: ( pred, @@ -1028,14 +1054,18 @@ async def process_client_order_cmds( status.resp = 'canceled' status.req = cmd - await client_order_stream.send(status) + await router.client_broadcast( + fqsn, + status, + ) + # de-register this order dialogue from all clients router.dialogs[oid].clear() router.dialogs.pop(oid) dark_book._active.pop(oid) else: - log.exception(f'No dark order for {symbol}?') + log.exception(f'No dark order for {fqsn}?') # TODO: eventually we should be receiving # this struct on the wire unpacked in a scoped protocol @@ -1194,7 +1224,12 @@ async def process_client_order_cmds( src='dark', ) dark_book._active[oid] = status - await client_order_stream.send(status) + + # broadcast status to all subscribed clients + await router.client_broadcast( + fqsn, + status, + ) @tractor.context @@ -1220,20 +1255,26 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The primary ``emsd`` task tree is: + The primary ``emsd`` task trees are: - - ``_emsd_main()``: - sets up brokerd feed, order feed with ems client, trades dialogue with - brokderd trading api. - | - - ``clear_dark_triggers()``: - run (dark order) conditions on inputs and trigger brokerd "live" - order submissions. + - ``_setup_persistent_emsd()``: + is the ``emsd`` actor's primary root task which sets up an + actor-global ``Router`` instance and starts a relay loop task + which lives until the backend broker is shutdown or the ems is + terminated. | - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. + + - ``_emsd_main()``: + attaches a brokerd real-time quote feed and trades dialogue with + brokderd trading api for every connecting client. + | + - ``clear_dark_triggers()``: + run (dark order) conditions on inputs and trigger brokerd "live" + order submissions. | - ``process_client_order_cmds()``: accepts order cmds from requesting clients, registers dark orders and @@ -1301,8 +1342,12 @@ async def _emsd_main( # brokerd-side relay task to ensure the client is # delivered all exisiting open orders on startup. _router.clients.add(client_stream) - for oid in _router.fqsn2dialogs[fqsn]: - _router.dialogs[oid].append(client_stream) + + # TODO: instead of by fqsn we need a subscription + # system/schema here to limit what each new client is + # allowed to see in terms of broadcasted order flow + # updates per dialog. + _router.subscribers[fqsn].add(client_stream) # trigger scan and exec loop n.start_soon( @@ -1337,6 +1382,7 @@ async def _emsd_main( ' was already dropped?' ) + _router.subscribers[fqsn].remove(client_stream) dialogs = _router.dialogs for oid, client_streams in dialogs.items(): if client_stream in client_streams: From 35871d02138375e0c4045a1ce8d78ef26fd03d01 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 3 Oct 2022 13:41:40 -0400 Subject: [PATCH 10/10] Support line update from `Order` msg in `.on_submit()` --- piker/ui/order_mode.py | 36 ++++++++++++++++++++++++++---------- 1 file changed, 26 insertions(+), 10 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index d2196b69..59b07758 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -152,11 +152,11 @@ class OrderMode: A callback applied for each level change to the line which will recompute the order size based on allocator settings. this is assigned inside - ``OrderMode.line_from_order()`` + ``OrderMode.new_line_from_order()`` ''' # NOTE: the ``Order.account`` is set at order stage time inside - # ``OrderMode.line_from_order()`` or is inside ``Order`` msg + # ``OrderMode.new_line_from_order()`` or is inside ``Order`` msg # field for loaded orders. order_info = tracker.alloc.next_order_info( startup_pp=tracker.startup_pp, @@ -174,7 +174,7 @@ class OrderMode: # reflect the corresponding account and pos info. self.pane.on_ui_settings_change('account', order.account) - def line_from_order( + def new_line_from_order( self, order: Order, chart: Optional[ChartPlotWidget] = None, @@ -240,7 +240,7 @@ class OrderMode: (self.hist_chart, {'only_show_markers_on_hover': True}), ]: kwargs.update(line_kwargs) - line = self.line_from_order( + line = self.new_line_from_order( order=order, chart=chart, **kwargs, @@ -300,7 +300,7 @@ class OrderMode: # `LineEditor.unstage_line()` on all staged lines.. # lines = self.lines_from_order( - line = self.line_from_order( + line = self.new_line_from_order( order, chart=chart, show_markers=True, @@ -428,10 +428,11 @@ class OrderMode: ) -> None: level = line.value() - # updated by level change callback set in ``.line_from_order()`` + # updated by level change callback set in ``.new_line_from_order()`` dialog = line.dialog size = dialog.order.size + # NOTE: sends modified order msg to EMS self.book.update( uuid=line.dialog.uuid, price=level, @@ -447,7 +448,8 @@ class OrderMode: # EMS response msg handlers def on_submit( self, - uuid: str + uuid: str, + order: Optional[Order] = None, ) -> Dialog: ''' @@ -464,6 +466,20 @@ class OrderMode: dialog.last_status_close() for line in lines: + + # if an order msg is provided update the line + # **from** that msg. + if order: + line.set_level(order.price) + self.on_level_change_update_next_order_info( + level=order.price, + line=line, + order=order, + # use the corresponding position tracker for the + # order's account. + tracker=self.trackers[order.account], + ) + # hide any lines not currently moused-over if not line.get_cursor(): line.hide_labels() @@ -980,9 +996,11 @@ async def process_trade_msg( match msg: case Status(resp='dark_open' | 'open'): + order = Order(**msg.req) + if dialog is not None: # show line label once order is live - mode.on_submit(oid) + mode.on_submit(oid, order=order) else: log.warning( @@ -992,7 +1010,6 @@ async def process_trade_msg( sym = mode.chart.linked.symbol fqsn = sym.front_fqsn() - order = Order(**msg.req) if ( ((order.symbol + f'.{msg.src}') == fqsn) @@ -1009,7 +1026,6 @@ async def process_trade_msg( msg.req = order dialog = mode.load_unknown_dialog_from_msg(msg) mode.on_submit(oid) - # return dialog, msg case Status(resp='error'): # delete level line from view