.data: cut feed layer over to `datad` actors

The topology flip: all data-feed consumers now route to the
new `datad.<broker>` sibling daemon; `brokerd` becomes
trading-only and is ONLY ever booted lazily by `emsd`'s
`open_brokerd_dialog()` (see prior commit). Chart-only and
paper sessions run with zero (live, credentialed) `brokerd`
procs B)

Deats,
- `open_feed()` -> `maybe_spawn_datad()` (NB: imported
  relative-direct from `._daemon` to dodge a partial-init
  cycle via `piker.service`); flip the `open_feed_bus()`
  actor-name assert to `'datad'`; comment sweep.
- slim `_setup_persistent_brokerd()` to a trading-only
  fixture: console logging + pinned-open ctx; the feed-bus
  alloc moves to `_setup_persistent_datad()` and backend
  `open_trade_dialog()` ctxs own their own task trees.
  (the `piker ledger` ad-hoc actor enters this same slimmed
  fixture - exactly what it needs.)
- repoint data-flavoured spawn sites to `maybe_spawn_datad`:
  `.ui._app` symbol-search (+ rename
  `install_brokerd_search` -> `install_datad_search`),
  `.brokers.core.symbol_search()`, `.brokers.cli`
  `brokercheck`/`record`, legacy kivy `.ui.cli` +
  `option_chain`'s `wait_for_actor()`.
- invert `tests.test_services` expectations: feed/EMS-paper
  flows must spawn `datad.kraken` and `paperboi.kraken`
  with an explicit negative assert that NO `brokerd.kraken`
  service task exists.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
datad_service
Gud Boi 2026-06-09 17:22:44 -04:00
parent 233a53ea48
commit cefbc74671
8 changed files with 73 additions and 88 deletions

View File

@ -69,9 +69,15 @@ async def _setup_persistent_brokerd(
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
Trading-only daemon (lifetime) fixture: console logging
setup and a pinned-open context for service mgmt.
All data-feed-bus state now lives in the (data-feed-only)
`datad.<brokername>` sibling daemon, see
`piker.data._daemon._setup_persistent_datad()`; this
actor hosts only the backend's `open_trade_dialog()`
(live order-control) ep-task(s) which manage their own
task trees per `tractor.Context`.
'''
# NOTE: we only need to setup logging once (and only) here
@ -87,47 +93,13 @@ async def _setup_persistent_brokerd(
)
assert log.name == _util.subsys
# further, set the log level on any broker broker specific
# logger instance.
from piker.data import feed
assert not feed._bus
# allocate a nursery to the bus for spawning background
# tasks to service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `brokerd` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# we pin this task to keep the daemon active until the
# parent actor decides to tear it down
await trio.sleep_forever()
except eg.ExceptionGroup:
# TODO: likely some underlying `brokerd` IPC connection
# broke so here we handle a respawn and re-connect attempt!
# This likely should pair with development of the OCO task
# nusery in dev over @ `tractor` B)
# https://github.com/goodboy/tractor/pull/363
raise
def broker_init(
brokername: str,

View File

@ -35,7 +35,7 @@ from piker.log import (
get_logger,
)
from ..service import (
maybe_spawn_brokerd,
maybe_spawn_datad,
maybe_open_pikerd,
)
from ..brokers import (
@ -187,7 +187,7 @@ def brokercheck(config, broker):
'''
async def bcheck_main():
async with maybe_spawn_brokerd(broker) as portal:
async with maybe_spawn_datad(broker) as portal:
await portal.run(run_test, broker)
await portal.cancel_actor()
@ -317,7 +317,7 @@ def record(config, rate, name, dhost, filename):
return
async def main(tries):
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
tries=tries, loglevel=loglevel
) as portal:
# run app "main"

View File

