diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 99b268be..6fe815dc 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -23,6 +23,7 @@ from collections import ( defaultdict, # ChainMap, ) +from contextlib import asynccontextmanager as acm from math import isnan from pprint import pformat import time @@ -40,6 +41,10 @@ import tractor from ..log import get_logger from ..data._normalize import iterticks +from ..data._source import ( + unpack_fqsn, + mk_fqsn, +) from ..data.feed import ( Feed, maybe_open_feed, @@ -104,7 +109,7 @@ def mk_check( ) -class _DarkBook(Struct): +class DarkBook(Struct): ''' EMS-trigger execution book. @@ -119,7 +124,7 @@ class _DarkBook(Struct): broker: str # levels which have an executable action (eg. alert, order, signal) - orders: dict[ + triggers: dict[ str, # symbol dict[ str, # uuid @@ -131,14 +136,8 @@ class _DarkBook(Struct): ] ] = {} - # tracks most recent values per symbol each from data feed - lasts: dict[ - str, - float, - ] = {} - - _active: dict = {} - + lasts: dict[str, float] = {} # quote prices + _active: dict[str, Status] = {} # active order dialogs _ems2brokerd_ids: dict[str, str] = bidict() @@ -157,7 +156,7 @@ async def clear_dark_triggers( broker: str, fqsn: str, - book: _DarkBook, + book: DarkBook, ) -> None: ''' @@ -174,7 +173,7 @@ async def clear_dark_triggers( async for quotes in quote_stream: # start = time.time() for sym, quote in quotes.items(): - execs = book.orders.get(sym, {}) + execs = book.triggers.get(sym, {}) for tick in iterticks( quote, # dark order price filter(s) @@ -292,7 +291,7 @@ async def clear_dark_triggers( else: # condition scan loop complete log.debug(f'execs are {execs}') if execs: - book.orders[fqsn] = execs + book.triggers[fqsn] = execs # print(f'execs scan took: {time.time() - start}') @@ -329,7 +328,7 @@ class Router(Struct): nursery: trio.Nursery # broker to book map - books: dict[str, _DarkBook] = {} + books: dict[str, DarkBook] = {} # sets of clients mapped from subscription keys subscribers: defaultdict[ @@ -360,9 +359,9 @@ class Router(Struct): self, brokername: str, - ) -> _DarkBook: + ) -> DarkBook: - return self.books.setdefault(brokername, _DarkBook(brokername)) + return self.books.setdefault(brokername, DarkBook(brokername)) def get_subs( self, @@ -378,7 +377,130 @@ class Router(Struct): if not stream._closed ) - async def maybe_open_trade_relays( + @acm + async def maybe_open_brokerd_dialog( + self, + feed: Feed, + exec_mode: str, + symbol: str, + loglevel: str, + + ) -> None: + brokermod = feed.mod + broker = brokermod.name + relay: TradesRelay = self.relays.get(broker) + if ( + relay + + # We always want to spawn a new relay for the paper + # engine per symbol since we need a new tractor context + # to be opened for every every symbol such that a new + # data feed and ``PaperBoi`` client will be created and + # then used to simulate clearing events. + and exec_mode != 'paper' + ): + # deliver already cached instance + yield relay + return + + trades_endpoint = getattr(brokermod, 'trades_dialogue', None) + if ( + trades_endpoint is None + or exec_mode == 'paper' + ): + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running + # a paper-simulator clearing engine. + + # load the paper trading engine + exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + fqsn='.'.join([symbol, broker]), + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = feed.portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + # open trades-dialog endpoint with backend broker + positions: list[BrokerdPosition] + accounts: tuple[str] + + async with ( + open_trades_endpoint as ( + brokerd_ctx, + (positions, accounts,), + ), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + # XXX: really we only want one stream per `emsd` + # actor to relay global `brokerd` order events + # unless we're going to expect each backend to + # relay only orders affiliated with a particular + # ``trades_dialogue()`` session (seems annoying + # for implementers). So, here we cache the relay + # task and instead of running multiple tasks + # (which will result in multiples of the same + # msg being relayed for each EMS client) we just + # register each client stream to this single + # relay loop in the dialog table. + + # begin processing order events from the target + # brokerd backend by receiving order submission + # response messages, normalizing them to EMS + # messages and relaying back to the piker order + # client set. + + # locally cache and track positions per account with + # a table of (brokername, acctid) -> `BrokerdPosition` + # msgs. + pps = {} + for msg in positions: + log.info(f'loading pp: {msg}') + + account = msg['account'] + + # TODO: better value error for this which + # dumps the account and message and states the + # mismatch.. + assert account in accounts + + pps.setdefault( + (broker, account), + [], + ).append(msg) + + relay = TradesRelay( + brokerd_stream=brokerd_trades_stream, + positions=pps, + accounts=accounts, + consumers=1, + ) + + self.relays[broker] = relay + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + try: + yield relay + finally: + # parent context must have been closed remove from cache so + # next client will respawn if needed + relay = self.relays.pop(broker, None) + if not relay: + log.warning( + f'Relay for {broker} was already removed!?') + + async def open_trade_relays( self, fqsn: str, exec_mode: str, @@ -408,144 +530,48 @@ class Router(Struct): # XXX: this should be initial price quote from target provider first_quote: dict = feed.first_quotes[fqsn] - book: _DarkBook = self.get_dark_book(broker) + book: DarkBook = self.get_dark_book(broker) book.lasts[fqsn]: float = first_quote['last'] - relay: TradesRelay = self.relays.get(broker) - if ( - relay + async with self.maybe_open_brokerd_dialog( + feed=feed, + exec_mode=exec_mode, + symbol=symbol, + loglevel=loglevel, + ) as relay: + + # dark book clearing loop, also lives with parent + # daemon to allow dark order clearing while no + # client is connected. + self.nursery.start_soon( + clear_dark_triggers, + self, + relay.brokerd_stream, + quote_stream, + broker, + fqsn, # form: ... + book + ) + + client_ready = trio.Event() + task_status.started((relay, feed, client_ready)) + + # sync to the client side by waiting for the stream + # connection setup before relaying any existing live + # orders from the brokerd. + await client_ready.wait() + assert self.subscribers + + # spawn a ``brokerd`` order control dialog stream + # that syncs lifetime with the parent `emsd` daemon. + self.nursery.start_soon( + translate_and_relay_brokerd_events, + broker, + relay.brokerd_stream, + self, + ) - # We always want to spawn a new relay for the paper engine - # per symbol since we need a new tractor context to be - # opened for every every symbol such that a new data feed - # and ``PaperBoi`` client will be created and then used to - # simulate clearing events. - and exec_mode != 'paper' - ): - task_status.started((relay, feed)) await trio.sleep_forever() - return - - trades_endpoint = getattr(brokermod, 'trades_dialogue', None) - if ( - trades_endpoint is None - or exec_mode == 'paper' - ): - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running - # a paper-simulator clearing engine. - - # load the paper trading engine - exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - fqsn='.'.join([symbol, broker]), - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = feed.portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - # open trades-dialog endpoint with backend broker - try: - positions: list[BrokerdPosition] - accounts: tuple[str] - - async with ( - open_trades_endpoint as ( - brokerd_ctx, - (positions, accounts,), - ), - brokerd_ctx.open_stream() as brokerd_trades_stream, - ): - # XXX: really we only want one stream per `emsd` - # actor to relay global `brokerd` order events - # unless we're going to expect each backend to - # relay only orders affiliated with a particular - # ``trades_dialogue()`` session (seems annoying - # for implementers). So, here we cache the relay - # task and instead of running multiple tasks - # (which will result in multiples of the same - # msg being relayed for each EMS client) we just - # register each client stream to this single - # relay loop in the dialog table. - - # begin processing order events from the target - # brokerd backend by receiving order submission - # response messages, normalizing them to EMS - # messages and relaying back to the piker order - # client set. - - # locally cache and track positions per account with - # a table of (brokername, acctid) -> `BrokerdPosition` - # msgs. - pps = {} - for msg in positions: - log.info(f'loading pp: {msg}') - - account = msg['account'] - - # TODO: better value error for this which - # dumps the account and message and states the - # mismatch.. - assert account in accounts - - pps.setdefault( - (broker, account), - [], - ).append(msg) - - relay = TradesRelay( - brokerd_stream=brokerd_trades_stream, - positions=pps, - accounts=accounts, - consumers=1, - ) - - self.relays[broker] = relay - - # spawn a ``brokerd`` order control dialog stream - # that syncs lifetime with the parent `emsd` daemon. - self.nursery.start_soon( - translate_and_relay_brokerd_events, - broker, - relay.brokerd_stream, - self, - ) - - # dark book clearing loop, also lives with parent - # daemon to allow dark order clearing while no - # client is connected. - self.nursery.start_soon( - clear_dark_triggers, - self, - relay.brokerd_stream, - quote_stream, - broker, - fqsn, # form: ... - book - ) - - task_status.started((relay, feed)) - - # this context should block here indefinitely until - # the ``brokerd`` task either dies or is cancelled - await trio.sleep_forever() - - finally: - # parent context must have been closed remove from cache so - # next client will respawn if needed - relay = self.relays.pop(broker, None) - if not relay: - log.warning(f'Relay for {broker} was already removed!?') async def client_broadcast( self, @@ -584,7 +610,7 @@ class Router(Struct): and notify_on_headless ): log.info( - 'No clients attached, firing notification for msg:\n' + 'No clients attached, firing notification for {sub_key} msg:\n' f'{msg}' ) await notify_from_ems_status_msg( @@ -645,7 +671,7 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' - book: _DarkBook = router.get_dark_book(broker) + book: DarkBook = router.get_dark_book(broker) relay: TradesRelay = router.relays[broker] assert relay.brokerd_stream == brokerd_trades_stream @@ -885,7 +911,11 @@ async def translate_and_relay_brokerd_events( # use backend request id as our ems id though this # may end up with collisions? status_msg = Status(**brokerd_msg) + + # NOTE: be sure to pack an fqsn for the client side! order = Order(**status_msg.req) + order.symbol = mk_fqsn(broker, order.symbol) + assert order.price and order.size status_msg.req = order @@ -961,7 +991,7 @@ async def process_client_order_cmds( fqsn: str, feed: Feed, - dark_book: _DarkBook, + dark_book: DarkBook, router: Router, ) -> None: @@ -1042,7 +1072,7 @@ async def process_client_order_cmds( and status.resp == 'dark_open' ): # remove from dark book clearing - entry = dark_book.orders[fqsn].pop(oid, None) + entry = dark_book.triggers[fqsn].pop(oid, None) if entry: ( pred, @@ -1204,7 +1234,7 @@ async def process_client_order_cmds( # submit execution/order to EMS scan loop # NOTE: this may result in an override of an existing # dark book entry if the order id already exists - dark_book.orders.setdefault( + dark_book.triggers.setdefault( fqsn, {} )[oid] = ( pred, @@ -1287,7 +1317,6 @@ async def _emsd_main( global _router assert _router - from ..data._source import unpack_fqsn broker, symbol, suffix = unpack_fqsn(fqsn) # TODO: would be nice if in tractor we can require either a ctx arg, @@ -1298,14 +1327,15 @@ async def _emsd_main( # spawn one task per broker feed relay: TradesRelay feed: Feed + client_ready: trio.Event # open a stream with the brokerd backend for order flow dialogue # only open if one isn't already up: we try to keep as few duplicate # streams as necessary. # TODO: should we try using `tractor.trionics.maybe_open_context()` # here? - relay, feed = await _router.nursery.start( - _router.maybe_open_trade_relays, + relay, feed, client_ready = await _router.nursery.start( + _router.open_trade_relays, fqsn, exec_mode, loglevel, @@ -1333,6 +1363,7 @@ async def _emsd_main( # allowed to see in terms of broadcasted order flow # updates per dialog. _router.subscribers[fqsn].add(client_stream) + client_ready.set() # start inbound (from attached client) order request processing # main entrypoint, run here until cancelled.