From 7de661c03e54b40eac7087cc6e6252079046af6f Mon Sep 17 00:00:00 2001 From: goodboy Date: Tue, 9 Jun 2026 17:21:59 -0400 Subject: [PATCH] Add `datad` daemon machinery to `.data` First half of the `brokerd` split: a new per-provider data-feed-only daemon-actor `datad.<brokername>` to (soon) host all `validate._eps['datad']` eps (live quotes, history loading, symbology search) leaving `brokerd` for live order ctl only. Purely additive; nothing routes through it yet. Deats, - new `piker.data._daemon` mod mirroring the `.brokers._daemon` conventions (and the `samplerd` sub-daemon precedent): - `_setup_persistent_datad()` lifetime fixture owning the actor-global `_FeedsBus` alloc. - `datad_init()` building `enable_modules` from the backend's `_datad_mods` (falling back to `__enable_modules__` for not-yet-split backends) and copying `_spawn_kwargs` (critical for `ib`'s `infect_asyncio`). - `spawn_datad()`/`maybe_spawn_datad()` wrapping `Services` + `maybe_spawn_daemon()`. - add `piker.data._daemon` to `_root_modules` so `pikerd` can run `spawn_datad()` requests. - re-export the spawn eps from `piker.service`. - add `test_datad_spawn` verifying actor boot + service registration via `ensure_service('datad.kraken')`. Note the `Services`-based impl style deliberately mirrors `spawn_brokerd()` so the eventual `tractor.hilevel` `ServiceMngr` port (see the `service_mng_to_tractor` branch's d8c21d44 prep work) lands symmetrically on both. 
(this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code Co-Authored-By: Claude Fable 5 --- piker/data/_daemon.py | 300 ++++++++++++++++++++++++++++++++ piker/service/__init__.py | 4 + piker/service/_actor_runtime.py | 1 + tests/test_services.py | 35 ++++ 4 files changed, 340 insertions(+) create mode 100644 piker/data/_daemon.py diff --git a/piker/data/_daemon.py b/piker/data/_daemon.py new file mode 100644 index 00000000..663d7102 --- /dev/null +++ b/piker/data/_daemon.py @@ -0,0 +1,300 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <https://www.gnu.org/licenses/>. + +''' +Data-daemon-actor "endpoint-hooks": the service task entry +points for `datad.<brokername>`, the per-provider real-time +and historical market-data feed daemon. + +The (data vs. broker)d-split mirrors the ep-groupings in +`piker.data.validate._eps`: this daemon hosts all `'datad'` +eps (live quotes, history loading, symbology search) while +its `brokerd.<brokername>` sibling hosts only the live +order-control (and thus credentialed) `'brokerd'` eps. 
+ +''' +from __future__ import annotations +from contextlib import ( + asynccontextmanager as acm, +) +from types import ModuleType +from typing import ( + TYPE_CHECKING, + AsyncContextManager, +) + +import tractor +import trio + +from piker.log import ( + get_logger, + get_console_log, +) +from . import _util + +if TYPE_CHECKING: + from .feed import _FeedsBus + +log = get_logger(name=__name__) + +# `datad`-actor-always-enabled mods: the data-side successor +# to the old `piker.brokers._daemon._data_mods` set. +# NOTE: keeping this list as small as possible is part of +# our caps-sec model and should be treated with utmost care! +_datad_service_mods: list[str] = [ + 'piker.brokers.core', + 'piker.brokers.data', + 'piker.data', + 'piker.data.feed', + 'piker.data._sampling', + 'piker.data._daemon', +] + + +@tractor.context +async def _setup_persistent_datad( + ctx: tractor.Context, + brokername: str, + loglevel: str|None = None, + +) -> None: + ''' + Allocate an actor-wide service nursery in this + `datad.<brokername>` actor such that data-feed tasks + (shm writer-samplers, history backfillers, symbology + loaders) can be run in the background persistently by + the provider backend as needed. 
+ + ''' + # NOTE: we only need to setup logging once (and only) + # here since all hosted daemon tasks will reference + # this same log instance's (actor local) state and thus + # don't require any further (level) configuration on + # their own B) + actor: tractor.Actor = tractor.current_actor() + tll: str = actor.loglevel + log = get_console_log( + level=loglevel or tll, + name=f'{_util.subsys}.{brokername}', + with_tractor_log=bool(tll), + ) + assert log.name == _util.subsys + + from piker.data import feed + assert not feed._bus + + # allocate a nursery to the bus for spawning background + # tasks which service client IPC requests, normally + # `tractor.Context` connections to explicitly required + # `datad` endpoints such as: + # - `stream_quotes()`, + # - `manage_history()`, + # - `allocate_persistent_feed()`, + # - `open_symbol_search()` + # NOTE: see ep invocation details inside `.data.feed`. + async with ( + trio.open_nursery() as service_nursery + ): + bus: _FeedsBus = feed.get_feed_bus( + brokername, + service_nursery, + ) + assert bus is feed._bus + + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active + # until the parent actor decides to tear it down + await trio.sleep_forever() + + +def datad_init( + brokername: str, + loglevel: str|None = None, + + **start_actor_kwargs, + +) -> tuple[ + ModuleType, + dict, + AsyncContextManager, +]: + ''' + Given an input broker name, load all named arguments + which can be passed for daemon endpoint + context spawn + as required in every `datad.<brokername>` (actor) + service. 
+ + This includes: + - load the appropriate .py pkg module, + - reads any declared `_datad_mods: list[str]` (falling + back to the full `__enable_modules__` set for + not-yet-split backends) which will be passed to + `tractor.ActorNursery.start_actor(enable_modules=)` + at actor start time, + - deliver a reference to the daemon lifetime fixture, + which for now is always the + `_setup_persistent_datad()` context defined above. + + ''' + from piker.brokers import get_brokermod + from .validate import get_eps + brokermod = get_brokermod(brokername) + modpath: str = brokermod.__name__ + + # warn (but don't bail) when the backend is missing + # some/all of the `datad` ep contract defined by + # `piker.data.validate._eps`. + datad_eps: dict = get_eps(brokermod, 'datad') + if not datad_eps: + log.warning( + f'Backend {brokername!r} offers NO `datad` ' + f'(data-feed) eps!?\n' + f'Most feed/chart functionality will be ' + f'broken for this provider..\n' + ) + + start_actor_kwargs['name'] = f'datad.{brokername}' + + # XXX CRITICAL: include any backend-declared spawn + # kwargs, eg. `{'infect_asyncio': True}` required by + # `ib`'s embedded `asyncio`-mode `ib_async` usage! + start_actor_kwargs.update( + getattr( + brokermod, + '_spawn_kwargs', + {}, + ) + ) + + # lookup actor-enabled modules declared by the backend + # offering the `datad` endpoint(s). + enabled: list[str] + enabled = start_actor_kwargs['enable_modules'] = [ + __name__, # so that eps from THIS mod can be invoked + modpath, + ] + for submodname in getattr( + brokermod, + '_datad_mods', + # fallback for (flat, less mature) backends which + # don't yet declare a daemon-kind mod split. 
+ getattr( + brokermod, + '__enable_modules__', + [], + ), + ): + subpath: str = f'{modpath}.{submodname}' + enabled.append(subpath) + + return ( + brokermod, + start_actor_kwargs, # to `ActorNursery.start_actor()` + + # XXX see impl above; contains all (actor global) + # setup/teardown expected in all `datad` actor + # instances. + _setup_persistent_datad, + ) + + +async def spawn_datad( + brokername: str, + loglevel: str|None = None, + + **tractor_kwargs, + +) -> bool: + + log.info( + f'Spawning data-daemon,\n' + f'backend: {brokername!r}' + ) + + ( + brokermod, + tractor_kwargs, + daemon_fixture_ep, + ) = datad_init( + brokername, + loglevel, + **tractor_kwargs, + ) + + # ask `pikerd` to spawn a new sub-actor and manage it + # under its actor nursery + from piker.service import Services + + dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}' + enable_mods: list[str] = list(dict.fromkeys( + _datad_service_mods + + + tractor_kwargs.pop('enable_modules') + )) + portal = await Services.actor_n.start_actor( + dname, + enable_modules=enable_mods, + debug_mode=Services.debug_mode, + **tractor_kwargs, + ) + + # NOTE: the service mngr expects an already spawned + # actor + its portal ref in order to do non-blocking + # setup of the `datad` service nursery. + await Services.start_service_task( + dname, + portal, + + # signature of target root-task endpoint + daemon_fixture_ep, + brokername=brokername, + loglevel=loglevel, + ) + return True + + +@acm +async def maybe_spawn_datad( + brokername: str, + loglevel: str|None = None, + + **pikerd_kwargs, + +) -> tractor.Portal: + ''' + Helper to spawn a datad service *from* a client who + wishes to use the sub-actor-daemon but is fine with + re-using any existing and contactable `datad`. + + Mas o menos, acts as a cached-actor-getter factory. 
+ + ''' + from piker.service import maybe_spawn_daemon + + async with maybe_spawn_daemon( + service_name=f'datad.{brokername}', + service_task_target=spawn_datad, + spawn_args={ + 'brokername': brokername, + }, + loglevel=loglevel, + + **pikerd_kwargs, + + ) as portal: + yield portal diff --git a/piker/service/__init__.py b/piker/service/__init__.py index 29360620..6fe5a5c8 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -56,3 +56,7 @@ from ..brokers._daemon import ( spawn_brokerd as spawn_brokerd, maybe_spawn_brokerd as maybe_spawn_brokerd, ) +from ..data._daemon import ( + spawn_datad as spawn_datad, + maybe_spawn_datad as maybe_spawn_datad, +) diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index b0f180e4..6c09a9e7 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -157,6 +157,7 @@ _root_modules: list[str] = [ __name__, 'piker.service._daemon', 'piker.brokers._daemon', + 'piker.data._daemon', 'piker.clearing._ems', 'piker.clearing._client', diff --git a/tests/test_services.py b/tests/test_services.py index 69771c09..abbe6c6b 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -66,6 +66,41 @@ def test_runtime_boot( trio.run(main) +def test_datad_spawn( + open_test_pikerd: AsyncContextManager, + loglevel: str, + +) -> None: + ''' + Verify the new (data-feed-only) `datad.<brokername>` daemon + can be spawned/registered as a `pikerd` sub-service via + the `maybe_spawn_datad()` factory. 
+ + ''' + from piker.service import maybe_spawn_datad + + backend: str = 'kraken' + datad_name: str = f'datad.{backend}' + + async def main(): + async with ( + open_test_pikerd() as (_, _, _, services), + + maybe_spawn_datad( + backend, + loglevel=loglevel, + ) as portal, + ): + assert portal + + async with ensure_service(datad_name): + assert ( + datad_name in services.service_tasks + ) + + trio.run(main) + + def test_ensure_datafeed_actors( open_test_pikerd: AsyncContextManager, loglevel: str,