# Split `brokerd.<broker>` into trading-only `brokerd` + new `datad.<broker>`
## Context
Today a single `brokerd.<broker>` actor hosts BOTH concerns:
- **data feed service tasks**: the `_FeedsBus` + `open_feed_bus()` ep (`piker/data/feed.py:464`), per-symbol `allocate_persistent_feed()` tasks (shm writers via `sample_and_broadcast()`, history backfill via `piker.tsp`), plus backend eps `stream_quotes`, `open_history_client`, `open_symbol_search`, `get_mkt_info` from `piker/brokers/<backend>/feed.py`/`symbols.py`,
- **live order-control tasks**: `open_trade_dialog()` from `piker/brokers/<backend>/broker.py`, driven by `emsd`.

The codebase already anticipates this split: `piker/data/validate.py:70-91` groups backend eps into `'datad'` vs `'brokerd'` kinds, `piker/brokers/ib/__init__.py:62-70` already declares `_brokerd_mods`/`_datad_mods`, and `piker/brokers/_daemon.py:62` carries the literal TODO *"rename the daemon to datad prolly once we split up broker vs. data tasks into separate actors?"*. This work executes that split.

**User-decided constraints:**
- `datad.<broker>` is a **sibling** of `brokerd.<broker>` under `pikerd`, spawned via the existing `Services` + `maybe_spawn_daemon()` machinery (`piker/service/_daemon.py:46`).
- **Hard cutover, staged by layer** — no dual-mode runtime flag; every stage lands fully working.
- Post-split `brokerd` is **trading-only and lazily spawned solely by emsd's** `open_brokerd_dialog` path; UI/CLI/feed code never spawns it. Chart-only and paper sessions run with **zero** brokerd processes.
## Target topology
```
pikerd
├── datad.ib          feed bus, shm writers, tsp history, symbol search
├── brokerd.ib        open_trade_dialog only (EMS-spawned, lazy)
├── emsd
│   └── paperboi.ib   (paper mode; opens its feed via datad)
└── samplerd
```
## Key verified facts (load-bearing)
- `feed.portals` has exactly ONE trading consumer: `piker/clearing/_ems.py:671` (`portal = feed.portals[brokermod]` → handed to `open_brokerd_dialog`). This is the single coupling forcing feed + trading into one actor.
- `assert 'brokerd' in servicename` at `piker/data/feed.py:502` is the only actor-name assert in the tree.
- `piker ledger` (`piker/accounting/cli.py:100-157`) is a hidden consumer: it calls `broker_init()` directly, spawns its own ad-hoc actor, enters `_setup_persistent_brokerd`, then calls `open_brokerd_dialog(brokermod, portal, ...)` **with an explicit portal** — the new signature must keep a `portal:` override param.
- kraken's feed→broker coupling is mild: `kraken/broker.py:77-81` imports `NoBsWs`/`open_autorecon_ws` (really from `piker.data._web_bs`) and `stream_messages` (pure parser) from `feed.py`. In-process imports only — `enable_modules` gates RPC, not imports. No live shared state; each actor opens its own WS.
- ib: default `client_id=6116` (`ib/api.py:1320`) with linear retry `clientId=client_id + i` (`:1403`). Two ib actors connecting concurrently will collide → needs a role-based id offset. Both daemons need `_spawn_kwargs={'infect_asyncio': True}` (copied by both init fns).
- binance has no `symbols.py` (`get_mkt_info`/`open_symbol_search` live in `binance/feed.py`); kucoin/questrade/robinhood are flat single-file with NO `__enable_modules__`; deribit has no `broker.py`.
- `tests/test_services.py:80,185` assert `brokerd` actor names incl. the paper-mode flow asserting `brokerd.kraken` spawns (`:229,:237`) — must invert post-split.
- `_root_modules` (`piker/service/_actor_runtime.py:156`) must gain the new datad daemon mod so `pikerd_portal.run(spawn_datad, ...)` resolves.
- `piker/brokers/core.py:175` `symbol_search` does `portal.run(search_w_brokerd)` where the target fn lives in `piker.brokers.core` → that mod must stay RPC-enabled in datad.
## Module decision
New file **`piker/data/_daemon.py`** hosts all datad machinery (`_setup_persistent_datad`, `datad_init`, `spawn_datad`, `maybe_spawn_datad`, `_datad_service_mods`). This mirrors the `piker.brokers._daemon` / `piker.data._sampling` (samplerd) per-subsystem convention and satisfies the existing TODO at `brokers/_daemon.py:49` ("move this def to the `.data` subpkg"). `piker.brokers._daemon` keeps brokerd-only code, slimmed. Do NOT over-DRY the two ~40-line init fns into a shared factory yet (samplerd precedent accepts the duplication).

---
## Stage 0 — prep: backend module grouping + ep introspection (no topology change)
1. `piker/data/validate.py`: add a `get_eps(mod, kind) -> dict[str, Callable]` helper returning the backend's defined eps for a daemon-kind from `_eps` (missing eps excluded). Used later by both init fns + fail-fast checks.
2. Declare daemon-kind module groups in each split backend's `__init__.py`, keeping `__enable_modules__` as the (deduped) union so behavior is unchanged:
   - `kraken`: `_brokerd_mods = ['api', 'broker']`, `_datad_mods = ['api', 'feed', 'symbols']`
   - `binance`: `_brokerd_mods = ['api', 'broker']`, `_datad_mods = ['api', 'feed']`
   - `deribit`: `_brokerd_mods = []`, `_datad_mods = ['api', 'feed']`
   - `ib`: adjust existing groups so each includes `'api'`
   - flat backends (kucoin etc.): no attrs; the init fns fall back to enabling just `modpath` (existing behavior).
3. Hygiene: change `kraken/broker.py:77-81` to import `NoBsWs`/`open_autorecon_ws` directly from `piker.data._web_bs` (keep the `stream_messages` import from `.feed`).

**Gate**: full pytest green, zero behavior change.
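A minimal sketch of the Stage 0 helper and the module-group union, under stated assumptions: `_eps` is taken to map each daemon kind to a list of endpoint names (the real table lives in `piker/data/validate.py` and may differ), the endpoint names are the ones listed in the Context section, and `union_mods` and `fake_backend` are hypothetical names used only for illustration.

```python
from types import SimpleNamespace
from typing import Callable

# Assumed shape of the `_eps` table: daemon-kind -> endpoint names.
_eps: dict[str, list[str]] = {
    'datad': [
        'stream_quotes',
        'open_history_client',
        'open_symbol_search',
        'get_mkt_info',
    ],
    'brokerd': ['open_trade_dialog'],
}


def get_eps(mod, kind: str) -> dict[str, Callable]:
    '''
    Return the endpoints `mod` actually defines for a daemon-kind;
    names the backend does not define are left out.

    '''
    return {
        name: ep
        for name in _eps[kind]
        if (ep := getattr(mod, name, None)) is not None
    }


def union_mods(*groups: list[str]) -> list[str]:
    # order-preserving dedupe, e.g. for `__enable_modules__`
    return list(dict.fromkeys(m for group in groups for m in group))


# Hypothetical datad-only backend: no trading endpoint defined.
fake_backend = SimpleNamespace(
    stream_quotes=lambda: None,
    get_mkt_info=lambda: None,
)
datad_eps = get_eps(fake_backend, 'datad')
brokerd_eps = get_eps(fake_backend, 'brokerd')

# kraken's groups from step 2 above
enable_modules = union_mods(
    ['api', 'broker'],
    ['api', 'feed', 'symbols'],
)
```

An empty result, as for `brokerd_eps` here, is the condition the Stage 4 fail-fast check would test for.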
## Stage 1 — introduce `datad` daemon machinery (additive)
New `piker/data/_daemon.py`:
- `_datad_service_mods: list[str]` — datad-always-enabled mods (successor to the data side of `_data_mods` from `brokers/_daemon.py:52`): `['piker.brokers.core', 'piker.brokers.data', 'piker.data', 'piker.data.feed', 'piker.data._sampling', 'piker.data._daemon']`.
- `_setup_persistent_datad(ctx, brokername, loglevel)`: `@tractor.context` fixture; logging boilerplate (as `_setup_persistent_brokerd:81-88`), then allocates the actor-global feed bus exactly as the brokerd fixture does today (`brokers/_daemon.py:105-121`): `assert not feed._bus`, open service nursery, `feed.get_feed_bus(brokername, service_nursery)`, `ctx.started()`, `sleep_forever()`.
- `datad_init(brokername, ...)` — mirrors `broker_init()` (`brokers/_daemon.py:132`): actor name `f'datad.{brokername}'`, copies backend `_spawn_kwargs` (**critical for ib infect_asyncio**), builds `enable_modules` from `getattr(brokermod, '_datad_mods', getattr(brokermod, '__enable_modules__', []))`.
- `spawn_datad(brokername, ...)` — mirrors `spawn_brokerd()` (`brokers/_daemon.py:202`): `Services.actor_n.start_actor(dname, enable_modules=_datad_service_mods + backend_mods, ...)` + `Services.start_service_task(dname, portal, _setup_persistent_datad, ...)`.
- `maybe_spawn_datad(brokername, ...)` — wraps `maybe_spawn_daemon(service_name=f'datad.{brokername}', service_task_target=spawn_datad, ...)` exactly like `maybe_spawn_brokerd` (`brokers/_daemon.py:256`).
Supporting edits:
- `piker/service/_actor_runtime.py:156`: add `'piker.data._daemon'` to `_root_modules` (keep `'piker.brokers._daemon'`).
- `piker/service/__init__.py`: re-export `spawn_datad`/`maybe_spawn_datad` next to the brokerd ones (`:56-57`).
- `tests/test_services.py`: new `test_datad_spawn`: `open_test_pikerd` + `maybe_spawn_datad('kraken')` + `ensure_service('datad.kraken')`. (Do NOT route a feed through it yet; `open_feed_bus`'s assert still says brokerd.)

**Gate**: suite green + new spawn test; nothing routes through datad yet.
## Stage 2 — clearing layer: emsd self-spawns brokerd (decouple from `feed.portals`)
Sequenced BEFORE the feed cutover so live trading works at every boundary (otherwise `feed.portals` would hand emsd a datad portal).
`piker/clearing/_ems.py`:
1. `open_brokerd_dialog()` (`:336`) new signature: `(brokermod, exec_mode, fqme=None, portal: tractor.Portal|None = None, loglevel=None)`. Internals: keep trades-ep detection (`:400-417`); acquire the brokerd actor ONLY when a live ep will actually open, via a small inner `@acm _acquire_live_portal()` that yields the passed `portal` if given (the `piker ledger` path) else `async with maybe_spawn_brokerd(brokermod.name, loglevel=loglevel)`, **the single place brokerd boots post-split**. Move the eager `portal.open_context(trades_endpoint, ...)` construction (`:425`) inside that block; the paper short-circuit (`:432`) never touches it.
2. `Router.maybe_open_brokerd_dialog()` (`:581`) — drop the `portal` param; `Router.open_trade_relays()` (`:640`) — delete `portal = feed.portals[brokermod]` (`:671`) and the pass-through (`:685`).
3. `piker/accounting/cli.py:144`: switch to keyword form `open_brokerd_dialog(brokermod, exec_mode=..., portal=portal, loglevel=...)`.
Pre-cutover this is a pure refactor: emsd's `maybe_spawn_brokerd` just *finds* the already-running brokerd via the registry.
**Gate**: `tests/test_ems.py` + services suite green; manual paper order on kraken; `piker ledger` smoke.
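The control flow of the inner `_acquire_live_portal()` can be sketched as below. The real code runs under `tractor`; `asyncio` is swapped in here only so the sketch runs with the standard library. `maybe_spawn_brokerd` is a stand-in that records its calls, the "portals" are plain strings, and `open_dialog` is a hypothetical, heavily reduced substitute for `open_brokerd_dialog`.

```python
import asyncio
from contextlib import asynccontextmanager

spawned: list[str] = []  # records stand-in spawns, for illustration


@asynccontextmanager
async def maybe_spawn_brokerd(name: str):
    # stand-in for the real spawner; yields a fake "portal"
    spawned.append(name)
    yield f'portal:brokerd.{name}'


@asynccontextmanager
async def acquire_live_portal(brokername: str, portal=None):
    if portal is not None:
        # caller-supplied override, e.g. the `piker ledger` path
        yield portal
        return
    # lazy spawn: only reached when a live endpoint will open
    async with maybe_spawn_brokerd(brokername) as live_portal:
        yield live_portal


async def open_dialog(brokername: str, exec_mode: str, portal=None):
    if exec_mode == 'paper':
        # paper short-circuit never touches brokerd
        return 'paper'
    async with acquire_live_portal(brokername, portal) as p:
        return p


async def demo() -> list[str]:
    return [
        await open_dialog('kraken', 'paper'),
        await open_dialog('kraken', 'live', portal='ledger-portal'),
        await open_dialog('kraken', 'live'),
    ]


results = asyncio.run(demo())
```

Only the third call reaches the spawner, which is the property the Stage 2 refactor is meant to guarantee.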
## Stage 3 — feed layer hard cutover to datad
1. `piker/data/feed.py`: import `maybe_spawn_datad` (replacing `maybe_spawn_brokerd`, `:56-58`); `open_feed()` `:895` → `maybe_spawn_datad(...)` (rename `brokerd_ctxs` → `datad_ctxs`); `open_feed_bus()` `:502` → `assert 'datad' in servicename`; comment sweep.
2. `piker/brokers/_daemon.py` `_setup_persistent_brokerd()`: slim to trading-only fixture — logging setup, `ctx.started()`, `sleep_forever()`. Drop the bus alloc, `assert not feed._bus`, the service nursery (backend `open_trade_dialog` ctxs own their task trees), the `eg.ExceptionGroup` handler, and the stale `_FeedsBus` TYPE_CHECKING import. (`piker ledger`'s ad-hoc actor enters this same slimmed fixture — exactly what it needs.)
3. Repoint remaining data-flavored spawn sites `maybe_spawn_brokerd` → `maybe_spawn_datad`:
   - `piker/ui/_app.py:40,57` (symbol search; optionally rename `install_brokerd_search` → `install_datad_search`, def at `piker/data/feed.py:766`)
   - `piker/brokers/core.py:33,175` (`symbol_search`)
   - `piker/brokers/cli.py:38,190,320` (`brokercheck`, `record`)
   - `piker/ui/cli.py:30,72,121` (legacy kivy monitor/optschain) + best-effort `piker/ui/kivy/option_chain.py:498` `wait_for_actor('brokerd')` → `'datad'`
4. `tests/test_services.py`: `:80,:185` `actor_name = 'datad'`; paper-mode test asserts `datad.kraken` + `paperboi.kraken` + `emsd` AND adds the headline negative check: `assert 'brokerd.kraken' not in services.service_tasks`.
**Gate**: full suite with inverted assertions; manual: chart on binance/kraken shows `datad.<broker>` in `piker services` and NO brokerd; paper order fills; symbol search works; history backfill sane.
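The inverted paper-mode assertions from step 4 could be factored into a small check like the following. `check_paper_session` is a hypothetical helper name; it only assumes the running service names are available as an iterable of strings (for example the keys of `services.service_tasks`), and the dict below is a stand-in rather than real runtime state.

```python
def check_paper_session(service_names, broker: str) -> None:
    '''
    Assert the post-split service set for a chart + paper session.

    '''
    names = set(service_names)
    expected = {f'datad.{broker}', f'paperboi.{broker}', 'emsd'}
    missing = expected - names
    assert not missing, f'missing services: {sorted(missing)}'
    # headline negative check: no trading daemon in paper mode
    assert f'brokerd.{broker}' not in names, 'unexpected brokerd'


# stand-in for `services.service_tasks` in a healthy paper session
service_tasks = {
    'datad.kraken': None,
    'paperboi.kraken': None,
    'emsd': None,
    'samplerd': None,
}
check_paper_session(service_tasks, 'kraken')
```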
## Stage 4 — caps-sec slimming, validation, ib client-id
1. `piker/brokers/_daemon.py`: delete `_data_mods` (`:52`); `spawn_brokerd()` `:236` → `enable_modules=tractor_kwargs.pop('enable_modules')`; `broker_init()` `:184` reads `getattr(brokermod, '_brokerd_mods', getattr(brokermod, '__enable_modules__', []))`. The resulting brokerd enable set has no `piker.data.*` at all.
2. Fail-fast ep validation via Stage-0 `validate.get_eps()`: `broker_init` raises a clear error when `get_eps(mod, 'brokerd')` is empty (e.g. "kucoin is datad-only — use paper mode"); `datad_init` warns analogously.
3. ib client-id mitigation in `load_aio_clients()` (`ib/api.py:~1316`): role-based offset when `client_id` is the 6116 default (e.g. `+16` when `'datad' in tractor.current_actor().name`), optionally configurable via `brokers.toml` keys.
4. Docs/comment sweep: resolve `brokers/_daemon.py:62` TODO, `piker/cli/__init__.py:253` TODO, daemon list in `piker/service/__init__.py:21` docstring.
**Gate**: full suite; audit greps clean: `grep -rn maybe_spawn_brokerd piker/` hits only `clearing/_ems.py`, `service/__init__.py`, `brokers/_daemon.py`; no spawn-path `'brokerd'` literals left in `piker/data`, `piker/ui`, `piker/brokers/cli.py`.
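The role-based client-id offset from step 3 can be illustrated as follows. This is a sketch under assumptions: the actor name is passed in explicitly rather than read from `tractor.current_actor().name`, the `+16` offset is the example value from the plan, and the retry count of 4 is made up for the demo. `role_client_id` and `retry_ids` are hypothetical names. The two id ranges stay disjoint as long as the number of retries does not exceed the offset.

```python
DEFAULT_CLIENT_ID = 6116  # ib default per the plan
DATAD_OFFSET = 16         # example offset from the plan


def role_client_id(
    actor_name: str,
    client_id: int = DEFAULT_CLIENT_ID,
) -> int:
    # only shift the default; explicitly configured ids pass through
    if client_id == DEFAULT_CLIENT_ID and 'datad' in actor_name:
        return client_id + DATAD_OFFSET
    return client_id


def retry_ids(base: int, tries: int) -> list[int]:
    # linear retry used today: clientId = client_id + i
    return [base + i for i in range(tries)]


brokerd_ids = retry_ids(role_client_id('brokerd.ib'), tries=4)
datad_ids = retry_ids(role_client_id('datad.ib'), tries=4)
overlap = set(brokerd_ids) & set(datad_ids)
```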
---
## Risk register
| Risk | Sev | Mitigation |
|---|---|---|
| ib: `datad.ib` + `brokerd.ib` both connect to TWS/gw; `client_id=6116` collision burns `connect_timeout` retries | High (live ib only) | Stage 4 role-based id offset; both daemons inherit `infect_asyncio` via `_spawn_kwargs` copy in both init fns |
| `piker ledger` ad-hoc actor path silently broken by signature change | Med | `portal:` override kept on `open_brokerd_dialog` (Stage 2); explicit smoke test |
| datad-only backends (kucoin/deribit) → useless brokerd spawn | Low | already covered: `open_brokerd_dialog` forces `exec_mode='paper'` when no trades ep (`_ems.py:413-417`) BEFORE the lazy spawn; Stage 4 fail-fast |
| brokerd in-process symbology/ledger needs (ib `broker.py` imports `.symbols`; kraken uses `stream_messages`) | None | verified pure in-process imports; `enable_modules` gates RPC only |
| paper-mode test asserts brokerd spawns (`test_services.py:237`) | Low | Stage 3 deliberately inverts it |
| doubled per-broker clients (HTTP/WS, symcache loads) | Low | each actor already opens its own WS today; symcache is disk-read-mostly |
## Verification (per stage gates above, plus end-to-end)
- `pytest tests/test_services.py tests/test_feeds.py tests/test_ems.py` at every stage (kraken/binance public endpoints, no creds needed).
- Manual smoke matrix post-Stage-3: `pikerd -l info` + `piker chart btcusdt.spot.binance` → `piker services` shows `datad.binance`, no brokerd; submit paper order → `paperboi.binance` appears, still no brokerd; symbol search; `piker ledger <broker>.paper`.
- If ib creds/gateway available: live ib chart + small live order to exercise dual-actor client-id path (Stage 4).
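The Stage 4 audit grep could also be automated so it can run alongside the test suite. The sketch below is one possible shape, not part of the plan: `audit` is a hypothetical function, it scans only `*.py` files (the plan's `grep -rn` covers every file), and the demo runs against a throwaway directory rather than the real `piker/` tree.

```python
import tempfile
from pathlib import Path

# files allowed to mention the brokerd spawner, per the Stage 4 gate
ALLOWED = {
    'clearing/_ems.py',
    'service/__init__.py',
    'brokers/_daemon.py',
}


def audit(root: Path, needle: str = 'maybe_spawn_brokerd') -> list[str]:
    '''
    Return `path:lineno` for every hit of `needle` outside `ALLOWED`.

    '''
    offenders: list[str] = []
    for path in sorted(root.rglob('*.py')):
        rel = path.relative_to(root).as_posix()
        if rel in ALLOWED:
            continue
        for lineno, line in enumerate(path.read_text().splitlines(), 1):
            if needle in line:
                offenders.append(f'{rel}:{lineno}')
    return offenders


# demo on a throwaway tree: one allowed hit, one stale call site
with tempfile.TemporaryDirectory() as tmp:
    root = Path(tmp)
    (root / 'clearing').mkdir()
    (root / 'clearing' / '_ems.py').write_text('maybe_spawn_brokerd()\n')
    (root / 'ui').mkdir()
    (root / 'ui' / '_app.py').write_text('import x\nmaybe_spawn_brokerd()\n')
    offenders = audit(root)
```

Against the real tree the call would be `audit(Path('piker'))`, with an empty list meaning the gate passes.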
## Critical files
- **new** `piker/data/_daemon.py` — all datad machinery
- `piker/brokers/_daemon.py` — slimmed brokerd fixture/init/spawn
- `piker/data/feed.py` — spawn cutover + actor-name assert
- `piker/clearing/_ems.py` — emsd lazy brokerd spawn (`open_brokerd_dialog`, `Router`)
- `piker/service/_actor_runtime.py`, `piker/service/__init__.py` — root mods + re-exports
- `piker/data/validate.py`: `get_eps()` helper
- backend `__init__.py`s (kraken/binance/deribit/ib) — `_datad_mods`/`_brokerd_mods`
- `piker/accounting/cli.py` — keyword-form dialog call
- `tests/test_services.py` — inverted + new assertions