.clearing: lazily spawn `brokerd` from `emsd`

Drop the ONE coupling that forces feed + trading eps into
the same actor: `Router.open_trade_relays()` pulling its
trades-dialog portal from `feed.portals[brokermod]`.
Instead `open_brokerd_dialog()` now (maybe) spawns/finds
`brokerd.<broker>` itself via `maybe_spawn_brokerd()` and
ONLY when a live trades-ep will actually be opened; the
paper-mode short-circuit never touches it, so post
feed-cutover paper sessions will run with zero `brokerd`
procs.

Pre-cutover this is a pure refactor: the registry lookup
just finds the same feed-spawned daemon.

Deats,
- new `open_brokerd_dialog()` sig: portal acquisition moves
  inside via an `acquire_live_portal()` helper; keep an
  explicit `portal: Portal|None` override for the
  `piker ledger` cli which boots its own ad-hoc actor.
- `Router.maybe_open_brokerd_dialog()` drops its `portal`
  param; `open_trade_relays()` drops the `feed.portals`
  lookup entirely.
- `.accounting.cli`: pass `portal=` by keyword.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>

Prompt-IO: ai/prompt-io/claude/20260610T171226Z_64181219_prompt_io.md
datad_service
Gud Boi 2026-06-10 13:12:59 -04:00
parent 6418121923
commit 59d5d9a66d
4 changed files with 134 additions and 16 deletions

View File

@ -0,0 +1,47 @@
---
model: claude-fable-5[1m]
service: claude
session: 32d15f9a-b2d3-4c26-bdc9-190219141a25
timestamp: 2026-06-10T17:12:26Z
git_ref: datad_service
diff_cmd: git log -1 -p --follow -- ai/prompt-io/claude/20260610T171226Z_64181219_prompt_io.md
scope: code
substantive: true
raw_file: 20260610T171226Z_64181219_prompt_io.raw.md
---
## Prompt
Same session-initiating `brokerd`-split instruction (see
`20260610T170859Z_75cefe10_prompt_io.md`); this is the
approved plan's "stage 2": decouple the clearing layer
from `feed.portals` BEFORE the feed cutover so live
trading works at every stage boundary. User-decided
constraint applied: post-split `brokerd` is
trading-only and spawned LAZILY only by `emsd`'s
`open_brokerd_dialog()` path.
## Response summary
Kill the single coupling forcing feed + trading eps
into one actor (`Router.open_trade_relays()` pulling
its trades portal from `feed.portals[brokermod]`):
`open_brokerd_dialog()` now (maybe) spawns/finds
`brokerd.<broker>` itself and ONLY when a live
trades-ep will actually open; paper mode never touches
it. Pre-cutover this is a pure refactor (registry
lookup finds the same feed-spawned daemon).
## Files changed
- `piker/clearing/_ems.py``open_brokerd_dialog()`
re-sig + inner `acquire_live_portal()`;
`Router.maybe_open_brokerd_dialog()` drops `portal`
param; `open_trade_relays()` drops the
`feed.portals` lookup
- `piker/accounting/cli.py` — keyword-form `portal=`
override kept for the `piker ledger` ad-hoc actor
## Human edits
None — committed as generated.

View File

@ -0,0 +1,43 @@
---
model: claude-fable-5[1m]
service: claude
timestamp: 2026-06-10T17:12:26Z
git_ref: datad_service
diff_cmd: git log -1 -p --follow -- ai/prompt-io/claude/20260610T171226Z_64181219_prompt_io.md
---
NOTE: diff-ref mode entry (code committed in the same
commit as this log); backfilled from the live dev
session transcript per the `/prompt-io` skill rules.
> `git log -1 -p --follow -- piker/clearing/_ems.py`
Generated: new `open_brokerd_dialog()` signature
`(brokermod, exec_mode, fqme=None, portal=None,
loglevel=None)` with an inner `@acm
acquire_live_portal()` that yields the caller-provided
`portal` override (the `piker ledger` path) else
`maybe_spawn_brokerd(brokermod.name)` — designated THE
one place a live, credentialed `brokerd.<broker>` gets
booted post-split. The eager
`portal.open_context(trades_endpoint, ...)`
construction moved inside that block so the paper
short-circuit never acquires a live portal.
> `git log -1 -p --follow -- piker/accounting/cli.py`
Key analysis (verbatim from session):
- `feed.portals` had exactly ONE trading consumer:
`piker/clearing/_ems.py:671` (`portal =
feed.portals[brokermod]`) — the single coupling
forcing feed + trading into one actor.
- `piker ledger` (`accounting/cli.py:100-157`) is a
hidden consumer: it calls `broker_init()` directly,
spawns its own ad-hoc actor and passes the portal
positionally — the new signature must keep an
explicit `portal:` override param for this path.
- stage sequencing: clearing decouple lands BEFORE the
feed cutover, otherwise `feed.portals` would hand
emsd a `datad` portal and live `open_trade_dialog`
RPC would fail.

