brokerd: slim RPC caps + `ib` client-id offset

Caps-sec tightening now that `brokerd` is trading-only: NO
`piker.data.*` (feed) mods are RPC-enabled in the (live,
credentialed) trading actor anymore.

Deats,
- drop `_data_mods` for a minimal `_brokerd_service_mods`
  (just `piker.brokers._daemon`); dedup-compose with the
  backend's set in `spawn_brokerd()`.
- `broker_init()` reads the backend's `_brokerd_mods`
  (fallback: `__enable_modules__` for flat backends).
- fail fast in `spawn_brokerd()` via `validate.get_eps()`
  when a backend offers NO live order-ctl eps (eg.
  `kucoin`, `deribit`) -> tells the caller to use
  paper-mode instead of booting a dead actor; analogous
  warning in `datad_init()` for datad-ep-less backends.
- offset `ib`'s default `client_id` per daemon-kind in
  `load_aio_clients()`: post-split BOTH `datad.ib` and
  `brokerd.ib` connect to the same gw/tws endpoint and the
  shared default (6116 + linear retry incrs) would collide
  and burn the full conn-timeout retry cycle; datad gets
  +16, ad-hoc (test/cli) actors +32.
- drop the import-cleanup leftovers (`exceptiongroup`,
  `_FeedsBus` type-only import) and the now-resolved
  "expose datad" TODO in `.cli`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
datad_service
Gud Boi 2026-06-09 17:23:04 -04:00
parent cefbc74671
commit 14e6a319ed
3 changed files with 55 additions and 22 deletions

View File

@ -25,10 +25,8 @@ from contextlib import (
) )
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING,
AsyncContextManager, AsyncContextManager,
) )
import exceptiongroup as eg
import tractor import tractor
import trio import trio
@ -40,27 +38,20 @@ from piker.log import (
from . import _util from . import _util
from . import get_brokermod from . import get_brokermod
if TYPE_CHECKING:
from ..data import _FeedsBus
log = get_logger(name=__name__) log = get_logger(name=__name__)
# `brokerd` enabled modules # `brokerd`-actor-always-enabled mods.
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec # NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care! # model and should be treated with utmost care! In particular NO
_data_mods: str = [ # `piker.data.*` feed mods should be enabled in this (live,
'piker.brokers.core', # credentialed) trading actor; all data-feed serving is the
'piker.brokers.data', # domain of the `datad.<broker>` sibling daemon, see
# `piker.data._daemon._datad_service_mods`.
_brokerd_service_mods: list[str] = [
'piker.brokers._daemon', 'piker.brokers._daemon',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
] ]
# TODO: we should rename the daemon to datad prolly once we split up
# broker vs. data tasks into separate actors?
@tractor.context @tractor.context
async def _setup_persistent_brokerd( async def _setup_persistent_brokerd(
ctx: tractor.Context, ctx: tractor.Context,
@ -119,8 +110,10 @@ def broker_init(
This includes: This includes:
- load the appropriate <brokername>.py pkg module, - load the appropriate <brokername>.py pkg module,
- reads any declared `__enable_modules__: listr[str]` which will be - reads any declared `_brokerd_mods: list[str]` (falling
passed to `tractor.ActorNursery.start_actor(enabled_modules=<this>)` back to the full `__enable_modules__` set for
not-yet-split backends) which will be passed to
`tractor.ActorNursery.start_actor(enable_modules=)`
at actor start time, at actor start time,
- deliver a references to the daemon lifetime fixture, which - deliver a references to the daemon lifetime fixture, which
for now is always the `_setup_persistent_brokerd()` context defined for now is always the `_setup_persistent_brokerd()` context defined
@ -154,9 +147,15 @@ def broker_init(
modpath, modpath,
] ]
for submodname in getattr( for submodname in getattr(
brokermod,
'_brokerd_mods',
# fallback for (flat, less mature) backends which
# don't yet declare a daemon-kind mod split.
getattr(
brokermod, brokermod,
'__enable_modules__', '__enable_modules__',
[], [],
),
): ):
subpath: str = f'{modpath}.{submodname}' subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath) enabled.append(subpath)
@ -184,6 +183,22 @@ async def spawn_brokerd(
f'backend: {brokername!r}' f'backend: {brokername!r}'
) )
# fail fast on (data-only) backends which don't offer
# ANY live order-control eps; the caller should instead
# be using paper-mode (and thus never spawning us)!
from ..data.validate import get_eps
brokerd_eps: dict = get_eps(
get_brokermod(brokername),
'brokerd',
)
if not brokerd_eps:
raise RuntimeError(
f'Backend {brokername!r} offers NO `brokerd` '
f'(live order-control) eps!?\n'
f'It is likely a datad-only provider, use '
f'paper-mode for clearing instead.\n'
)
( (
brokermode, brokermode,
tractor_kwargs, tractor_kwargs,
@ -205,7 +220,11 @@ async def spawn_brokerd(
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor( portal = await Services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), enable_modules=list(dict.fromkeys(
_brokerd_service_mods
+
tractor_kwargs.pop('enable_modules')
)),
debug_mode=Services.debug_mode, debug_mode=Services.debug_mode,
**tractor_kwargs **tractor_kwargs
) )

View File

@ -1340,6 +1340,21 @@ async def load_aio_clients(
ib = None ib = None
client = None client = None
# XXX: post (brokerd vs. datad)-split BOTH per-broker
# daemons connect to the same API gw/tws endpoint(s); to
# avoid `clientId` collisions (and the long conn-timeout
# retry cycle they cause) we offset the data-daemon's
# default id-range to be disjoint from `brokerd.ib`'s
# (which also retries with `client_id + i` increments).
if client_id == 6116: # the default from above
aname: str = tractor.current_actor().name
if 'datad' in aname:
client_id += 16
# ad-hoc (test/cli) actors get their own range to
# avoid clashing with any live daemon-tree's conns.
elif 'brokerd' not in aname:
client_id += 32
# attempt to get connection info from config; if no .toml entry # attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection. # exists, we try to load from a default localhost connection.
localhost = '127.0.0.1' localhost = '127.0.0.1'

View File

@ -250,7 +250,6 @@ def cli(
# TODO: load endpoints from `conf::[network].pikerd` # TODO: load endpoints from `conf::[network].pikerd`
# - pikerd vs. regd, separate registry daemon? # - pikerd vs. regd, separate registry daemon?
# - expose datad vs. brokerd?
# - bind emsd with certain perms on public iface? # - bind emsd with certain perms on public iface?
regaddrs: list[tuple[str, int]] = regaddr or [( regaddrs: list[tuple[str, int]] = regaddr or [(
_default_registry_host, _default_registry_host,