Add `datad` daemon machinery to `.data`
First half of the `brokerd` split: a new per-provider
data-feed-only daemon-actor `datad.<broker>` to (soon) host
all `validate._eps['datad']` eps (live quotes, history
loading, symbology search) leaving `brokerd` for live order
ctl only. Purely additive; nothing routes through it yet.
Deats,
- new `piker.data._daemon` mod mirroring the
`.brokers._daemon` conventions (and the `samplerd`
sub-daemon precedent):
- `_setup_persistent_datad()` lifetime fixture owning the
actor-global `_FeedsBus` alloc.
- `datad_init()` building `enable_modules` from the
backend's `_datad_mods` (falling back to
`__enable_modules__` for not-yet-split backends) and
copying `_spawn_kwargs` (critical for `ib`'s
`infect_asyncio`).
- `spawn_datad()`/`maybe_spawn_datad()` wrapping
`Services` + `maybe_spawn_daemon()`.
- add `piker.data._daemon` to `_root_modules` so `pikerd`
can run `spawn_datad()` requests.
- re-export the spawn eps from `piker.service`.
- add `test_datad_spawn` verifying actor boot + service
registration via `ensure_service('datad.kraken')`.
Note the `Services`-based impl style deliberately mirrors
`spawn_brokerd()` so the eventual `tractor.hilevel`
`ServiceMngr` port (see the `service_mng_to_tractor`
branch's d8c21d44 prep work) lands symmetrically on both.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
Prompt-IO: ai/prompt-io/claude/20260610T171142Z_119d2c04_prompt_io.md
datad_service
parent
119d2c0495
commit
6418121923
|
|
@ -0,0 +1,47 @@
|
|||
---
|
||||
model: claude-fable-5[1m]
|
||||
service: claude
|
||||
session: 32d15f9a-b2d3-4c26-bdc9-190219141a25
|
||||
timestamp: 2026-06-10T17:11:42Z
|
||||
git_ref: datad_service
|
||||
diff_cmd: git log -1 -p --follow -- ai/prompt-io/claude/20260610T171142Z_119d2c04_prompt_io.md
|
||||
scope: code
|
||||
substantive: true
|
||||
raw_file: 20260610T171142Z_119d2c04_prompt_io.raw.md
|
||||
---
|
||||
|
||||
## Prompt
|
||||
|
||||
Same session-initiating `brokerd`-split instruction (see
|
||||
`20260610T170859Z_75cefe10_prompt_io.md`); this is the
|
||||
approved plan's "stage 1": introduce the `datad` daemon
|
||||
machinery additively (nothing routes through it yet).
|
||||
User-decided constraint applied: `datad.<broker>` is a
|
||||
SIBLING of `brokerd.<broker>` under `pikerd`, spawned
|
||||
via the existing `Services` + `maybe_spawn_daemon()`
|
||||
machinery.
|
||||
|
||||
## Response summary
|
||||
|
||||
New `piker/data/_daemon.py` hosting the
|
||||
`datad.<broker>` feed-only daemon-actor: lifetime
|
||||
fixture owning the actor-global `_FeedsBus`, init/spawn
|
||||
fns mirroring `.brokers._daemon` conventions and the
|
||||
`samplerd` sub-daemon precedent, plus root-mod
|
||||
registration, `piker.service` re-exports and a spawn
|
||||
test.
|
||||
|
||||
## Files changed
|
||||
|
||||
- `piker/data/_daemon.py` — NEW:
|
||||
`_setup_persistent_datad()`, `datad_init()`,
|
||||
`spawn_datad()`, `maybe_spawn_datad()`,
|
||||
`_datad_service_mods`
|
||||
- `piker/service/_actor_runtime.py` — add
|
||||
`piker.data._daemon` to `_root_modules`
|
||||
- `piker/service/__init__.py` — re-export spawn eps
|
||||
- `tests/test_services.py` — add `test_datad_spawn`
|
||||
|
||||
## Human edits
|
||||
|
||||
None — committed as generated.
|
||||
|
|
@ -0,0 +1,60 @@
|
|||
---
|
||||
model: claude-fable-5[1m]
|
||||
service: claude
|
||||
timestamp: 2026-06-10T17:11:42Z
|
||||
git_ref: datad_service
|
||||
diff_cmd: git log -1 -p --follow -- ai/prompt-io/claude/20260610T171142Z_119d2c04_prompt_io.md
|
||||
---
|
||||
|
||||
NOTE: diff-ref mode entry (code committed in the same
|
||||
commit as this log); backfilled from the live dev
|
||||
session transcript per the `/prompt-io` skill rules.
|
||||
|
||||
> `git log -1 -p --follow -- piker/data/_daemon.py`
|
||||
|
||||
Generated symbols + key design decisions:
|
||||
|
||||
- `_datad_service_mods: list[str]` — datad-always
|
||||
enabled mods, the data-side successor to the old
|
||||
`piker.brokers._daemon._data_mods` set; kept minimal
|
||||
per the caps-sec model.
|
||||
- `_setup_persistent_datad()` — `@tractor.context`
|
||||
lifetime fixture: console-log setup then allocates
|
||||
the actor-global feed bus via
|
||||
`feed.get_feed_bus(brokername, service_nursery)`
|
||||
exactly as the old brokerd fixture did, pinned open
|
||||
with `ctx.started()` + `sleep_forever()`.
|
||||
- `datad_init()` — actor name `f'datad.{brokername}'`;
|
||||
copies backend `_spawn_kwargs` (CRITICAL for `ib`'s
|
||||
`infect_asyncio=True`); builds `enable_modules` from
|
||||
`getattr(brokermod, '_datad_mods',
|
||||
getattr(brokermod, '__enable_modules__', []))` as
|
||||
the flat-backend fallback.
|
||||
- `spawn_datad()` — `Services.actor_n.start_actor()` +
|
||||
`Services.start_service_task()` exactly mirroring
|
||||
`spawn_brokerd()`; dedup-composes enable mods via
|
||||
`list(dict.fromkeys(...))`.
|
||||
- `maybe_spawn_datad()` — wraps `maybe_spawn_daemon(
|
||||
service_name=f'datad.{brokername}', ...)`.
|
||||
|
||||
> `git log -1 -p --follow -- piker/service/_actor_runtime.py`
|
||||
> `git log -1 -p --follow -- piker/service/__init__.py`
|
||||
> `git log -1 -p --follow -- tests/test_services.py`
|
||||
|
||||
Design rationale (verbatim from session):
|
||||
|
||||
- `_root_modules` must gain `piker.data._daemon` so
|
||||
`pikerd_portal.run(spawn_datad, ...)` resolves in
|
||||
the root.
|
||||
- the `Services`-based impl style deliberately mirrors
|
||||
`spawn_brokerd()` so the eventual `tractor.hilevel`
|
||||
`ServiceMngr` port (see the `service_mng_to_tractor`
|
||||
branch's d8c21d44 prep, surfaced by the
|
||||
user-requested branch-overlap survey) lands
|
||||
symmetrically on both spawn fns.
|
||||
- mod placement (`piker/data/_daemon.py` vs.
|
||||
generalizing `piker.brokers._daemon`) follows the
|
||||
per-subsystem daemon-mod convention
|
||||
(`.clearing._ems`, `.data._sampling`) and resolves
|
||||
the existing TODO at `brokers/_daemon.py:49` ("move
|
||||
this def to the `.data` subpkg").
|
||||
|
|
@ -0,0 +1,300 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Data-daemon-actor "endpoint-hooks": the service task entry
|
||||
points for `datad.<brokername>`, the per-provider real-time
|
||||
and historical market-data feed daemon.
|
||||
|
||||
The (data vs. broker)d-split mirrors the ep-groupings in
|
||||
`piker.data.validate._eps`: this daemon hosts all `'datad'`
|
||||
eps (live quotes, history loading, symbology search) while
|
||||
its `brokerd.<brokername>` sibling hosts only the live
|
||||
order-control (and thus credentialed) `'brokerd'` eps.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AsyncContextManager,
|
||||
)
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker.log import (
|
||||
get_logger,
|
||||
get_console_log,
|
||||
)
|
||||
from . import _util
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .feed import _FeedsBus
|
||||
|
||||
log = get_logger(name=__name__)
|
||||
|
||||
# `datad`-actor-always-enabled mods: the data-side successor
|
||||
# to the old `piker.brokers._daemon._data_mods` set.
|
||||
# NOTE: keeping this list as small as possible is part of
|
||||
# our caps-sec model and should be treated with utmost care!
|
||||
_datad_service_mods: list[str] = [
|
||||
'piker.brokers.core',
|
||||
'piker.brokers.data',
|
||||
'piker.data',
|
||||
'piker.data.feed',
|
||||
'piker.data._sampling',
|
||||
'piker.data._daemon',
|
||||
]
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _setup_persistent_datad(
|
||||
ctx: tractor.Context,
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Allocate an actor-wide service nursery in this
|
||||
`datad.<brokername>` actor such that data-feed tasks
|
||||
(shm writer-samplers, history backfillers, symbology
|
||||
loaders) can be run in the background persistently by
|
||||
the provider backend as needed.
|
||||
|
||||
'''
|
||||
# NOTE: we only need to setup logging once (and only)
|
||||
# here since all hosted daemon tasks will reference
|
||||
# this same log instance's (actor local) state and thus
|
||||
# don't require any further (level) configuration on
|
||||
# their own B)
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
tll: str = actor.loglevel
|
||||
log = get_console_log(
|
||||
level=loglevel or tll,
|
||||
name=f'{_util.subsys}.{brokername}',
|
||||
with_tractor_log=bool(tll),
|
||||
)
|
||||
assert log.name == _util.subsys
|
||||
|
||||
from piker.data import feed
|
||||
assert not feed._bus
|
||||
|
||||
# allocate a nursery to the bus for spawning background
|
||||
# tasks which service client IPC requests, normally
|
||||
# `tractor.Context` connections to explicitly required
|
||||
# `datad` endpoints such as:
|
||||
# - `stream_quotes()`,
|
||||
# - `manage_history()`,
|
||||
# - `allocate_persistent_feed()`,
|
||||
# - `open_symbol_search()`
|
||||
# NOTE: see ep invocation details inside `.data.feed`.
|
||||
async with (
|
||||
trio.open_nursery() as service_nursery
|
||||
):
|
||||
bus: _FeedsBus = feed.get_feed_bus(
|
||||
brokername,
|
||||
service_nursery,
|
||||
)
|
||||
assert bus is feed._bus
|
||||
|
||||
# unblock caller
|
||||
await ctx.started()
|
||||
|
||||
# we pin this task to keep the feeds manager active
|
||||
# until the parent actor decides to tear it down
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
def datad_init(
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
**start_actor_kwargs,
|
||||
|
||||
) -> tuple[
|
||||
ModuleType,
|
||||
dict,
|
||||
AsyncContextManager,
|
||||
]:
|
||||
'''
|
||||
Given an input broker name, load all named arguments
|
||||
which can be passed for daemon endpoint + context spawn
|
||||
as required in every `datad.<brokername>` (actor)
|
||||
service.
|
||||
|
||||
This includes:
|
||||
- load the appropriate <brokername>.py pkg module,
|
||||
- reads any declared `_datad_mods: list[str]` (falling
|
||||
back to the full `__enable_modules__` set for
|
||||
not-yet-split backends) which will be passed to
|
||||
`tractor.ActorNursery.start_actor(enable_modules=)`
|
||||
at actor start time,
|
||||
- deliver a reference to the daemon lifetime fixture,
|
||||
which for now is always the
|
||||
`_setup_persistent_datad()` context defined above.
|
||||
|
||||
'''
|
||||
from piker.brokers import get_brokermod
|
||||
from .validate import get_eps
|
||||
brokermod = get_brokermod(brokername)
|
||||
modpath: str = brokermod.__name__
|
||||
|
||||
# warn (but don't bail) when the backend is missing
|
||||
# some/all of the `datad` ep contract defined by
|
||||
# `piker.data.validate._eps`.
|
||||
datad_eps: dict = get_eps(brokermod, 'datad')
|
||||
if not datad_eps:
|
||||
log.warning(
|
||||
f'Backend {brokername!r} offers NO `datad` '
|
||||
f'(data-feed) eps!?\n'
|
||||
f'Most feed/chart functionality will be '
|
||||
f'broken for this provider..\n'
|
||||
)
|
||||
|
||||
start_actor_kwargs['name'] = f'datad.{brokername}'
|
||||
|
||||
# XXX CRITICAL: include any backend-declared spawn
|
||||
# kwargs, eg. `{'infect_asyncio': True}` required by
|
||||
# `ib`'s embedded `asyncio`-mode `ib_async` usage!
|
||||
start_actor_kwargs.update(
|
||||
getattr(
|
||||
brokermod,
|
||||
'_spawn_kwargs',
|
||||
{},
|
||||
)
|
||||
)
|
||||
|
||||
# lookup actor-enabled modules declared by the backend
|
||||
# offering the `datad` endpoint(s).
|
||||
enabled: list[str]
|
||||
enabled = start_actor_kwargs['enable_modules'] = [
|
||||
__name__, # so that eps from THIS mod can be invoked
|
||||
modpath,
|
||||
]
|
||||
for submodname in getattr(
|
||||
brokermod,
|
||||
'_datad_mods',
|
||||
# fallback for (flat, less mature) backends which
|
||||
# don't yet declare a daemon-kind mod split.
|
||||
getattr(
|
||||
brokermod,
|
||||
'__enable_modules__',
|
||||
[],
|
||||
),
|
||||
):
|
||||
subpath: str = f'{modpath}.{submodname}'
|
||||
enabled.append(subpath)
|
||||
|
||||
return (
|
||||
brokermod,
|
||||
start_actor_kwargs, # to `ActorNursery.start_actor()`
|
||||
|
||||
# XXX see impl above; contains all (actor global)
|
||||
# setup/teardown expected in all `datad` actor
|
||||
# instances.
|
||||
_setup_persistent_datad,
|
||||
)
|
||||
|
||||
|
||||
async def spawn_datad(
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
**tractor_kwargs,
|
||||
|
||||
) -> bool:
|
||||
|
||||
log.info(
|
||||
f'Spawning data-daemon,\n'
|
||||
f'backend: {brokername!r}'
|
||||
)
|
||||
|
||||
(
|
||||
brokermod,
|
||||
tractor_kwargs,
|
||||
daemon_fixture_ep,
|
||||
) = datad_init(
|
||||
brokername,
|
||||
loglevel,
|
||||
**tractor_kwargs,
|
||||
)
|
||||
|
||||
# ask `pikerd` to spawn a new sub-actor and manage it
|
||||
# under its actor nursery
|
||||
from piker.service import Services
|
||||
|
||||
dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}'
|
||||
enable_mods: list[str] = list(dict.fromkeys(
|
||||
_datad_service_mods
|
||||
+
|
||||
tractor_kwargs.pop('enable_modules')
|
||||
))
|
||||
portal = await Services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=enable_mods,
|
||||
debug_mode=Services.debug_mode,
|
||||
**tractor_kwargs,
|
||||
)
|
||||
|
||||
# NOTE: the service mngr expects an already spawned
|
||||
# actor + its portal ref in order to do non-blocking
|
||||
# setup of the `datad` service nursery.
|
||||
await Services.start_service_task(
|
||||
dname,
|
||||
portal,
|
||||
|
||||
# signature of target root-task endpoint
|
||||
daemon_fixture_ep,
|
||||
brokername=brokername,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_spawn_datad(
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
**pikerd_kwargs,
|
||||
|
||||
) -> tractor.Portal:
|
||||
'''
|
||||
Helper to spawn a datad service *from* a client who
|
||||
wishes to use the sub-actor-daemon but is fine with
|
||||
re-using any existing and contactable `datad`.
|
||||
|
||||
Mas o menos, acts as a cached-actor-getter factory.
|
||||
|
||||
'''
|
||||
from piker.service import maybe_spawn_daemon
|
||||
|
||||
async with maybe_spawn_daemon(
|
||||
service_name=f'datad.{brokername}',
|
||||
service_task_target=spawn_datad,
|
||||
spawn_args={
|
||||
'brokername': brokername,
|
||||
},
|
||||
loglevel=loglevel,
|
||||
|
||||
**pikerd_kwargs,
|
||||
|
||||
) as portal:
|
||||
yield portal
|
||||
|
|
@ -56,3 +56,7 @@ from ..brokers._daemon import (
|
|||
spawn_brokerd as spawn_brokerd,
|
||||
maybe_spawn_brokerd as maybe_spawn_brokerd,
|
||||
)
|
||||
from ..data._daemon import (
|
||||
spawn_datad as spawn_datad,
|
||||
maybe_spawn_datad as maybe_spawn_datad,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -157,6 +157,7 @@ _root_modules: list[str] = [
|
|||
__name__,
|
||||
'piker.service._daemon',
|
||||
'piker.brokers._daemon',
|
||||
'piker.data._daemon',
|
||||
|
||||
'piker.clearing._ems',
|
||||
'piker.clearing._client',
|
||||
|
|
|
|||
|
|
@ -66,6 +66,41 @@ def test_runtime_boot(
|
|||
trio.run(main)
|
||||
|
||||
|
||||
def test_datad_spawn(
|
||||
open_test_pikerd: AsyncContextManager,
|
||||
loglevel: str,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Verify the new (data-feed-only) `datad.<broker>` daemon
|
||||
can be spawned/registered as a `pikerd` sub-service via
|
||||
the `maybe_spawn_datad()` factory.
|
||||
|
||||
'''
|
||||
from piker.service import maybe_spawn_datad
|
||||
|
||||
backend: str = 'kraken'
|
||||
datad_name: str = f'datad.{backend}'
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
open_test_pikerd() as (_, _, _, services),
|
||||
|
||||
maybe_spawn_datad(
|
||||
backend,
|
||||
loglevel=loglevel,
|
||||
) as portal,
|
||||
):
|
||||
assert portal
|
||||
|
||||
async with ensure_service(datad_name):
|
||||
assert (
|
||||
datad_name in services.service_tasks
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
def test_ensure_datafeed_actors(
|
||||
open_test_pikerd: AsyncContextManager,
|
||||
loglevel: str,
|
||||
|
|
|
|||
Loading…
Reference in New Issue