.data: cut feed layer over to `datad` actors

The topology flip: all data-feed consumers now route to the
new `datad.<broker>` sibling daemon; `brokerd` becomes
trading-only and is ONLY ever booted lazily by `emsd`'s
`open_brokerd_dialog()` (see prior commit). Chart-only and
paper sessions run with zero (live, credentialed) `brokerd`
procs B)

Deats,
- `open_feed()` -> `maybe_spawn_datad()` (NB: imported
  relative-direct from `._daemon` to dodge a partial-init
  cycle via `piker.service`); flip the `open_feed_bus()`
  actor-name assert to `'datad'`; comment sweep.
- slim `_setup_persistent_brokerd()` to a trading-only
  fixture: console logging + pinned-open ctx; the feed-bus
  alloc moves to `_setup_persistent_datad()` and backend
  `open_trade_dialog()` ctxs own their own task trees.
  (the `piker ledger` ad-hoc actor enters this same slimmed
  fixture - exactly what it needs.)
- repoint data-flavoured spawn sites to `maybe_spawn_datad`:
  `.ui._app` symbol-search (+ rename
  `install_brokerd_search` -> `install_datad_search`),
  `.brokers.core.symbol_search()`, `.brokers.cli`
  `brokercheck`/`record`, legacy kivy `.ui.cli` +
  `option_chain`'s `wait_for_actor()`.
- invert `tests.test_services` expectations: feed/EMS-paper
  flows must spawn `datad.kraken` and `paperboi.kraken`
  with an explicit negative assert that NO `brokerd.kraken`
  service task exists.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Gud Boi 2026-06-09 17:22:44 -04:00
parent 3548893337
commit d7f1d70b61
8 changed files with 73 additions and 88 deletions

View File

