14 KiB
Split brokerd.<broker> into trading-only brokerd + new datad.<broker>
Context
Today a single brokerd.<broker> actor hosts BOTH concerns:
- data feed service tasks: the
_FeedsBus+open_feed_bus()ep (piker/data/feed.py:464), per-symbolallocate_persistent_feed()tasks (shm writers viasample_and_broadcast(), history backfill viapiker.tsp), plus backend epsstream_quotes,open_history_client,open_symbol_search,get_mkt_infofrompiker/brokers/<backend>/feed.py/symbols.py, - live order-control tasks:
open_trade_dialog()frompiker/brokers/<backend>/broker.py, driven byemsd.
The codebase already anticipates this split: piker/data/validate.py:70-91 groups backend eps into 'datad' vs 'brokerd' kinds, piker/brokers/ib/__init__.py:62-70 already declares _brokerd_mods/_datad_mods, and piker/brokers/_daemon.py:62 carries the literal TODO “rename the daemon to datad prolly once we split up broker vs. data tasks into separate actors?”. This work executes that split.
User-decided constraints:
datad.<broker>is a sibling ofbrokerd.<broker>underpikerd, spawned via the existingServices+maybe_spawn_daemon()machinery (piker/service/_daemon.py:46).- Hard cutover, staged by layer — no dual-mode runtime flag; every stage lands fully working.
- Post-split
brokerdis trading-only and lazily spawned solely by emsd’sopen_brokerd_dialogpath; UI/CLI/feed code never spawns it. Chart-only and paper sessions run with zero brokerd processes.
Target topology
pikerd
├── datad.ib feed bus, shm writers, tsp history, symbol search
├── brokerd.ib open_trade_dialog only (EMS-spawned, lazy)
├── emsd
│ └── paperboi.ib (paper mode; opens its feed via datad)
└── samplerd
Key verified facts (load-bearing)
feed.portalshas exactly ONE trading consumer:piker/clearing/_ems.py:671(portal = feed.portals[brokermod]→ handed toopen_brokerd_dialog). This is the single coupling forcing feed + trading into one actor.assert 'brokerd' in servicenameatpiker/data/feed.py:502is the only actor-name assert in the tree.piker ledger(piker/accounting/cli.py:100-157) is a hidden consumer: it callsbroker_init()directly, spawns its own ad-hoc actor, enters_setup_persistent_brokerd, then callsopen_brokerd_dialog(brokermod, portal, ...)with an explicit portal — the new signature must keep aportal:override param.- kraken’s feed→broker coupling is mild:
kraken/broker.py:77-81importsNoBsWs/open_autorecon_ws(really frompiker.data._web_bs) andstream_messages(pure parser) fromfeed.py. In-process imports only —enable_modulesgates RPC, not imports. No live shared state; each actor opens its own WS. - ib: default
client_id=6116(ib/api.py:1320) with linear retryclientId=client_id + i(:1403). Two ib actors connecting concurrently will collide → needs a role-based id offset. Both daemons need_spawn_kwargs={'infect_asyncio': True}(copied by both init fns). - binance has no
symbols.py(get_mkt_info/open_symbol_searchlive inbinance/feed.py); kucoin/questrade/robinhood are flat single-file with NO__enable_modules__; deribit has nobroker.py. tests/test_services.py:80,185assertbrokerdactor names incl. the paper-mode flow assertingbrokerd.krakenspawns (:229,:237) — must invert post-split._root_modules(piker/service/_actor_runtime.py:156) must gain the new datad daemon mod sopikerd_portal.run(spawn_datad, ...)resolves.piker/brokers/core.py:175symbol_searchdoesportal.run(search_w_brokerd)where the target fn lives inpiker.brokers.core→ that mod must stay RPC-enabled in datad.
Module decision
New file piker/data/_daemon.py hosts all datad machinery (_setup_persistent_datad, datad_init, spawn_datad, maybe_spawn_datad, _datad_service_mods). Mirrors the piker.brokers._daemon / piker.data._sampling (samplerd) per-subsystem convention and satisfies the existing TODO at brokers/_daemon.py:49 (“move this def to the .data subpkg”). piker.brokers._daemon keeps brokerd-only code, slimmed. Do NOT over-DRY the two ~40-line init fns into a shared factory yet (samplerd precedent accepts the duplication).
Stage 0 — prep: backend module grouping + ep introspection (no topology change)
piker/data/validate.py: add aget_eps(mod, kind) -> dict[str, Callable]helper returning the backend’s defined eps for a daemon-kind from_eps(missing eps excluded). Used later by both init fns + fail-fast checks.- Declare daemon-kind module groups in each split backend’s
__init__.py, keeping__enable_modules__as the (deduped) union so behavior is unchanged:kraken:_brokerd_mods = ['api', 'broker'],_datad_mods = ['api', 'feed', 'symbols']binance:_brokerd_mods = ['api', 'broker'],_datad_mods = ['api', 'feed']deribit:_brokerd_mods = [],_datad_mods = ['api', 'feed']ib: adjust existing groups so each includes'api'- flat backends (kucoin etc.): no attrs — init fns fall back to enabling just
modpath(existing behavior).
- Hygiene:
kraken/broker.py:77-81importsNoBsWs/open_autorecon_wsfrompiker.data._web_bsdirectly (keep thestream_messagesimport from.feed).
Gate: full pytest green, zero behavior change.
Stage 1 — introduce datad daemon machinery (additive)
New piker/data/_daemon.py:
_datad_service_mods: list[str]— datad-always-enabled mods (successor to the data side of_data_modsfrombrokers/_daemon.py:52):['piker.brokers.core', 'piker.brokers.data', 'piker.data', 'piker.data.feed', 'piker.data._sampling', 'piker.data._daemon']._setup_persistent_datad(ctx, brokername, loglevel)—@tractor.contextfixture; logging boilerplate (as_setup_persistent_brokerd:81-88), then allocates the actor-global feed bus exactly as the brokerd fixture does today (brokers/_daemon.py:105-121):assert not feed._bus, open service nursery,feed.get_feed_bus(brokername, service_nursery),ctx.started(),sleep_forever().datad_init(brokername, ...)— mirrorsbroker_init()(brokers/_daemon.py:132): actor namef'datad.{brokername}', copies backend_spawn_kwargs(critical for ib infect_asyncio), buildsenable_modulesfromgetattr(brokermod, '_datad_mods', getattr(brokermod, '__enable_modules__', [])).spawn_datad(brokername, ...)— mirrorsspawn_brokerd()(brokers/_daemon.py:202):Services.actor_n.start_actor(dname, enable_modules=_datad_service_mods + backend_mods, ...)+Services.start_service_task(dname, portal, _setup_persistent_datad, ...).maybe_spawn_datad(brokername, ...)— wrapsmaybe_spawn_daemon(service_name=f'datad.{brokername}', service_task_target=spawn_datad, ...)exactly likemaybe_spawn_brokerd(brokers/_daemon.py:256).
Supporting edits:
piker/service/_actor_runtime.py:156: add'piker.data._daemon'to_root_modules(keep'piker.brokers._daemon').piker/service/__init__.py: re-exportspawn_datad/maybe_spawn_datadnext to the brokerd ones (:56-57).tests/test_services.py: newtest_datad_spawn—open_test_pikerd+maybe_spawn_datad('kraken')+ensure_service('datad.kraken'). (Do NOT route a feed through it yet —open_feed_bus’s assert still says brokerd.)
Gate: suite green + new spawn test; nothing routes through datad yet.
Stage 2 — clearing layer: emsd self-spawns brokerd (decouple from feed.portals)
Sequenced BEFORE the feed cutover so live trading works at every boundary (otherwise feed.portals would hand emsd a datad portal).
piker/clearing/_ems.py:
open_brokerd_dialog()(:336) new signature:(brokermod, exec_mode, fqme=None, portal: tractor.Portal|None = None, loglevel=None). Internals: keep trades-ep detection (:400-417); acquire the brokerd actor ONLY when a live ep will actually open, via a small inner@acm _acquire_live_portal()that yields the passedportalif given (thepiker ledgerpath) elseasync with maybe_spawn_brokerd(brokermod.name, loglevel=loglevel)— the single place brokerd boots post-split. Move the eagerportal.open_context(trades_endpoint, ...)construction (:425) inside that block; the paper short-circuit (:432) never touches it.Router.maybe_open_brokerd_dialog()(:581) — drop theportalparam;Router.open_trade_relays()(:640) — deleteportal = feed.portals[brokermod](:671) and the pass-through (:685).piker/accounting/cli.py:144: switch to keyword formopen_brokerd_dialog(brokermod, exec_mode=..., portal=portal, loglevel=...).
Pre-cutover this is a pure refactor: emsd’s maybe_spawn_brokerd just finds the already-running brokerd via the registry.
Gate: tests/test_ems.py + services suite green; manual paper order on kraken; piker ledger smoke.
Stage 3 — feed layer hard cutover to datad
piker/data/feed.py: importmaybe_spawn_datad(replacingmaybe_spawn_brokerd,:56-58);open_feed():895→maybe_spawn_datad(...)(renamebrokerd_ctxs→datad_ctxs);open_feed_bus():502→assert 'datad' in servicename; comment sweep.piker/brokers/_daemon.py_setup_persistent_brokerd(): slim to trading-only fixture — logging setup,ctx.started(),sleep_forever(). Drop the bus alloc,assert not feed._bus, the service nursery (backendopen_trade_dialogctxs own their task trees), theeg.ExceptionGrouphandler, and the stale_FeedsBusTYPE_CHECKING import. (piker ledger’s ad-hoc actor enters this same slimmed fixture — exactly what it needs.)- Repoint remaining data-flavored spawn sites
maybe_spawn_brokerd→maybe_spawn_datad:piker/ui/_app.py:40,57(symbol search; optionally renameinstall_brokerd_search→install_datad_search, def atpiker/data/feed.py:766)piker/brokers/core.py:33,175(symbol_search)piker/brokers/cli.py:38,190,320(brokercheck,record)piker/ui/cli.py:30,72,121(legacy kivy monitor/optschain) + best-effortpiker/ui/kivy/option_chain.py:498wait_for_actor('brokerd')→'datad'
tests/test_services.py::80,:185actor_name = 'datad'; paper-mode test assertsdatad.kraken+paperboi.kraken+emsdAND adds the headline negative check:assert 'brokerd.kraken' not in services.service_tasks.
Gate: full suite with inverted assertions; manual: chart on binance/kraken shows datad.<broker> in piker services and NO brokerd; paper order fills; symbol search works; history backfill sane.
Stage 4 — caps-sec slimming, validation, ib client-id
piker/brokers/_daemon.py: delete_data_mods(:52);spawn_brokerd():236→enable_modules=tractor_kwargs.pop('enable_modules');broker_init():184readsgetattr(brokermod, '_brokerd_mods', getattr(brokermod, '__enable_modules__', [])). Resulting brokerd enable set has nopiker.data.*at all.- Fail-fast ep validation via Stage-0
validate.get_eps():broker_initraises a clear error whenget_eps(mod, 'brokerd')is empty (e.g. “kucoin is datad-only — use paper mode”);datad_initwarns analogously. - ib client-id mitigation in
load_aio_clients()(ib/api.py:~1316): role-based offset whenclient_idis the 6116 default (e.g.+16when'datad' in tractor.current_actor().name), optionally configurable viabrokers.tomlkeys. - Docs/comment sweep: resolve
brokers/_daemon.py:62TODO,piker/cli/__init__.py:253TODO, daemon list inpiker/service/__init__.py:21docstring.
Gate: full suite; audit greps clean: grep -rn maybe_spawn_brokerd piker/ hits only clearing/_ems.py, service/__init__.py, brokers/_daemon.py; no spawn-path 'brokerd' literals left in piker/data, piker/ui, piker/brokers/cli.py.
Risk register
| Risk | Sev | Mitigation |
|---|---|---|
ib: datad.ib + brokerd.ib both connect to TWS/gw; client_id=6116 collision burns connect_timeout retries |
High (live ib only) | Stage 4 role-based id offset; both daemons inherit infect_asyncio via _spawn_kwargs copy in both init fns |
piker ledger ad-hoc actor path silently broken by signature change |
Med | portal: override kept on open_brokerd_dialog (Stage 2); explicit smoke test |
| datad-only backends (kucoin/deribit) → useless brokerd spawn | Low | already covered: open_brokerd_dialog forces exec_mode='paper' when no trades ep (_ems.py:413-417) BEFORE the lazy spawn; Stage 4 fail-fast |
brokerd in-process symbology/ledger needs (ib broker.py imports .symbols; kraken uses stream_messages) |
None | verified pure in-process imports; enable_modules gates RPC only |
paper-mode test asserts brokerd spawns (test_services.py:237) |
Low | Stage 3 deliberately inverts it |
| doubled per-broker clients (HTTP/WS, symcache loads) | Low | each actor already opens its own WS today; symcache is disk-read-mostly |
Verification (per stage gates above, plus end-to-end)
pytest tests/test_services.py tests/test_feeds.py tests/test_ems.pyat every stage (kraken/binance public endpoints, no creds needed).- Manual smoke matrix post-Stage-3:
pikerd -l info+piker chart btcusdt.spot.binance→piker servicesshowsdatad.binance, no brokerd; submit paper order →paperboi.binanceappears, still no brokerd; symbol search;piker ledger <broker>.paper. - If ib creds/gateway available: live ib chart + small live order to exercise dual-actor client-id path (Stage 4).
Critical files
- new
piker/data/_daemon.py— all datad machinery piker/brokers/_daemon.py— slimmed brokerd fixture/init/spawnpiker/data/feed.py— spawn cutover + actor-name assertpiker/clearing/_ems.py— emsd lazy brokerd spawn (open_brokerd_dialog,Router)piker/service/_actor_runtime.py,piker/service/__init__.py— root mods + re-exportspiker/data/validate.py—get_eps()helper- backend
__init__.pys (kraken/binance/deribit/ib) —_datad_mods/_brokerd_mods piker/accounting/cli.py— keyword-form dialog calltests/test_services.py— inverted + new assertions