diff --git a/piker/data/_daemon.py b/piker/data/_daemon.py
new file mode 100644
index 00000000..663d7102
--- /dev/null
+++ b/piker/data/_daemon.py
@@ -0,0 +1,300 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see .
+
+'''
+Data-daemon-actor "endpoint-hooks": the service task entry
+points for `datad.`, the per-provider real-time
+and historical market-data feed daemon.
+
+The (data vs. broker)d-split mirrors the ep-groupings in
+`piker.data.validate._eps`: this daemon hosts all `'datad'`
+eps (live quotes, history loading, symbology search) while
+its `brokerd.` sibling hosts only the live
+order-control (and thus credentialed) `'brokerd'` eps.
+
+'''
+from __future__ import annotations
+from contextlib import (
+ asynccontextmanager as acm,
+)
+from types import ModuleType
+from typing import (
+ TYPE_CHECKING,
+ AsyncContextManager,
+)
+
+import tractor
+import trio
+
+from piker.log import (
+ get_logger,
+ get_console_log,
+)
+from . import _util
+
+if TYPE_CHECKING:
+ from .feed import _FeedsBus
+
+log = get_logger(name=__name__)
+
+# `datad`-actor-always-enabled mods: the data-side successor
+# to the old `piker.brokers._daemon._data_mods` set.
+# NOTE: keeping this list as small as possible is part of
+# our caps-sec model and should be treated with utmost care!
+_datad_service_mods: list[str] = [
+ 'piker.brokers.core',
+ 'piker.brokers.data',
+ 'piker.data',
+ 'piker.data.feed',
+ 'piker.data._sampling',
+ 'piker.data._daemon',
+]
+
+
+@tractor.context
+async def _setup_persistent_datad(
+ ctx: tractor.Context,
+ brokername: str,
+ loglevel: str|None = None,
+
+) -> None:
+ '''
+ Allocate an actor-wide service nursery in this
+ `datad.` actor such that data-feed tasks
+ (shm writer-samplers, history backfillers, symbology
+ loaders) can be run in the background persistently by
+ the provider backend as needed.
+
+ '''
+ # NOTE: we only need to setup logging once (and only)
+ # here since all hosted daemon tasks will reference
+ # this same log instance's (actor local) state and thus
+ # don't require any further (level) configuration on
+ # their own B)
+ actor: tractor.Actor = tractor.current_actor()
+ tll: str = actor.loglevel
+ log = get_console_log(
+ level=loglevel or tll,
+ name=f'{_util.subsys}.{brokername}',
+ with_tractor_log=bool(tll),
+ )
+ assert log.name == _util.subsys
+
+ from piker.data import feed
+ assert not feed._bus
+
+ # allocate a nursery to the bus for spawning background
+ # tasks which service client IPC requests, normally
+ # `tractor.Context` connections to explicitly required
+ # `datad` endpoints such as:
+ # - `stream_quotes()`,
+ # - `manage_history()`,
+ # - `allocate_persistent_feed()`,
+ # - `open_symbol_search()`
+ # NOTE: see ep invocation details inside `.data.feed`.
+ async with (
+ trio.open_nursery() as service_nursery
+ ):
+ bus: _FeedsBus = feed.get_feed_bus(
+ brokername,
+ service_nursery,
+ )
+ assert bus is feed._bus
+
+ # unblock caller
+ await ctx.started()
+
+ # we pin this task to keep the feeds manager active
+ # until the parent actor decides to tear it down
+ await trio.sleep_forever()
+
+
+def datad_init(
+ brokername: str,
+ loglevel: str|None = None,
+
+ **start_actor_kwargs,
+
+) -> tuple[
+ ModuleType,
+ dict,
+ AsyncContextManager,
+]:
+ '''
+ Given an input broker name, load all named arguments
+ which can be passed for daemon endpoint + context spawn
+ as required in every `datad.` (actor)
+ service.
+
+ This includes:
+ - load the appropriate .py pkg module,
+ - reads any declared `_datad_mods: list[str]` (falling
+ back to the full `__enable_modules__` set for
+ not-yet-split backends) which will be passed to
+ `tractor.ActorNursery.start_actor(enable_modules=)`
+ at actor start time,
+ - deliver a reference to the daemon lifetime fixture,
+ which for now is always the
+ `_setup_persistent_datad()` context defined above.
+
+ '''
+ from piker.brokers import get_brokermod
+ from .validate import get_eps
+ brokermod = get_brokermod(brokername)
+ modpath: str = brokermod.__name__
+
+ # warn (but don't bail) when the backend is missing
+ # some/all of the `datad` ep contract defined by
+ # `piker.data.validate._eps`.
+ datad_eps: dict = get_eps(brokermod, 'datad')
+ if not datad_eps:
+ log.warning(
+ f'Backend {brokername!r} offers NO `datad` '
+ f'(data-feed) eps!?\n'
+ f'Most feed/chart functionality will be '
+ f'broken for this provider..\n'
+ )
+
+ start_actor_kwargs['name'] = f'datad.{brokername}'
+
+ # XXX CRITICAL: include any backend-declared spawn
+ # kwargs, eg. `{'infect_asyncio': True}` required by
+ # `ib`'s embedded `asyncio`-mode `ib_async` usage!
+ start_actor_kwargs.update(
+ getattr(
+ brokermod,
+ '_spawn_kwargs',
+ {},
+ )
+ )
+
+ # lookup actor-enabled modules declared by the backend
+ # offering the `datad` endpoint(s).
+ enabled: list[str]
+ enabled = start_actor_kwargs['enable_modules'] = [
+ __name__, # so that eps from THIS mod can be invoked
+ modpath,
+ ]
+ for submodname in getattr(
+ brokermod,
+ '_datad_mods',
+ # fallback for (flat, less mature) backends which
+ # don't yet declare a daemon-kind mod split.
+ getattr(
+ brokermod,
+ '__enable_modules__',
+ [],
+ ),
+ ):
+ subpath: str = f'{modpath}.{submodname}'
+ enabled.append(subpath)
+
+ return (
+ brokermod,
+ start_actor_kwargs, # to `ActorNursery.start_actor()`
+
+ # XXX see impl above; contains all (actor global)
+ # setup/teardown expected in all `datad` actor
+ # instances.
+ _setup_persistent_datad,
+ )
+
+
+async def spawn_datad(
+ brokername: str,
+ loglevel: str|None = None,
+
+ **tractor_kwargs,
+
+) -> bool:
+
+ log.info(
+ f'Spawning data-daemon,\n'
+ f'backend: {brokername!r}'
+ )
+
+ (
+ brokermod,
+ tractor_kwargs,
+ daemon_fixture_ep,
+ ) = datad_init(
+ brokername,
+ loglevel,
+ **tractor_kwargs,
+ )
+
+ # ask `pikerd` to spawn a new sub-actor and manage it
+ # under its actor nursery
+ from piker.service import Services
+
+ dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}'
+ enable_mods: list[str] = list(dict.fromkeys(
+ _datad_service_mods
+ +
+ tractor_kwargs.pop('enable_modules')
+ ))
+ portal = await Services.actor_n.start_actor(
+ dname,
+ enable_modules=enable_mods,
+ debug_mode=Services.debug_mode,
+ **tractor_kwargs,
+ )
+
+ # NOTE: the service mngr expects an already spawned
+ # actor + its portal ref in order to do non-blocking
+ # setup of the `datad` service nursery.
+ await Services.start_service_task(
+ dname,
+ portal,
+
+ # signature of target root-task endpoint
+ daemon_fixture_ep,
+ brokername=brokername,
+ loglevel=loglevel,
+ )
+ return True
+
+
+@acm
+async def maybe_spawn_datad(
+ brokername: str,
+ loglevel: str|None = None,
+
+ **pikerd_kwargs,
+
+) -> tractor.Portal:
+ '''
+ Helper to spawn a datad service *from* a client who
+ wishes to use the sub-actor-daemon but is fine with
+ re-using any existing and contactable `datad`.
+
+ Mas o menos, acts as a cached-actor-getter factory.
+
+ '''
+ from piker.service import maybe_spawn_daemon
+
+ async with maybe_spawn_daemon(
+ service_name=f'datad.{brokername}',
+ service_task_target=spawn_datad,
+ spawn_args={
+ 'brokername': brokername,
+ },
+ loglevel=loglevel,
+
+ **pikerd_kwargs,
+
+ ) as portal:
+ yield portal
diff --git a/piker/service/__init__.py b/piker/service/__init__.py
index 29360620..6fe5a5c8 100644
--- a/piker/service/__init__.py
+++ b/piker/service/__init__.py
@@ -56,3 +56,7 @@ from ..brokers._daemon import (
spawn_brokerd as spawn_brokerd,
maybe_spawn_brokerd as maybe_spawn_brokerd,
)
+from ..data._daemon import (
+ spawn_datad as spawn_datad,
+ maybe_spawn_datad as maybe_spawn_datad,
+)
diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py
index b0f180e4..6c09a9e7 100644
--- a/piker/service/_actor_runtime.py
+++ b/piker/service/_actor_runtime.py
@@ -157,6 +157,7 @@ _root_modules: list[str] = [
__name__,
'piker.service._daemon',
'piker.brokers._daemon',
+ 'piker.data._daemon',
'piker.clearing._ems',
'piker.clearing._client',
diff --git a/tests/test_services.py b/tests/test_services.py
index 69771c09..abbe6c6b 100644
--- a/tests/test_services.py
+++ b/tests/test_services.py
@@ -66,6 +66,41 @@ def test_runtime_boot(
trio.run(main)
+def test_datad_spawn(
+ open_test_pikerd: AsyncContextManager,
+ loglevel: str,
+
+) -> None:
+ '''
+ Verify the new (data-feed-only) `datad.` daemon
+ can be spawned/registered as a `pikerd` sub-service via
+ the `maybe_spawn_datad()` factory.
+
+ '''
+ from piker.service import maybe_spawn_datad
+
+ backend: str = 'kraken'
+ datad_name: str = f'datad.{backend}'
+
+ async def main():
+ async with (
+ open_test_pikerd() as (_, _, _, services),
+
+ maybe_spawn_datad(
+ backend,
+ loglevel=loglevel,
+ ) as portal,
+ ):
+ assert portal
+
+ async with ensure_service(datad_name):
+ assert (
+ datad_name in services.service_tasks
+ )
+
+ trio.run(main)
+
+
def test_ensure_datafeed_actors(
open_test_pikerd: AsyncContextManager,
loglevel: str,