@ -69,9 +69,15 @@ async def _setup_persistent_brokerd(
) -> None: ) -> None:
''' '''
Allocate a actor-wide service nursery in ``brokerd`` Trading-only daemon (lifetime) fixture: console logging
such that feeds can be run in the background persistently by setup and a pinned-open context for service mgmt.
the broker backend as needed.
All data-feed-bus state now lives in the (data-feed-only)
`datad.<brokername>` sibling daemon, see
`piker.data._daemon._setup_persistent_datad()`; this
actor hosts only the backend's `open_trade_dialog()`
(live order-control) ep-task(s) which manage their own
task trees per `tractor.Context`.
''' '''
# NOTE: we only need to setup logging once (and only) here # NOTE: we only need to setup logging once (and only) here
@ -87,46 +93,12 @@ async def _setup_persistent_brokerd(
) )
assert log.name == _util.subsys assert log.name == _util.subsys
# further, set the log level on any broker broker specific # unblock caller
# logger instance. await ctx.started()
from piker.data import feed # we pin this task to keep the daemon active until the
assert not feed._bus # parent actor decides to tear it down
await trio.sleep_forever()
# allocate a nursery to the bus for spawning background
# tasks to service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `brokerd` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
except eg.ExceptionGroup:
# TODO: likely some underlying `brokerd` IPC connection
# broke so here we handle a respawn and re-connect attempt!
# This likely should pair with development of the OCO task
# nusery in dev over @ `tractor` B)
# https://github.com/goodboy/tractor/pull/363
raise
def broker_init( def broker_init(

View File

@ -35,7 +35,7 @@ from piker.log import (
get_logger, get_logger,
) )
from ..service import ( from ..service import (
maybe_spawn_brokerd, maybe_spawn_datad,
maybe_open_pikerd, maybe_open_pikerd,
) )
from ..brokers import ( from ..brokers import (
@ -187,7 +187,7 @@ def brokercheck(config, broker):
''' '''
async def bcheck_main(): async def bcheck_main():
async with maybe_spawn_brokerd(broker) as portal: async with maybe_spawn_datad(broker) as portal:
await portal.run(run_test, broker) await portal.run(run_test, broker)
await portal.cancel_actor() await portal.cancel_actor()
@ -317,7 +317,7 @@ def record(config, rate, name, dhost, filename):
return return
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
tries=tries, loglevel=loglevel tries=tries, loglevel=loglevel
) as portal: ) as portal:
# run app "main" # run app "main"

View File

@ -30,7 +30,7 @@ import trio
from piker.log import get_logger from piker.log import get_logger
from . import get_brokermod from . import get_brokermod
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_datad
from . import open_cached_client from . import open_cached_client
from ..accounting import MktPair from ..accounting import MktPair
@ -172,7 +172,7 @@ async def symbol_search(
# await tractor.devx._debug.maybe_init_greenback() # await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync() # tractor.pause_from_sync()
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
mod.name, mod.name,
infect_asyncio=getattr( infect_asyncio=getattr(
mod, mod,

View File

@ -17,7 +17,7 @@
''' '''
Data feed apis and infra. Data feed apis and infra.
This module is enabled for ``brokerd`` daemons and includes mostly This module is enabled for ``datad`` daemons and includes mostly
endpoints and middleware to support our real-time, provider agnostic, endpoints and middleware to support our real-time, provider agnostic,
live market quotes layer. Historical data loading and processing is also live market quotes layer. Historical data loading and processing is also
initiated in parts of the feed bus startup but business logic and initiated in parts of the feed bus startup but business logic and
@ -54,8 +54,11 @@ from piker.accounting import (
) )
from piker.types import Struct from piker.types import Struct
from piker.brokers import get_brokermod from piker.brokers import get_brokermod
from piker.service import ( # NOTE: must be a "relative-direct" import (NOT via
maybe_spawn_brokerd, # `piker.service`) to avoid a partial-init cycle when this
# mod is loaded as part of `piker.service.__init__`.
from ._daemon import (
maybe_spawn_datad,
) )
from piker.calc import humanize from piker.calc import humanize
from ._util import ( from ._util import (
@ -110,7 +113,7 @@ class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time This is a datad side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. A bus is streams that can be allocated and left alive indefinitely. A bus is
associated one-to-one with a particular broker backend where the associated one-to-one with a particular broker backend where the
"bus" refers so a multi-symbol bus where quotes are interleaved in "bus" refers so a multi-symbol bus where quotes are interleaved in
@ -249,7 +252,7 @@ async def allocate_persistent_feed(
''' '''
Create and maintain a "feed bus" which allocates tasks for real-time Create and maintain a "feed bus" which allocates tasks for real-time
streaming and optional historical data storage per broker/data provider streaming and optional historical data storage per broker/data provider
backend; this normally task runs *in* a `brokerd` actor. backend; this normally task runs *in* a `datad` actor.
If none exists, this allocates a ``_FeedsBus`` which manages the If none exists, this allocates a ``_FeedsBus`` which manages the
lifetimes of streaming tasks created for each requested symbol. lifetimes of streaming tasks created for each requested symbol.
@ -318,8 +321,8 @@ async def allocate_persistent_feed(
# at max capacity. # at max capacity.
# - the same ideas ^ but when a local core is maxxed out (like how # - the same ideas ^ but when a local core is maxxed out (like how
# binance does often with hft XD # binance does often with hft XD
# - if a brokerd is non-local then we can't just allocate a mem # - if a datad is non-local then we can't just allocate a mem
# channel here and have the brokerd write it, we instead need # channel here and have the datad write it, we instead need
# a small streaming machine around the remote feed which can then # a small streaming machine around the remote feed which can then
# do the normal work of sampling and writing shm buffers # do the normal work of sampling and writing shm buffers
# (depending on if we want sampling done on the far end or not?) # (depending on if we want sampling done on the far end or not?)
@ -499,7 +502,7 @@ async def open_feed_bus(
# (after we also group them in a nice `/dev/shm/piker/` subdir). # (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are # ensure we are who we think we are
servicename = tractor.current_actor().name servicename = tractor.current_actor().name
assert 'brokerd' in servicename assert 'datad' in servicename
assert brokername in servicename assert brokername in servicename
bus: _FeedsBus = get_feed_bus(brokername) bus: _FeedsBus = get_feed_bus(brokername)
@ -509,12 +512,12 @@ async def open_feed_bus(
for symbol in symbols: for symbol in symbols:
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # datad yet, start persistent stream and shm writer task in
# service nursery # service nursery
flume = bus.feeds.get(symbol) flume = bus.feeds.get(symbol)
if flume is None: if flume is None:
# allocate a new actor-local stream bus which # allocate a new actor-local stream bus which
# will persist for this `brokerd`'s service lifetime. # will persist for this `datad`'s service lifetime.
async with bus.task_lock: async with bus.task_lock:
await bus.nursery.start( await bus.nursery.start(
partial( partial(
@ -721,7 +724,7 @@ class Feed(Struct):
mods = {name: self.mods[name] for name in brokers} mods = {name: self.mods[name] for name in brokers}
if len(mods) == 1: if len(mods) == 1:
# just pass the brokerd stream directly if only one provider # just pass the datad stream directly if only one provider
# was detected. # was detected.
stream = self.streams[list(brokers)[0]] stream = self.streams[list(brokers)[0]]
async with stream.subscribe() as bstream: async with stream.subscribe() as bstream:
@ -763,7 +766,7 @@ class Feed(Struct):
@acm @acm
async def install_brokerd_search( async def install_datad_search(
portal: tractor.Portal, portal: tractor.Portal,
brokermod: ModuleType, brokermod: ModuleType,
@ -812,7 +815,7 @@ async def maybe_open_feed(
ReceiveChannel[dict[str, Any]], ReceiveChannel[dict[str, Any]],
): ):
''' '''
Maybe open a data to a ``brokerd`` daemon only if there is no Maybe open a data feed to a ``datad`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver. in a tractor broadcast receiver.
@ -885,14 +888,14 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme) providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod feed.mods[mod.name] = mod
# one actor per brokerd for now # one actor per datad for now
brokerd_ctxs = [] datad_ctxs = []
for brokermod, bfqmes in providers.items(): for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn # if no `datad` for this backend exists yet we spawn
# a daemon actor for it. # a daemon actor for it.
brokerd_ctxs.append( datad_ctxs.append(
maybe_spawn_brokerd( maybe_spawn_datad(
brokermod.name, brokermod.name,
loglevel=loglevel loglevel=loglevel
) )
@ -900,7 +903,7 @@ async def open_feed(
portals: tuple[tractor.Portal] portals: tuple[tractor.Portal]
async with trionics.gather_contexts( async with trionics.gather_contexts(
brokerd_ctxs, datad_ctxs,
) as portals: ) as portals:
bus_ctxs: list[AsyncContextManager] = [] bus_ctxs: list[AsyncContextManager] = []
@ -937,9 +940,9 @@ async def open_feed(
tick_throttle=tick_throttle, tick_throttle=tick_throttle,
# XXX: super important to avoid # XXX: super important to avoid
# the brokerd from some other # the datad from some other
# backend overruning the task here # backend overruning the task here
# bc some other brokerd took longer # bc some other datad took longer
# to startup before we hit the `.open_stream()` # to startup before we hit the `.open_stream()`
# loop below XD .. really we should try to do each # loop below XD .. really we should try to do each
# of these stream open sequences sequentially per # of these stream open sequences sequentially per
@ -1008,7 +1011,7 @@ async def open_feed(
assert stream assert stream
feed.streams[brokermod.name] = stream feed.streams[brokermod.name] = stream
# apply `brokerd`-common stream to each flume # apply `datad`-common stream to each flume
# tracking a live market feed from that provider. # tracking a live market feed from that provider.
for fqme, flume in feed.flumes.items(): for fqme, flume in feed.flumes.items():
if brokermod.name == flume.mkt.broker: if brokermod.name == flume.mkt.broker:

View File

@ -32,12 +32,12 @@ from . import _event
from . import _search from . import _search
from ..accounting import unpack_fqme from ..accounting import unpack_fqme
from ..data._symcache import open_symcache from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search from ..data.feed import install_datad_search
from ..log import ( from ..log import (
get_logger, get_logger,
get_console_log, get_console_log,
) )
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_datad
from ._exec import run_qtractor from ._exec import run_qtractor
log = get_logger(__name__) log = get_logger(__name__)
@ -50,16 +50,16 @@ async def load_provider_search(
) -> None: ) -> None:
name = brokermod.name name = brokermod.name
log.info(f'loading brokerd for {name}..') log.info(f'loading datad for {name}..')
async with ( async with (
maybe_spawn_brokerd( maybe_spawn_datad(
name, name,
loglevel=loglevel loglevel=loglevel
) as portal, ) as portal,
install_brokerd_search( install_datad_search(
portal, portal,
brokermod, brokermod,
), ),

View File

@ -27,7 +27,7 @@ from ..cli import (
load_trans_eps, load_trans_eps,
) )
from .. import watchlists as wl from .. import watchlists as wl
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_datad
_config_dir = click.get_app_dir('piker') _config_dir = click.get_app_dir('piker')
@ -69,7 +69,7 @@ def monitor(config, rate, name, dhost, test, tl):
from .kivy.monitor import _async_main from .kivy.monitor import _async_main
async def main(): async def main():
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
brokername=brokermod.name, brokername=brokermod.name,
loglevel=loglevel loglevel=loglevel
) as portal: ) as portal:
@ -118,7 +118,7 @@ def optschain(
from .kivy.option_chain import _async_main from .kivy.option_chain import _async_main
async def main(): async def main():
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
loglevel=loglevel loglevel=loglevel
): ):
# run app "main" # run app "main"

View File

@ -495,7 +495,7 @@ async def _async_main(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
# get a portal to the data feed daemon # get a portal to the data feed daemon
async with tractor.wait_for_actor('brokerd') as portal: async with tractor.wait_for_actor('datad') as portal:
# set up a pager view for large ticker lists # set up a pager view for large ticker lists
chain = await new_chain_ui( chain = await new_chain_ui(

View File

@ -108,14 +108,14 @@ def test_ensure_datafeed_actors(
) -> None: ) -> None:
''' '''
Verify that booting a data feed starts a `brokerd` Verify that booting a data feed starts a `datad`
actor and a singleton global `samplerd` and opening actor and a singleton global `samplerd` and opening
an order mode in paper opens the `paperboi` service. an order mode in paper opens the `paperboi` service.
''' '''
actor_name: str = 'brokerd' actor_name: str = 'datad'
backend: str = 'kraken' backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}' datad_name: str = f'{actor_name}.{backend}'
async def main(): async def main():
async with ( async with (
@ -130,7 +130,7 @@ def test_ensure_datafeed_actors(
await feed.pause() await feed.pause()
async with ( async with (
ensure_service(brokerd_name), ensure_service(datad_name),
ensure_service('samplerd'), ensure_service('samplerd'),
): ):
await trio.sleep(0.1) await trio.sleep(0.1)
@ -227,9 +227,9 @@ def test_ensure_ems_in_paper_actors(
) -> None: ) -> None:
actor_name: str = 'brokerd'
backend: str = 'kraken' backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}' datad_name: str = f'datad.{backend}'
brokerd_name: str = f'brokerd.{backend}'
async def main(): async def main():
@ -273,15 +273,25 @@ def test_ensure_ems_in_paper_actors(
async with ( async with (
ensure_service('emsd'), ensure_service('emsd'),
ensure_service(brokerd_name), ensure_service(datad_name),
ensure_service(f'paperboi.{backend}'), ensure_service(f'paperboi.{backend}'),
): ):
for name in pikerd_subservices: for name in pikerd_subservices:
assert name in services.service_tasks assert name in services.service_tasks
# brokerd.kraken actor should have been started # datad.kraken actor should have been
# implicitly by the ems. # started implicitly by the feed layer.
assert brokerd_name in services.service_tasks assert datad_name in services.service_tasks
# XXX: paper-mode sessions should NEVER
# boot a (live, credentialed) `brokerd`;
# only emsd's `open_brokerd_dialog()`
# live-ep path is allowed to spawn it!
assert (
brokerd_name
not in
services.service_tasks
)
print('ALL SERVICES STARTED, cancelling runtime with:\n' print('ALL SERVICES STARTED, cancelling runtime with:\n'
f'-> {cancel_method}') f'-> {cancel_method}')