diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5bdf34d3..2cd6a1b9 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -27,12 +27,14 @@ from typing import AsyncIterator, Callable, Any from bidict import bidict from pydantic import BaseModel import trio +from trio_typing import TaskStatus import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks from ..data.feed import Feed +from .._daemon import maybe_spawn_brokerd from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -82,15 +84,16 @@ def mk_check( @dataclass class _DarkBook: - """Client-side execution book. + '''EMS-trigger execution book. - Contains conditions for executions (aka "orders") which are not - exposed to brokers and thus the market; i.e. these are privacy - focussed "client side" orders. + Contains conditions for executions (aka "orders" or "triggers") + which are not exposed to brokers and thus the market; i.e. these are + privacy focussed "client side" orders which are submitted in real-time + based on specified trigger conditions. - A singleton instance is created per EMS actor (for now). + A an instance per `brokerd` is created per EMS actor (for now). - """ + ''' broker: str # levels which have an executable action (eg. alert, order, signal) @@ -256,17 +259,34 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') +@dataclass +class TradesRelay: + brokerd_dialogue: tractor.MsgStream + positions: dict[str, float] + consumers: int = 0 + + class _Router(BaseModel): - '''Order router which manages per-broker dark books, alerts, - and clearing related data feed management. + '''Order router which manages and tracks per-broker dark book, + alerts, clearing and related data feed management. + + A singleton per ``emsd`` actor. ''' + # setup at actor spawn time nursery: trio.Nursery feeds: dict[tuple[str, str], Any] = {} + + # broker to book map books: dict[str, _DarkBook] = {} + + # order id to client stream map + clients: set[tractor.MsgStream] = set() dialogues: dict[str, list[tractor.MsgStream]] = {} - relays: dict[str, tuple[dict, tractor.MsgStream]] = {} + + # brokername to trades-dialogues streams with ``brokerd`` actors + relays: dict[str, TradesRelay] = {} class Config: arbitrary_types_allowed = True @@ -280,10 +300,160 @@ class _Router(BaseModel): return self.books.setdefault(brokername, _DarkBook(brokername)) + @asynccontextmanager + async def maybe_open_brokerd_trades_dialogue( + + self, + feed: Feed, + symbol: str, + dark_book: _DarkBook, + _exec_mode: str, + loglevel: str, + + ) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + relay = self.relays.get(feed.mod.name) + + if relay is None: + + relay = await self.nursery.start( + open_brokerd_trades_dialogue, + self, + feed, + symbol, + _exec_mode, + loglevel, + ) + + relay.consumers += 1 + + # TODO: get updated positions here? + assert relay.brokerd_dialogue + try: + yield relay + + finally: + + # TODO: what exactly needs to be torn down here or + # are we just consumer tracking? + + relay.consumers -= 1 + _router: _Router = None +async def open_brokerd_trades_dialogue( + + router: _Router, + feed: Feed, + symbol: str, + _exec_mode: str, + loglevel: str, + + task_status: TaskStatus[TradesRelay] = trio.TASK_STATUS_IGNORED, + +) -> tuple[dict, tractor.MsgStream]: + '''Open and yield ``brokerd`` trades dialogue context-stream if none + already exists. + + ''' + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + + broker = feed.mod.name + + # TODO: make a `tractor` bug about this! + # portal = feed._brokerd_portal + + # XXX: we must have our own portal + channel otherwise + # when the data feed closes it may result in a half-closed/fucked + # channel that the brokerd side thinks is still open somehow!? + async with maybe_spawn_brokerd( + + broker, + loglevel=loglevel, + + ) as portal: + + if trades_endpoint is None or _exec_mode == 'paper': + + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + try: + async with ( + + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + + ): + # XXX: really we only want one stream per `emsd` actor + # to relay global `brokerd` order events unless we're + # doing to expect each backend to relay only orders + # affiliated with a particular ``trades_dialogue()`` + # session (seems annoying for implementers). So, here + # we cache the relay task and instead of running multiple + # tasks (which will result in multiples of the same msg being + # relayed for each EMS client) we just register each client + # stream to this single relay loop using _router.dialogues + + # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client set. + + relay = TradesRelay( + brokerd_dialogue=brokerd_trades_stream, + positions=positions, + consumers=1 + ) + + _router.relays[broker] = relay + + # the ems scan loop may be cancelled by the client but we + # want to keep the ``brokerd`` dialogue up regardless + + task_status.started(relay) + + await translate_and_relay_brokerd_events( + broker, + brokerd_trades_stream, + _router, + ) + + # this context should block here indefinitely until + # the ``brokerd`` task either dies or is cancelled + + finally: + # parent context must have been closed + # remove from cache so next client will respawn if needed + _router.relays.pop(broker) + + @tractor.context async def _setup_persistent_emsd( @@ -298,20 +468,18 @@ async def _setup_persistent_emsd( _router = _Router(nursery=service_nursery) - # TODO: send back the full set of persistent orders/execs persistent + # TODO: send back the full set of persistent + # orders/execs? await ctx.started() - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down + # allow service tasks to run until cancelled await trio.sleep_forever() async def translate_and_relay_brokerd_events( broker: str, - # ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, - book: _DarkBook, router: _Router, ) -> AsyncIterator[dict]: @@ -334,6 +502,11 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} ''' + book = router.get_dark_book(broker) + relay = router.relays[broker] + + assert relay.brokerd_dialogue == brokerd_trades_stream + async for brokerd_msg in brokerd_trades_stream: name = brokerd_msg['name'] @@ -342,13 +515,15 @@ async def translate_and_relay_brokerd_events( if name == 'position': + pos_msg = BrokerdPosition(**brokerd_msg).dict() + + # keep up to date locally in ``emsd`` + relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg) + # relay through position msgs immediately by # broadcasting updates on all client streams - for oid, ems_client_order_stream in router.dialogues.items(): - - await ems_client_order_stream.send( - BrokerdPosition(**brokerd_msg).dict() - ) + for client_stream in router.clients: + await client_stream.send(pos_msg) continue @@ -425,7 +600,7 @@ async def translate_and_relay_brokerd_events( resp = None broker_details = {} - client_flow_complete: bool = False + # client_flow_complete: bool = False if name in ( 'error', @@ -460,7 +635,7 @@ async def translate_and_relay_brokerd_events( if msg.status == 'cancelled': - client_flow_complete = True + # client_flow_complete = True log.info(f'Cancellation for {oid} is complete!') if msg.status == 'filled': @@ -473,7 +648,7 @@ async def translate_and_relay_brokerd_events( # be sure to pop this stream from our dialogue set # since the order dialogue should be done. - client_flow_complete = True + # client_flow_complete = True log.info(f'Execution for {oid} is complete!') # just log it @@ -503,22 +678,26 @@ async def translate_and_relay_brokerd_events( # Create and relay response status message # to requesting EMS client - ems_client_order_stream = router.dialogues[oid] - await ems_client_order_stream.send( - Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=broker_details, - ).dict() - ) + try: + ems_client_order_stream = router.dialogues[oid] + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ).dict() + ) + except KeyError: + log.error( + f'Received `brokerd` msg for unknown client with oid: {oid}') - # TODO: do we want this to keep things cleaned up? - # it might require a special status from brokerd to affirm the - # flow is complete? - # if client_flow_complete: - # router.dialogues.pop(oid) + # TODO: do we want this to keep things cleaned up? + # it might require a special status from brokerd to affirm the + # flow is complete? + # if client_flow_complete: + # router.dialogues.pop(oid) async def process_client_order_cmds( @@ -533,6 +712,8 @@ async def process_client_order_cmds( ) -> None: + client_dialogues = router.dialogues + # cmd: dict async for cmd in client_order_stream: @@ -541,14 +722,16 @@ async def process_client_order_cmds( action = cmd['action'] oid = cmd['oid'] + # TODO: make ``tractor.MsgStream`` a frozen type again such that it + # can be stored in sets like the old context was. + # wait, maybe this **is** already working thanks to our parent + # `trio` type? + # register this stream as an active dialogue for this order id # such that translated message from the brokerd backend can be # routed (relayed) to **just** that client stream (and in theory # others who are registered for such order affiliated msgs). - - # TODO: make ``tractor.MsgStream`` a frozen type again such that it - # can be stored in sets like the old context was. - router.dialogues[oid] = client_order_stream + client_dialogues[oid] = client_order_stream reqid = dark_book._ems2brokerd_ids.inverse.get(oid) live_entry = dark_book._ems_entries.get(oid) @@ -655,7 +838,7 @@ async def process_client_order_cmds( # flow so that if a cancel comes from the requesting # client, before that ack, when the ack does arrive we # immediately take the reqid from the broker and cancel - # that order with them immediately. + # that live order asap. dark_book._ems_entries[oid] = msg # "DARK" triggers @@ -725,102 +908,6 @@ async def process_client_order_cmds( ) -@asynccontextmanager -async def maybe_open_brokerd_trades_dialogue( - - router: _Router, - feed: Feed, - broker: str, - symbol: str, - dark_book: _DarkBook, - _exec_mode: str, - loglevel: str, - -) -> tuple[dict, tractor.MsgStream]: - '''Open and yield ``brokerd`` trades dialogue context-stream if none - already exists. - - ''' - trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) - portal = feed._brokerd_portal - - if broker in _router.relays: - - positions, brokerd_trades_stream = _router.relays[broker] - - # TODO: get updated positions here? - yield positions, brokerd_trades_stream - return - - if trades_endpoint is None or _exec_mode == 'paper': - - # for paper mode we need to mock this trades response feed - # so we load bidir stream to a new sub-actor running a - # paper-simulator clearing engine. - - # load the paper trading engine - _exec_mode = 'paper' - log.warning(f'Entering paper trading mode for {broker}') - - # load the paper trading engine as a subactor of this emsd - # actor to simulate the real IPC load it'll have when also - # pulling data from feeds - open_trades_endpoint = paper.open_paperboi( - broker=broker, - symbol=symbol, - loglevel=loglevel, - ) - - else: - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, - ) - - async with ( - - open_trades_endpoint as (brokerd_ctx, positions), - brokerd_ctx.open_stream() as brokerd_trades_stream, - trio.open_nursery() as n, - - ): - # XXX: really we only want one stream per `emsd` actor - # to relay global `brokerd` order events unless we're - # doing to expect each backend to relay only orders - # affiliated with a particular ``trades_dialogue()`` - # session (seems annoying for implementers). So, here - # we cache the relay task and instead of running multiple - # tasks (which will result in multiples of the same msg being - # relayed for each EMS client) we just register each client - # stream to this single relay loop using _router.dialogues - - # begin processing order events from the target brokerd backend - # by receiving order submission response messages, - # normalizing them to EMS messages and relaying back to - # the piker order client set. - - n.start_soon( - - translate_and_relay_brokerd_events, - - broker, - # ems_client_order_stream, - brokerd_trades_stream, - dark_book, - _router, - ) - - _router.relays[broker] = (positions, brokerd_trades_stream) - - try: - yield positions, brokerd_trades_stream - - finally: - # remove from cache so next client will respawn if needed - _router.relays.pop(broker) - - @tractor.context async def _emsd_main( @@ -855,7 +942,7 @@ async def _emsd_main( run (dark order) conditions on inputs and trigger brokerd "live" order submissions. | - - ``translate_and_relay_brokerd_events()``: + - (maybe) ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and relay to ems client(s); this is a effectively a "trade event reponse" proxy-broker. @@ -905,24 +992,24 @@ async def _emsd_main( # only open if one isn't already up: we try to keep # as few duplicate streams as necessary - maybe_open_brokerd_trades_dialogue( - _router, + _router.maybe_open_brokerd_trades_dialogue( feed, - broker, symbol, dark_book, _exec_mode, loglevel, - ) as (positions, brokerd_trades_stream), + ) as relay, trio.open_nursery() as n, ): + brokerd_stream = relay.brokerd_dialogue # .clone() + # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? - await ems_ctx.started(positions) + await ems_ctx.started(relay.positions) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates @@ -932,7 +1019,8 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - brokerd_trades_stream, + # relay.brokerd_dialogue, + brokerd_stream, ems_client_order_stream, feed.stream, @@ -942,12 +1030,41 @@ async def _emsd_main( ) # start inbound (from attached client) order request processing - await process_client_order_cmds( + try: + _router.clients.add(ems_client_order_stream) - ems_client_order_stream, - brokerd_trades_stream, - symbol, - feed, - dark_book, - _router, - ) + await process_client_order_cmds( + + ems_client_order_stream, + + # relay.brokerd_dialogue, + brokerd_stream, + + symbol, + feed, + dark_book, + _router, + ) + + finally: + # remove client from "registry" + _router.clients.remove(ems_client_order_stream) + + dialogues = _router.dialogues + + for oid, client_stream in dialogues.items(): + + if client_stream == ems_client_order_stream: + + log.warning( + f'client dialogue is being abandoned:\n' + f'{oid} ->\n{client_stream._ctx.chan.uid}' + ) + dialogues.pop(oid) + + # TODO: for order dialogues left "alive" in + # the ems this is where we should allow some + # system to take over management. Likely we + # want to allow the user to choose what kind + # of policy to use (eg. cancel all orders + # from client, run some algo, etc.).