diff --git a/piker/accounting/cli.py b/piker/accounting/cli.py index 10306898..229a9d14 100644 --- a/piker/accounting/cli.py +++ b/piker/accounting/cli.py @@ -143,12 +143,15 @@ def sync( # (what the EMS normally does internall) B) open_brokerd_dialog( brokermod, - portal, exec_mode=( 'paper' if account == 'paper' else 'live' ), + # use our own ad-hoc-spawned actor, + # do NOT (spawn and) use the + # `brokerd.` service daemon! + portal=portal, loglevel=loglevel, ) as ( brokerd_stream, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index c77ffd1b..1d152926 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -335,9 +335,14 @@ class TradesRelay(Struct): @acm async def open_brokerd_dialog( brokermod: ModuleType, - portal: tractor.Portal, exec_mode: str, fqme: str|None = None, + + # XXX: explicit (already spawned) trading-actor override, + # currently only used by the `piker ledger` cli which + # boots its own ad-hoc `brokerd`-like actor; normally we + # (lazily) spawn/find the `brokerd.` daemon here. + portal: tractor.Portal|None = None, loglevel: str|None = None, ) -> tuple[ @@ -351,6 +356,10 @@ async def open_brokerd_dialog( paper engine instance depending on live trading support for the broker backend, configuration, or client code usage. + NOTE: this is now the ONE place where a (live, credentialed) + `brokerd.` daemon-actor gets (lazily) booted; pure + data/paper sessions should never spawn one! + ''' get_console_log( level=loglevel, @@ -416,16 +425,29 @@ async def open_brokerd_dialog( ) exec_mode: str = 'paper' - if ( - trades_endpoint is not None - or - exec_mode != 'paper' - ): - # open live brokerd trades endpoint - open_trades_endpoint = portal.open_context( - trades_endpoint, - loglevel=loglevel, + @acm + async def acquire_live_portal(): + ''' + Deliver a portal to the (live, credentialed) trading + actor hosting the backend's `open_trade_dialog()` ep: + either the caller-provided override or the (maybe + lazily spawned) `brokerd.` service daemon. + + ''' + if portal is not None: + yield portal + return + + # XXX: the ONE (normal) place a `brokerd.` + # daemon-actor gets booted in the runtime B) + from piker.brokers._daemon import ( + maybe_spawn_brokerd, ) + async with maybe_spawn_brokerd( + brokermod.name, + loglevel=loglevel, + ) as live_portal: + yield live_portal @acm async def maybe_open_paper_ep(): @@ -437,7 +459,14 @@ async def open_brokerd_dialog( return # open trades-dialog endpoint with backend broker - async with open_trades_endpoint as msg: + async with ( + acquire_live_portal() as live_portal, + + live_portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) as msg, + ): ctx, first = msg # runtime indication that the backend can't support live @@ -581,7 +610,6 @@ class Router(Struct): async def maybe_open_brokerd_dialog( self, brokermod: ModuleType, - portal: tractor.Portal, exec_mode: str, fqme: str, loglevel: str, @@ -606,7 +634,6 @@ class Router(Struct): async with open_brokerd_dialog( brokermod=brokermod, - portal=portal, exec_mode=exec_mode, fqme=fqme, loglevel=loglevel, @@ -668,7 +695,6 @@ class Router(Struct): brokername, _, _, _ = unpack_fqme(fqme) brokermod = feed.mods[brokername] broker = brokermod.name - portal = feed.portals[brokermod] # XXX: this should be initial price quote from target provider flume = feed.flumes[fqme] @@ -682,7 +708,6 @@ class Router(Struct): async with self.maybe_open_brokerd_dialog( brokermod=brokermod, - portal=portal, exec_mode=exec_mode, fqme=fqme, loglevel=loglevel,