View File

@ -143,12 +143,15 @@ def sync(
# (what the EMS normally does internall) B) # (what the EMS normally does internall) B)
open_brokerd_dialog( open_brokerd_dialog(
brokermod, brokermod,
portal,
exec_mode=( exec_mode=(
'paper' 'paper'
if account == 'paper' if account == 'paper'
else 'live' else 'live'
), ),
# use our own ad-hoc-spawned actor,
# do NOT (spawn and) use the
# `brokerd.<broker>` service daemon!
portal=portal,
loglevel=loglevel, loglevel=loglevel,
) as ( ) as (
brokerd_stream, brokerd_stream,

View File

@ -335,9 +335,14 @@ class TradesRelay(Struct):
@acm @acm
async def open_brokerd_dialog( async def open_brokerd_dialog(
brokermod: ModuleType, brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str, exec_mode: str,
fqme: str|None = None, fqme: str|None = None,
# XXX: explicit (already spawned) trading-actor override,
# currently only used by the `piker ledger` cli which
# boots its own ad-hoc `brokerd`-like actor; normally we
# (lazily) spawn/find the `brokerd.<broker>` daemon here.
portal: tractor.Portal|None = None,
loglevel: str|None = None, loglevel: str|None = None,
) -> tuple[ ) -> tuple[
@ -351,6 +356,10 @@ async def open_brokerd_dialog(
paper engine instance depending on live trading support for the paper engine instance depending on live trading support for the
broker backend, configuration, or client code usage. broker backend, configuration, or client code usage.
NOTE: this is now the ONE place where a (live, credentialed)
`brokerd.<broker>` daemon-actor gets (lazily) booted; pure
data/paper sessions should never spawn one!
''' '''
get_console_log( get_console_log(
level=loglevel, level=loglevel,
@ -416,16 +425,29 @@ async def open_brokerd_dialog(
) )
exec_mode: str = 'paper' exec_mode: str = 'paper'
if ( @acm
trades_endpoint is not None async def acquire_live_portal():
or '''
exec_mode != 'paper' Deliver a portal to the (live, credentialed) trading
): actor hosting the backend's `open_trade_dialog()` ep:
# open live brokerd trades endpoint either the caller-provided override or the (maybe
open_trades_endpoint = portal.open_context( lazily spawned) `brokerd.<broker>` service daemon.
trades_endpoint,
loglevel=loglevel, '''
if portal is not None:
yield portal
return
# XXX: the ONE (normal) place a `brokerd.<broker>`
# daemon-actor gets booted in the runtime B)
from piker.brokers._daemon import (
maybe_spawn_brokerd,
) )
async with maybe_spawn_brokerd(
brokermod.name,
loglevel=loglevel,
) as live_portal:
yield live_portal
@acm @acm
async def maybe_open_paper_ep(): async def maybe_open_paper_ep():
@ -437,7 +459,14 @@ async def open_brokerd_dialog(
return return
# open trades-dialog endpoint with backend broker # open trades-dialog endpoint with backend broker
async with open_trades_endpoint as msg: async with (
acquire_live_portal() as live_portal,
live_portal.open_context(
trades_endpoint,
loglevel=loglevel,
) as msg,
):
ctx, first = msg ctx, first = msg
# runtime indication that the backend can't support live # runtime indication that the backend can't support live
@ -581,7 +610,6 @@ class Router(Struct):
async def maybe_open_brokerd_dialog( async def maybe_open_brokerd_dialog(
self, self,
brokermod: ModuleType, brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str, exec_mode: str,
fqme: str, fqme: str,
loglevel: str, loglevel: str,
@ -606,7 +634,6 @@ class Router(Struct):
async with open_brokerd_dialog( async with open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
portal=portal,
exec_mode=exec_mode, exec_mode=exec_mode,
fqme=fqme, fqme=fqme,
loglevel=loglevel, loglevel=loglevel,
@ -668,7 +695,6 @@ class Router(Struct):
brokername, _, _, _ = unpack_fqme(fqme) brokername, _, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername] brokermod = feed.mods[brokername]
broker = brokermod.name broker = brokermod.name
portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
@ -682,7 +708,6 @@ class Router(Struct):
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
portal=portal,
exec_mode=exec_mode, exec_mode=exec_mode,
fqme=fqme, fqme=fqme,
loglevel=loglevel, loglevel=loglevel,