.clearing: lazily spawn `brokerd` from `emsd`

Drop the ONE coupling that forces feed + trading eps into
the same actor: `Router.open_trade_relays()` pulling its
trades-dialog portal from `feed.portals[brokermod]`.
Instead `open_brokerd_dialog()` now (maybe) spawns/finds
`brokerd.<broker>` itself via `maybe_spawn_brokerd()` and
ONLY when a live trades-ep will actually be opened; the
paper-mode short-circuit never touches it, so post
feed-cutover paper sessions will run with zero `brokerd`
procs.

Pre-cutover this is a pure refactor: the registry lookup
just finds the same feed-spawned daemon.

Deats,
- new `open_brokerd_dialog()` sig: portal acquisition moves
  inside via an `acquire_live_portal()` helper; keep an
  explicit `portal: Portal|None` override for the
  `piker ledger` cli which boots its own ad-hoc actor.
- `Router.maybe_open_brokerd_dialog()` drops its `portal`
  param; `open_trade_relays()` drops the `feed.portals`
  lookup entirely.
- `.accounting.cli`: pass `portal=` by keyword.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Gud Boi 2026-06-09 17:22:16 -04:00
parent 7de661c03e
commit 3548893337
2 changed files with 44 additions and 16 deletions

View File

@ -143,12 +143,15 @@ def sync(
# (what the EMS normally does internall) B) # (what the EMS normally does internall) B)
open_brokerd_dialog( open_brokerd_dialog(
brokermod, brokermod,
portal,
exec_mode=( exec_mode=(
'paper' 'paper'
if account == 'paper' if account == 'paper'
else 'live' else 'live'
), ),
# use our own ad-hoc-spawned actor,
# do NOT (spawn and) use the
# `brokerd.<broker>` service daemon!
portal=portal,
loglevel=loglevel, loglevel=loglevel,
) as ( ) as (
brokerd_stream, brokerd_stream,

View File

@ -335,9 +335,14 @@ class TradesRelay(Struct):
@acm @acm
async def open_brokerd_dialog( async def open_brokerd_dialog(
brokermod: ModuleType, brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str, exec_mode: str,
fqme: str|None = None, fqme: str|None = None,
# XXX: explicit (already spawned) trading-actor override,
# currently only used by the `piker ledger` cli which
# boots its own ad-hoc `brokerd`-like actor; normally we
# (lazily) spawn/find the `brokerd.<broker>` daemon here.
portal: tractor.Portal|None = None,
loglevel: str|None = None, loglevel: str|None = None,
) -> tuple[ ) -> tuple[
@ -351,6 +356,10 @@ async def open_brokerd_dialog(
paper engine instance depending on live trading support for the paper engine instance depending on live trading support for the
broker backend, configuration, or client code usage. broker backend, configuration, or client code usage.
NOTE: this is now the ONE place where a (live, credentialed)
`brokerd.<broker>` daemon-actor gets (lazily) booted; pure
data/paper sessions should never spawn one!
''' '''
get_console_log( get_console_log(
level=loglevel, level=loglevel,
@ -416,16 +425,29 @@ async def open_brokerd_dialog(
) )
exec_mode: str = 'paper' exec_mode: str = 'paper'
if ( @acm
trades_endpoint is not None async def acquire_live_portal():
or '''
exec_mode != 'paper' Deliver a portal to the (live, credentialed) trading
): actor hosting the backend's `open_trade_dialog()` ep:
# open live brokerd trades endpoint either the caller-provided override or the (maybe
open_trades_endpoint = portal.open_context( lazily spawned) `brokerd.<broker>` service daemon.
trades_endpoint,
loglevel=loglevel, '''
if portal is not None:
yield portal
return
# XXX: the ONE (normal) place a `brokerd.<broker>`
# daemon-actor gets booted in the runtime B)
from piker.brokers._daemon import (
maybe_spawn_brokerd,
) )
async with maybe_spawn_brokerd(
brokermod.name,
loglevel=loglevel,
) as live_portal:
yield live_portal
@acm @acm
async def maybe_open_paper_ep(): async def maybe_open_paper_ep():
@ -437,7 +459,14 @@ async def open_brokerd_dialog(
return return
# open trades-dialog endpoint with backend broker # open trades-dialog endpoint with backend broker
async with open_trades_endpoint as msg: async with (
acquire_live_portal() as live_portal,
live_portal.open_context(
trades_endpoint,
loglevel=loglevel,
) as msg,
):
ctx, first = msg ctx, first = msg
# runtime indication that the backend can't support live # runtime indication that the backend can't support live
@ -581,7 +610,6 @@ class Router(Struct):
async def maybe_open_brokerd_dialog( async def maybe_open_brokerd_dialog(
self, self,
brokermod: ModuleType, brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str, exec_mode: str,
fqme: str, fqme: str,
loglevel: str, loglevel: str,
@ -606,7 +634,6 @@ class Router(Struct):
async with open_brokerd_dialog( async with open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
portal=portal,
exec_mode=exec_mode, exec_mode=exec_mode,
fqme=fqme, fqme=fqme,
loglevel=loglevel, loglevel=loglevel,
@ -668,7 +695,6 @@ class Router(Struct):
brokername, _, _, _ = unpack_fqme(fqme) brokername, _, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername] brokermod = feed.mods[brokername]
broker = brokermod.name broker = brokermod.name
portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
@ -682,7 +708,6 @@ class Router(Struct):
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
portal=portal,
exec_mode=exec_mode, exec_mode=exec_mode,
fqme=fqme, fqme=fqme,
loglevel=loglevel, loglevel=loglevel,