@ -30,7 +30,7 @@ import trio
from piker.log import get_logger
from . import get_brokermod
from ..service import maybe_spawn_brokerd
from ..service import maybe_spawn_datad
from . import open_cached_client
from ..accounting import MktPair
@ -172,7 +172,7 @@ async def symbol_search(
# await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync()
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
mod.name,
infect_asyncio=getattr(
mod,

View File

@ -17,7 +17,7 @@
'''
Data feed apis and infra.
This module is enabled for ``brokerd`` daemons and includes mostly
This module is enabled for ``datad`` daemons and includes mostly
endpoints and middleware to support our real-time, provider agnostic,
live market quotes layer. Historical data loading and processing is also
initiated in parts of the feed bus startup but business logic and
@ -54,8 +54,11 @@ from piker.accounting import (
)
from piker.types import Struct
from piker.brokers import get_brokermod
from piker.service import (
maybe_spawn_brokerd,
# NOTE: must be a "relative-direct" import (NOT via
# `piker.service`) to avoid a partial-init cycle when this
# mod is loaded as part of `piker.service.__init__`.
from ._daemon import (
maybe_spawn_datad,
)
from piker.calc import humanize
from ._util import (
@ -110,7 +113,7 @@ class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time
This is a datad side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. A bus is
associated one-to-one with a particular broker backend where the
"bus" refers so a multi-symbol bus where quotes are interleaved in
@ -249,7 +252,7 @@ async def allocate_persistent_feed(
'''
Create and maintain a "feed bus" which allocates tasks for real-time
streaming and optional historical data storage per broker/data provider
backend; this normally task runs *in* a `brokerd` actor.
backend; this normally task runs *in* a `datad` actor.
If none exists, this allocates a ``_FeedsBus`` which manages the
lifetimes of streaming tasks created for each requested symbol.
@ -318,8 +321,8 @@ async def allocate_persistent_feed(
# at max capacity.
# - the same ideas ^ but when a local core is maxxed out (like how
# binance does often with hft XD
# - if a brokerd is non-local then we can't just allocate a mem
# channel here and have the brokerd write it, we instead need
# - if a datad is non-local then we can't just allocate a mem
# channel here and have the datad write it, we instead need
# a small streaming machine around the remote feed which can then
# do the normal work of sampling and writing shm buffers
# (depending on if we want sampling done on the far end or not?)
@ -499,7 +502,7 @@ async def open_feed_bus(
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert 'datad' in servicename
assert brokername in servicename
bus: _FeedsBus = get_feed_bus(brokername)
@ -509,12 +512,12 @@ async def open_feed_bus(
for symbol in symbols:
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# datad yet, start persistent stream and shm writer task in
# service nursery
flume = bus.feeds.get(symbol)
if flume is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`'s service lifetime.
# will persist for this `datad`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -721,7 +724,7 @@ class Feed(Struct):
mods = {name: self.mods[name] for name in brokers}
if len(mods) == 1:
# just pass the brokerd stream directly if only one provider
# just pass the datad stream directly if only one provider
# was detected.
stream = self.streams[list(brokers)[0]]
async with stream.subscribe() as bstream:
@ -763,7 +766,7 @@ class Feed(Struct):
@acm
async def install_brokerd_search(
async def install_datad_search(
portal: tractor.Portal,
brokermod: ModuleType,
@ -812,7 +815,7 @@ async def maybe_open_feed(
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
Maybe open a data feed to a ``datad`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
@ -885,14 +888,14 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod
# one actor per brokerd for now
brokerd_ctxs = []
# one actor per datad for now
datad_ctxs = []
for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn
# if no `datad` for this backend exists yet we spawn
# a daemon actor for it.
brokerd_ctxs.append(
maybe_spawn_brokerd(
datad_ctxs.append(
maybe_spawn_datad(
brokermod.name,
loglevel=loglevel
)
@ -900,7 +903,7 @@ async def open_feed(
portals: tuple[tractor.Portal]
async with trionics.gather_contexts(
brokerd_ctxs,
datad_ctxs,
) as portals:
bus_ctxs: list[AsyncContextManager] = []
@ -937,9 +940,9 @@ async def open_feed(
tick_throttle=tick_throttle,
# XXX: super important to avoid
# the brokerd from some other
# the datad from some other
# backend overruning the task here
# bc some other brokerd took longer
# bc some other datad took longer
# to startup before we hit the `.open_stream()`
# loop below XD .. really we should try to do each
# of these stream open sequences sequentially per
@ -1008,7 +1011,7 @@ async def open_feed(
assert stream
feed.streams[brokermod.name] = stream
# apply `brokerd`-common stream to each flume
# apply `datad`-common stream to each flume
# tracking a live market feed from that provider.
for fqme, flume in feed.flumes.items():
if brokermod.name == flume.mkt.broker:

View File

@ -32,12 +32,12 @@ from . import _event
from . import _search
from ..accounting import unpack_fqme
from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search
from ..data.feed import install_datad_search
from ..log import (
get_logger,
get_console_log,
)
from ..service import maybe_spawn_brokerd
from ..service import maybe_spawn_datad
from ._exec import run_qtractor
log = get_logger(__name__)
@ -50,16 +50,16 @@ async def load_provider_search(
) -> None:
name = brokermod.name
log.info(f'loading brokerd for {name}..')
log.info(f'loading datad for {name}..')
async with (
maybe_spawn_brokerd(
maybe_spawn_datad(
name,
loglevel=loglevel
) as portal,
install_brokerd_search(
install_datad_search(
portal,
brokermod,
),

View File

@ -27,7 +27,7 @@ from ..cli import (
load_trans_eps,
)
from .. import watchlists as wl
from ..service import maybe_spawn_brokerd
from ..service import maybe_spawn_datad
_config_dir = click.get_app_dir('piker')
@ -69,7 +69,7 @@ def monitor(config, rate, name, dhost, test, tl):
from .kivy.monitor import _async_main
async def main():
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
brokername=brokermod.name,
loglevel=loglevel
) as portal:
@ -118,7 +118,7 @@ def optschain(
from .kivy.option_chain import _async_main
async def main():
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
loglevel=loglevel
):
# run app "main"

View File

@ -495,7 +495,7 @@ async def _async_main(
async with trio.open_nursery() as nursery:
# get a portal to the data feed daemon
async with tractor.wait_for_actor('brokerd') as portal:
async with tractor.wait_for_actor('datad') as portal:
# set up a pager view for large ticker lists
chain = await new_chain_ui(

View File

@ -108,14 +108,14 @@ def test_ensure_datafeed_actors(
) -> None:
'''
Verify that booting a data feed starts a `brokerd`
Verify that booting a data feed starts a `datad`
actor and a singleton global `samplerd` and opening
an order mode in paper opens the `paperboi` service.
'''
actor_name: str = 'brokerd'
actor_name: str = 'datad'
backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}'
datad_name: str = f'{actor_name}.{backend}'
async def main():
async with (
@ -130,7 +130,7 @@ def test_ensure_datafeed_actors(
await feed.pause()
async with (
ensure_service(brokerd_name),
ensure_service(datad_name),
ensure_service('samplerd'),
):
await trio.sleep(0.1)
@ -227,9 +227,9 @@ def test_ensure_ems_in_paper_actors(
) -> None:
actor_name: str = 'brokerd'
backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}'
datad_name: str = f'datad.{backend}'
brokerd_name: str = f'brokerd.{backend}'
async def main():
@ -273,15 +273,25 @@ def test_ensure_ems_in_paper_actors(
async with (
ensure_service('emsd'),
ensure_service(brokerd_name),
ensure_service(datad_name),
ensure_service(f'paperboi.{backend}'),
):
for name in pikerd_subservices:
assert name in services.service_tasks
# brokerd.kraken actor should have been started
# implicitly by the ems.
assert brokerd_name in services.service_tasks
# datad.kraken actor should have been
# started implicitly by the feed layer.
assert datad_name in services.service_tasks
# XXX: paper-mode sessions should NEVER
# boot a (live, credentialed) `brokerd`;
# only emsd's `open_brokerd_dialog()`
# live-ep path is allowed to spawn it!
assert (
brokerd_name
not in
services.service_tasks
)
print('ALL SERVICES STARTED, cancelling runtime with:\n'
f'-> {cancel_method}')