Add `datad` daemon machinery to `.data`

First half of the `brokerd` split: a new per-provider
data-feed-only daemon-actor `datad.<broker>` to (soon) host
all `validate._eps['datad']` eps (live quotes, history
loading, symbology search) leaving `brokerd` for live order
ctl only. Purely additive; nothing routes through it yet.

Deats,
- new `piker.data._daemon` mod mirroring the
  `.brokers._daemon` conventions (and the `samplerd`
  sub-daemon precedent):
  - `_setup_persistent_datad()` lifetime fixture owning the
    actor-global `_FeedsBus` alloc.
  - `datad_init()` building `enable_modules` from the
    backend's `_datad_mods` (falling back to
    `__enable_modules__` for not-yet-split backends) and
    copying `_spawn_kwargs` (critical for `ib`'s
    `infect_asyncio`).
  - `spawn_datad()`/`maybe_spawn_datad()` wrapping
    `Services` + `maybe_spawn_daemon()`.
- add `piker.data._daemon` to `_root_modules` so `pikerd`
  can run `spawn_datad()` requests.
- re-export the spawn eps from `piker.service`.
- add `test_datad_spawn` verifying actor boot + service
  registration via `ensure_service('datad.kraken')`.

Note the `Services`-based impl style deliberately mirrors
`spawn_brokerd()` so the eventual `tractor.hilevel`
`ServiceMngr` port (see the `service_mng_to_tractor`
branch's d8c21d44 prep work) lands symmetrically on both.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Gud Boi 2026-06-09 17:21:59 -04:00
parent 66957ffdb0
commit 7de661c03e
4 changed files with 340 additions and 0 deletions

View File

@ -0,0 +1,300 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Data-daemon-actor "endpoint-hooks": the service task entry
points for `datad.<brokername>`, the per-provider real-time
and historical market-data feed daemon.
The (data vs. broker)d-split mirrors the ep-groupings in
`piker.data.validate._eps`: this daemon hosts all `'datad'`
eps (live quotes, history loading, symbology search) while
its `brokerd.<brokername>` sibling hosts only the live
order-control (and thus credentialed) `'brokerd'` eps.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from typing import (
TYPE_CHECKING,
AsyncContextManager,
)
import tractor
import trio
from piker.log import (
get_logger,
get_console_log,
)
from . import _util
if TYPE_CHECKING:
from .feed import _FeedsBus
log = get_logger(name=__name__)
# `datad`-actor-always-enabled mods: the data-side successor
# to the old `piker.brokers._daemon._data_mods` set.
# NOTE: keeping this list as small as possible is part of
# our caps-sec model and should be treated with utmost care!
_datad_service_mods: list[str] = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling',
'piker.data._daemon',
]
@tractor.context
async def _setup_persistent_datad(
ctx: tractor.Context,
brokername: str,
loglevel: str|None = None,
) -> None:
'''
Allocate an actor-wide service nursery in this
`datad.<brokername>` actor such that data-feed tasks
(shm writer-samplers, history backfillers, symbology
loaders) can be run in the background persistently by
the provider backend as needed.
'''
# NOTE: we only need to setup logging once (and only)
# here since all hosted daemon tasks will reference
# this same log instance's (actor local) state and thus
# don't require any further (level) configuration on
# their own B)
actor: tractor.Actor = tractor.current_actor()
tll: str = actor.loglevel
log = get_console_log(
level=loglevel or tll,
name=f'{_util.subsys}.{brokername}',
with_tractor_log=bool(tll),
)
assert log.name == _util.subsys
from piker.data import feed
assert not feed._bus
# allocate a nursery to the bus for spawning background
# tasks which service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `datad` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
async with (
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active
# until the parent actor decides to tear it down
await trio.sleep_forever()
def datad_init(
brokername: str,
loglevel: str|None = None,
**start_actor_kwargs,
) -> tuple[
ModuleType,
dict,
AsyncContextManager,
]:
'''
Given an input broker name, load all named arguments
which can be passed for daemon endpoint + context spawn
as required in every `datad.<brokername>` (actor)
service.
This includes:
- load the appropriate <brokername>.py pkg module,
- reads any declared `_datad_mods: list[str]` (falling
back to the full `__enable_modules__` set for
not-yet-split backends) which will be passed to
`tractor.ActorNursery.start_actor(enable_modules=)`
at actor start time,
- deliver a reference to the daemon lifetime fixture,
which for now is always the
`_setup_persistent_datad()` context defined above.
'''
from piker.brokers import get_brokermod
from .validate import get_eps
brokermod = get_brokermod(brokername)
modpath: str = brokermod.__name__
# warn (but don't bail) when the backend is missing
# some/all of the `datad` ep contract defined by
# `piker.data.validate._eps`.
datad_eps: dict = get_eps(brokermod, 'datad')
if not datad_eps:
log.warning(
f'Backend {brokername!r} offers NO `datad` '
f'(data-feed) eps!?\n'
f'Most feed/chart functionality will be '
f'broken for this provider..\n'
)
start_actor_kwargs['name'] = f'datad.{brokername}'
# XXX CRITICAL: include any backend-declared spawn
# kwargs, eg. `{'infect_asyncio': True}` required by
# `ib`'s embedded `asyncio`-mode `ib_async` usage!
start_actor_kwargs.update(
getattr(
brokermod,
'_spawn_kwargs',
{},
)
)
# lookup actor-enabled modules declared by the backend
# offering the `datad` endpoint(s).
enabled: list[str]
enabled = start_actor_kwargs['enable_modules'] = [
__name__, # so that eps from THIS mod can be invoked
modpath,
]
for submodname in getattr(
brokermod,
'_datad_mods',
# fallback for (flat, less mature) backends which
# don't yet declare a daemon-kind mod split.
getattr(
brokermod,
'__enable_modules__',
[],
),
):
subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath)
return (
brokermod,
start_actor_kwargs, # to `ActorNursery.start_actor()`
# XXX see impl above; contains all (actor global)
# setup/teardown expected in all `datad` actor
# instances.
_setup_persistent_datad,
)
async def spawn_datad(
brokername: str,
loglevel: str|None = None,
**tractor_kwargs,
) -> bool:
log.info(
f'Spawning data-daemon,\n'
f'backend: {brokername!r}'
)
(
brokermod,
tractor_kwargs,
daemon_fixture_ep,
) = datad_init(
brokername,
loglevel,
**tractor_kwargs,
)
# ask `pikerd` to spawn a new sub-actor and manage it
# under its actor nursery
from piker.service import Services
dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}'
enable_mods: list[str] = list(dict.fromkeys(
_datad_service_mods
+
tractor_kwargs.pop('enable_modules')
))
portal = await Services.actor_n.start_actor(
dname,
enable_modules=enable_mods,
debug_mode=Services.debug_mode,
**tractor_kwargs,
)
# NOTE: the service mngr expects an already spawned
# actor + its portal ref in order to do non-blocking
# setup of the `datad` service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
)
return True
@acm
async def maybe_spawn_datad(
brokername: str,
loglevel: str|None = None,
**pikerd_kwargs,
) -> tractor.Portal:
'''
Helper to spawn a datad service *from* a client who
wishes to use the sub-actor-daemon but is fine with
re-using any existing and contactable `datad`.
Mas o menos, acts as a cached-actor-getter factory.
'''
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
service_name=f'datad.{brokername}',
service_task_target=spawn_datad,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,
**pikerd_kwargs,
) as portal:
yield portal

View File

@ -56,3 +56,7 @@ from ..brokers._daemon import (
spawn_brokerd as spawn_brokerd,
maybe_spawn_brokerd as maybe_spawn_brokerd,
)
from ..data._daemon import (
spawn_datad as spawn_datad,
maybe_spawn_datad as maybe_spawn_datad,
)

View File

@ -157,6 +157,7 @@ _root_modules: list[str] = [
__name__,
'piker.service._daemon',
'piker.brokers._daemon',
'piker.data._daemon',
'piker.clearing._ems',
'piker.clearing._client',

View File

@ -66,6 +66,41 @@ def test_runtime_boot(
trio.run(main)
def test_datad_spawn(
open_test_pikerd: AsyncContextManager,
loglevel: str,
) -> None:
'''
Verify the new (data-feed-only) `datad.<broker>` daemon
can be spawned/registered as a `pikerd` sub-service via
the `maybe_spawn_datad()` factory.
'''
from piker.service import maybe_spawn_datad
backend: str = 'kraken'
datad_name: str = f'datad.{backend}'
async def main():
async with (
open_test_pikerd() as (_, _, _, services),
maybe_spawn_datad(
backend,
loglevel=loglevel,
) as portal,
):
assert portal
async with ensure_service(datad_name):
assert (
datad_name in services.service_tasks
)
trio.run(main)
def test_ensure_datafeed_actors(
open_test_pikerd: AsyncContextManager,
loglevel: str,