Compare commits

..

10 Commits

Author SHA1 Message Date
Gud Boi 4dc50e1d1b brokerd: slim RPC caps + `ib` client-id offset
Caps-sec tightening now that `brokerd` is trading-only: NO
`piker.data.*` (feed) mods are RPC-enabled in the (live,
credentialed) trading actor anymore.

Deats,
- drop `_data_mods` for a minimal `_brokerd_service_mods`
  (just `piker.brokers._daemon`); dedup-compose with the
  backend's set in `spawn_brokerd()`.
- `broker_init()` reads the backend's `_brokerd_mods`
  (fallback: `__enable_modules__` for flat backends).
- fail fast in `spawn_brokerd()` via `validate.get_eps()`
  when a backend offers NO live order-ctl eps (eg.
  `kucoin`, `deribit`) -> tells the caller to use
  paper-mode instead of booting a dead actor; analogous
  warning in `datad_init()` for datad-ep-less backends.
- offset `ib`'s default `client_id` per daemon-kind in
  `load_aio_clients()`: post-split BOTH `datad.ib` and
  `brokerd.ib` connect to the same gw/tws endpoint and the
  shared default (6116 + linear retry incrs) would collide
  and burn the full conn-timeout retry cycle; datad gets
  +16, ad-hoc (test/cli) actors +32.
- drop the import-cleanup leftovers (`exceptiongroup`,
  `_FeedsBus` type-only import) and the now-resolved
  "expose datad" TODO in `.cli`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:23:04 -04:00
Gud Boi d7f1d70b61 .data: cut feed layer over to `datad` actors
The topology flip: all data-feed consumers now route to the
new `datad.<broker>` sibling daemon; `brokerd` becomes
trading-only and is ONLY ever booted lazily by `emsd`'s
`open_brokerd_dialog()` (see prior commit). Chart-only and
paper sessions run with zero (live, credentialed) `brokerd`
procs B)

Deats,
- `open_feed()` -> `maybe_spawn_datad()` (NB: imported
  relative-direct from `._daemon` to dodge a partial-init
  cycle via `piker.service`); flip the `open_feed_bus()`
  actor-name assert to `'datad'`; comment sweep.
- slim `_setup_persistent_brokerd()` to a trading-only
  fixture: console logging + pinned-open ctx; the feed-bus
  alloc moves to `_setup_persistent_datad()` and backend
  `open_trade_dialog()` ctxs own their own task trees.
  (the `piker ledger` ad-hoc actor enters this same slimmed
  fixture - exactly what it needs.)
- repoint data-flavoured spawn sites to `maybe_spawn_datad`:
  `.ui._app` symbol-search (+ rename
  `install_brokerd_search` -> `install_datad_search`),
  `.brokers.core.symbol_search()`, `.brokers.cli`
  `brokercheck`/`record`, legacy kivy `.ui.cli` +
  `option_chain`'s `wait_for_actor()`.
- invert `tests.test_services` expectations: feed/EMS-paper
  flows must spawn `datad.kraken` and `paperboi.kraken`
  with an explicit negative assert that NO `brokerd.kraken`
  service task exists.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:22:44 -04:00
Gud Boi 3548893337 .clearing: lazily spawn `brokerd` from `emsd`
Drop the ONE coupling that forces feed + trading eps into
the same actor: `Router.open_trade_relays()` pulling its
trades-dialog portal from `feed.portals[brokermod]`.
Instead `open_brokerd_dialog()` now (maybe) spawns/finds
`brokerd.<broker>` itself via `maybe_spawn_brokerd()` and
ONLY when a live trades-ep will actually be opened; the
paper-mode short-circuit never touches it, so post
feed-cutover paper sessions will run with zero `brokerd`
procs.

Pre-cutover this is a pure refactor: the registry lookup
just finds the same feed-spawned daemon.

Deats,
- new `open_brokerd_dialog()` sig: portal acquisition moves
  inside via an `acquire_live_portal()` helper; keep an
  explicit `portal: Portal|None` override for the
  `piker ledger` cli which boots its own ad-hoc actor.
- `Router.maybe_open_brokerd_dialog()` drops its `portal`
  param; `open_trade_relays()` drops the `feed.portals`
  lookup entirely.
- `.accounting.cli`: pass `portal=` by keyword.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:22:16 -04:00
Gud Boi 7de661c03e Add `datad` daemon machinery to `.data`
First half of the `brokerd` split: a new per-provider
data-feed-only daemon-actor `datad.<broker>` to (soon) host
all `validate._eps['datad']` eps (live quotes, history
loading, symbology search) leaving `brokerd` for live order
ctl only. Purely additive; nothing routes through it yet.

Deats,
- new `piker.data._daemon` mod mirroring the
  `.brokers._daemon` conventions (and the `samplerd`
  sub-daemon precedent):
  - `_setup_persistent_datad()` lifetime fixture owning the
    actor-global `_FeedsBus` alloc.
  - `datad_init()` building `enable_modules` from the
    backend's `_datad_mods` (falling back to
    `__enable_modules__` for not-yet-split backends) and
    copying `_spawn_kwargs` (critical for `ib`'s
    `infect_asyncio`).
  - `spawn_datad()`/`maybe_spawn_datad()` wrapping
    `Services` + `maybe_spawn_daemon()`.
- add `piker.data._daemon` to `_root_modules` so `pikerd`
  can run `spawn_datad()` requests.
- re-export the spawn eps from `piker.service`.
- add `test_datad_spawn` verifying actor boot + service
  registration via `ensure_service('datad.kraken')`.

Note the `Services`-based impl style deliberately mirrors
`spawn_brokerd()` so the eventual `tractor.hilevel`
`ServiceMngr` port (see the `service_mng_to_tractor`
branch's d8c21d44 prep work) lands symmetrically on both.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:21:59 -04:00
Gud Boi 66957ffdb0 Declare per-daemon-kind backend mod groups
Prep for the `brokerd` -> (`datad` + `brokerd`) actor split
by having each (split-style) backend declare which of its
submods host which daemon-kind's eps, exactly per the
`piker.data.validate._eps` groupings; `ib` already had
`_brokerd_mods`/`_datad_mods` so extend the convention to
`kraken`, `binance` and `deribit` (and add `'api'` to ib's
datad set since both kinds need the `Client` layer).

`__enable_modules__` stays as the (deduped) union so this
is a ZERO behavior change; flat backends (`kucoin` etc.)
just don't declare the split yet.

Also,
- add `validate.get_eps()` returning a backend's defined
  eps per daemon-kind for spawn-time introspection.
- import `NoBsWs`/`open_autorecon_ws` from
  `piker.data._web_bs` directly in `.kraken.broker` (they
  were only re-exported via `.kraken.feed`) so the trading
  mod doesn't depend on the feed mod for ws primitives.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:21:26 -04:00
Gud Boi fa8d413c84 Fix `pytest` config-dir isolation in subactors
The old (commented-out) `get_app_dir()` override gated on
`'pytest' in sys.modules` which can NEVER work in spawned
subactors (fresh procs, no pytest import); as a result test
`paperboi`/daemon actors were writing into the user's REAL
`~/.config/piker/accounting/` files.. friggin yikes.

Deats,
- add `config._maybe_use_test_dir()` which lazily (at
  conf-path access time, NOT import time) reads the
  `piker_test_dir` entry from
  `tractor.runtime._state._runtime_vars['piker_vars']` as
  pre-loaded by `open_piker_runtime()` from the
  `tests.conftest._open_test_pikerd()` overrides.
- hook it in `get_conf_dir()` and route `get_conf_path()`
  + `load()`'s mkdir through `get_conf_dir()`.
- route `.accounting._ledger` / `._pos` dir derivation
  through `config.get_conf_dir()` (was reading the
  `_config_dir` global directly, bypassing the override);
  also `mkdir(parents=True, exist_ok=True)` for nested
  tmp-dir creation.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:21:04 -04:00
Gud Boi fa98290808 Port service+tests to latest `tractor` APIs
Continue the `repair_tests`-branch mission (already merged
in this stack's ancestry, see f4c4f1e2 which ported
`conftest.py`) by fixing the remaining drift breakage vs.
`tractor` git `main`; without these NOTHING boots since the
`tractor.Address` port in 604e5fcf.

Deats,
- normalize reg addrs via `wrap_address()` in
  `open_pikerd()` before `.unwrap()`-ing; entries may be
  raw `tuple`s when passed in from (test) client code.
- port `check_for_service()` to `query_actor(regaddr=)`
  (was `arbiter_sockaddr=`) incl. its 2-tuple yield and
  the now-required `open_registry(addrs=)` arg.
- `wait_for_actor(registry_addr=)` + `.chan.raddr.unwrap()`
  raw-tuple compares in `test_runtime_boot` and
  `ensure_service()`.
- update `run_test_w_cancel_method()` for modern `tractor`
  cancel semantics: self-requested sub-service cancels are
  absorbed (no `ContextCancelled` raised to the opener) and
  single-exc groups collapse to a bare KBI.
- `RemoteActorError.boxed_type` (was `.type`) and
  `Position.cumsize` (was `.size`) renames in tests.
- bump the paper-EMS startup budget 9 -> 19s; it includes
  a live (kraken) symbology fetch so needs net headroom.
- woops, add the missing comma in `.deribit.api`'s
  `tractor.trionics` import tuple..

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:20:45 -04:00
Gud Boi 75cefe10f1 ib: attempt to REPL closed connection events 2026-06-09 14:03:41 -04:00
Gud Boi b547a33da4 ib: attempt to REPL closed connection events 2026-06-09 14:03:34 -04:00
Gud Boi 0df3943f3c Pin `tractor` to git `main`, bump `xonsh` deps
Switch the `tractor` dep from the local editable checkout
(`../tractor`) back to the upstream git `main` branch so a
fresh clone can `uv sync` without a sibling repo.

Also,
- add `xonsh>=0.23.8` as a core dep
- bump `repl` group pins: `xonsh>=0.23.0` and
  `prompt-toolkit>=3.0.50` (was `==3.0.40`)
- relock `uv.lock` accordingly

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-06-09 14:00:20 -04:00
30 changed files with 740 additions and 246 deletions

View File

@ -324,10 +324,13 @@ def load_ledger(
ldir: Path = (
dirpath
or
config._config_dir / 'accounting' / 'ledgers'
config.get_conf_dir() / 'accounting' / 'ledgers'
)
if not ldir.is_dir():
ldir.mkdir()
ldir.mkdir(
parents=True,
exist_ok=True,
)
fname = f'trades_{brokername}_{acctid}.toml'
fpath: Path = ldir / fname

View File

@ -785,9 +785,16 @@ def load_account(
legacy_fn: str = f'pps.{brokername}.{acctid}.toml'
fn: str = f'account.{brokername}.{acctid}.toml'
dirpath: Path = dirpath or (config._config_dir / 'accounting')
dirpath: Path = (
dirpath
or
(config.get_conf_dir() / 'accounting')
)
if not dirpath.is_dir():
dirpath.mkdir()
dirpath.mkdir(
parents=True,
exist_ok=True,
)
conf, path = config.load(
path=dirpath / fn,

View File

@ -143,12 +143,15 @@ def sync(
# (what the EMS normally does internall) B)
open_brokerd_dialog(
brokermod,
portal,
exec_mode=(
'paper'
if account == 'paper'
else 'live'
),
# use our own ad-hoc-spawned actor,
# do NOT (spawn and) use the
# `brokerd.<broker>` service daemon!
portal=portal,
loglevel=loglevel,
) as (
brokerd_stream,

View File

@ -25,10 +25,8 @@ from contextlib import (
)
from types import ModuleType
from typing import (
TYPE_CHECKING,
AsyncContextManager,
)
import exceptiongroup as eg
import tractor
import trio
@ -40,27 +38,20 @@ from piker.log import (
from . import _util
from . import get_brokermod
if TYPE_CHECKING:
from ..data import _FeedsBus
log = get_logger(name=__name__)
# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# `brokerd`-actor-always-enabled mods.
# NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care!
_data_mods: str = [
'piker.brokers.core',
'piker.brokers.data',
# model and should be treated with utmost care! In particular NO
# `piker.data.*` feed mods should be enabled in this (live,
# credentialed) trading actor; all data-feed serving is the
# domain of the `datad.<broker>` sibling daemon, see
# `piker.data._daemon._datad_service_mods`.
_brokerd_service_mods: list[str] = [
'piker.brokers._daemon',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
]
# TODO: we should rename the daemon to datad prolly once we split up
# broker vs. data tasks into separate actors?
@tractor.context
async def _setup_persistent_brokerd(
ctx: tractor.Context,
@ -69,9 +60,15 @@ async def _setup_persistent_brokerd(
) -> None:
'''
Allocate a actor-wide service nursery in ``brokerd``
such that feeds can be run in the background persistently by
the broker backend as needed.
Trading-only daemon (lifetime) fixture: console logging
setup and a pinned-open context for service mgmt.
All data-feed-bus state now lives in the (data-feed-only)
`datad.<brokername>` sibling daemon, see
`piker.data._daemon._setup_persistent_datad()`; this
actor hosts only the backend's `open_trade_dialog()`
(live order-control) ep-task(s) which manage their own
task trees per `tractor.Context`.
'''
# NOTE: we only need to setup logging once (and only) here
@ -87,46 +84,12 @@ async def _setup_persistent_brokerd(
)
assert log.name == _util.subsys
# further, set the log level on any broker broker specific
# logger instance.
# unblock caller
await ctx.started()
from piker.data import feed
assert not feed._bus
# allocate a nursery to the bus for spawning background
# tasks to service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `brokerd` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
except eg.ExceptionGroup:
# TODO: likely some underlying `brokerd` IPC connection
# broke so here we handle a respawn and re-connect attempt!
# This likely should pair with development of the OCO task
# nusery in dev over @ `tractor` B)
# https://github.com/goodboy/tractor/pull/363
raise
# we pin this task to keep the daemon active until the
# parent actor decides to tear it down
await trio.sleep_forever()
def broker_init(
@ -147,8 +110,10 @@ def broker_init(
This includes:
- load the appropriate <brokername>.py pkg module,
- reads any declared `__enable_modules__: listr[str]` which will be
passed to `tractor.ActorNursery.start_actor(enabled_modules=<this>)`
- reads any declared `_brokerd_mods: list[str]` (falling
back to the full `__enable_modules__` set for
not-yet-split backends) which will be passed to
`tractor.ActorNursery.start_actor(enable_modules=)`
at actor start time,
- deliver a references to the daemon lifetime fixture, which
for now is always the `_setup_persistent_brokerd()` context defined
@ -183,8 +148,14 @@ def broker_init(
]
for submodname in getattr(
brokermod,
'__enable_modules__',
[],
'_brokerd_mods',
# fallback for (flat, less mature) backends which
# don't yet declare a daemon-kind mod split.
getattr(
brokermod,
'__enable_modules__',
[],
),
):
subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath)
@ -212,6 +183,22 @@ async def spawn_brokerd(
f'backend: {brokername!r}'
)
# fail fast on (data-only) backends which don't offer
# ANY live order-control eps; the caller should instead
# be using paper-mode (and thus never spawning us)!
from ..data.validate import get_eps
brokerd_eps: dict = get_eps(
get_brokermod(brokername),
'brokerd',
)
if not brokerd_eps:
raise RuntimeError(
f'Backend {brokername!r} offers NO `brokerd` '
f'(live order-control) eps!?\n'
f'It is likely a datad-only provider, use '
f'paper-mode for clearing instead.\n'
)
(
brokermode,
tractor_kwargs,
@ -233,7 +220,11 @@ async def spawn_brokerd(
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor(
dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
enable_modules=list(dict.fromkeys(
_brokerd_service_mods
+
tractor_kwargs.pop('enable_modules')
)),
debug_mode=Services.debug_mode,
**tractor_kwargs
)

View File

@ -52,9 +52,25 @@ __all__ = [
]
# `brokerd` modules
__enable_modules__: list[str] = [
# per-daemon-kind (sub)mod groups: declares which of our
# submods host the eps run by each daemon-actor kind as
# defined by `piker.data.validate._eps`.
# NOTE: `get_mkt_info` and `open_symbol_search` both live
# in `.feed` for this backend (no `symbols.py`).
_brokerd_mods: list[str] = [
'api',
'feed',
'broker',
]
_datad_mods: list[str] = [
'api',
'feed',
]
# tractor RPC enable arg
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
))

View File

@ -35,7 +35,7 @@ from piker.log import (
get_logger,
)
from ..service import (
maybe_spawn_brokerd,
maybe_spawn_datad,
maybe_open_pikerd,
)
from ..brokers import (
@ -187,7 +187,7 @@ def brokercheck(config, broker):
'''
async def bcheck_main():
async with maybe_spawn_brokerd(broker) as portal:
async with maybe_spawn_datad(broker) as portal:
await portal.run(run_test, broker)
await portal.cancel_actor()
@ -317,7 +317,7 @@ def record(config, rate, name, dhost, filename):
return
async def main(tries):
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
tries=tries, loglevel=loglevel
) as portal:
# run app "main"

View File

@ -30,7 +30,7 @@ import trio
from piker.log import get_logger
from . import get_brokermod
from ..service import maybe_spawn_brokerd
from ..service import maybe_spawn_datad
from . import open_cached_client
from ..accounting import MktPair
@ -172,7 +172,7 @@ async def symbol_search(
# await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync()
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
mod.name,
infect_asyncio=getattr(
mod,

View File

@ -47,13 +47,25 @@ __all__ = [
]
# tractor RPC enable arg
__enable_modules__: list[str] = [
# per-daemon-kind (sub)mod groups: declares which of our
# submods host the eps run by each daemon-actor kind as
# defined by `piker.data.validate._eps`.
# NOTE: datad-only backend (no `broker.py` yet)!
_brokerd_mods: list[str] = []
_datad_mods: list[str] = [
'api',
'feed',
# 'broker',
]
# tractor RPC enable arg
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
))
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {
'infect_asyncio': True,

View File

@ -37,7 +37,7 @@ from rapidfuzz import process as fuzzy
import numpy as np
from tractor.trionics import (
broadcast_receiver,
maybe_open_context
maybe_open_context,
collapse_eg,
)
from tractor import to_asyncio

View File

@ -65,17 +65,18 @@ _brokerd_mods: list[str] = [
]
_datad_mods: list[str] = [
'api',
'feed',
'symbols',
]
# tractor RPC enable arg
__enable_modules__: list[str] = (
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
)
))
# passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = {

View File

@ -1340,6 +1340,21 @@ async def load_aio_clients(
ib = None
client = None
# XXX: post (brokerd vs. datad)-split BOTH per-broker
# daemons connect to the same API gw/tws endpoint(s); to
# avoid `clientId` collisions (and the long conn-timeout
# retry cycle they cause) we offset the data-daemon's
# default id-range to be disjoint from `brokerd.ib`'s
# (which also retries with `client_id + i` increments).
if client_id == 6116: # the default from above
aname: str = tractor.current_actor().name
if 'datad' in aname:
client_id += 16
# ad-hoc (test/cli) actors get their own range to
# avoid clashing with any live daemon-tree's conns.
elif 'brokerd' not in aname:
client_id += 32
# attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection.
localhost = '127.0.0.1'

View File

@ -1163,9 +1163,9 @@ async def deliver_trade_events(
case 'execDetailsEvent':
# unpack attrs pep-0526 style.
trade: Trade
con: Contract = trade.contract
fill: Fill
trade, fill = item
con: Contract = trade.contract
execu: Execution = fill.execution
execid: str = execu.execId
report: CommissionReport = fill.commissionReport
@ -1334,6 +1334,8 @@ async def deliver_trade_events(
# XXX known special (ignore) cases
elif code in {
# ^TODO, if this is it we should definitely raise
# or at least provide for reconnect attempts?
200, # uhh.. ni idea
# hist pacing / connectivity
@ -1344,10 +1346,18 @@ async def deliver_trade_events(
# 'No market data during competing live session'
1669,
}:
pcc: str = "Peer closed connection."
if pcc in err:
# TODO, emit and raise?
# [ ] try for reconnect?
# raise BrokerdError(
await tractor.pause()
log.error(
f'Order-API-error which is non-cancel-causing ?!\n'
f'\n'
f'{pformat(err)}\n'
f'code={code!r}\n'
f'err={pformat(err)}\n'
)
continue

View File

@ -66,10 +66,24 @@ __all__ = [
]
# tractor RPC enable arg
__enable_modules__: list[str] = [
# per-daemon-kind (sub)mod groups: declares which of our
# submods host the eps run by each daemon-actor kind as
# defined by `piker.data.validate._eps`.
_brokerd_mods: list[str] = [
'api',
'broker',
]
_datad_mods: list[str] = [
'api',
'feed',
'symbols',
]
# tractor RPC enable arg
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
))

View File

@ -73,10 +73,12 @@ from piker.log import (
get_logger,
)
from piker.data import open_symcache
from . import api
from .feed import (
from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
from . import api
from .feed import (
stream_messages,
)
from .ledger import (

View File

@ -335,9 +335,14 @@ class TradesRelay(Struct):
@acm
async def open_brokerd_dialog(
brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str,
fqme: str|None = None,
# XXX: explicit (already spawned) trading-actor override,
# currently only used by the `piker ledger` cli which
# boots its own ad-hoc `brokerd`-like actor; normally we
# (lazily) spawn/find the `brokerd.<broker>` daemon here.
portal: tractor.Portal|None = None,
loglevel: str|None = None,
) -> tuple[
@ -351,6 +356,10 @@ async def open_brokerd_dialog(
paper engine instance depending on live trading support for the
broker backend, configuration, or client code usage.
NOTE: this is now the ONE place where a (live, credentialed)
`brokerd.<broker>` daemon-actor gets (lazily) booted; pure
data/paper sessions should never spawn one!
'''
get_console_log(
level=loglevel,
@ -416,16 +425,29 @@ async def open_brokerd_dialog(
)
exec_mode: str = 'paper'
if (
trades_endpoint is not None
or
exec_mode != 'paper'
):
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
@acm
async def acquire_live_portal():
'''
Deliver a portal to the (live, credentialed) trading
actor hosting the backend's `open_trade_dialog()` ep:
either the caller-provided override or the (maybe
lazily spawned) `brokerd.<broker>` service daemon.
'''
if portal is not None:
yield portal
return
# XXX: the ONE (normal) place a `brokerd.<broker>`
# daemon-actor gets booted in the runtime B)
from piker.brokers._daemon import (
maybe_spawn_brokerd,
)
async with maybe_spawn_brokerd(
brokermod.name,
loglevel=loglevel,
) as live_portal:
yield live_portal
@acm
async def maybe_open_paper_ep():
@ -437,7 +459,14 @@ async def open_brokerd_dialog(
return
# open trades-dialog endpoint with backend broker
async with open_trades_endpoint as msg:
async with (
acquire_live_portal() as live_portal,
live_portal.open_context(
trades_endpoint,
loglevel=loglevel,
) as msg,
):
ctx, first = msg
# runtime indication that the backend can't support live
@ -581,7 +610,6 @@ class Router(Struct):
async def maybe_open_brokerd_dialog(
self,
brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str,
fqme: str,
loglevel: str,
@ -606,7 +634,6 @@ class Router(Struct):
async with open_brokerd_dialog(
brokermod=brokermod,
portal=portal,
exec_mode=exec_mode,
fqme=fqme,
loglevel=loglevel,
@ -668,7 +695,6 @@ class Router(Struct):
brokername, _, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername]
broker = brokermod.name
portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider
flume = feed.flumes[fqme]
@ -682,7 +708,6 @@ class Router(Struct):
async with self.maybe_open_brokerd_dialog(
brokermod=brokermod,
portal=portal,
exec_mode=exec_mode,
fqme=fqme,
loglevel=loglevel,

View File

@ -250,7 +250,6 @@ def cli(
# TODO: load endpoints from `conf::[network].pikerd`
# - pikerd vs. regd, separate registry daemon?
# - expose datad vs. brokerd?
# - bind emsd with certain perms on public iface?
regaddrs: list[tuple[str, int]] = regaddr or [(
_default_registry_host,

View File

@ -188,6 +188,48 @@ def _override_config_dir(
_config_dir = path
def _maybe_use_test_dir() -> None:
'''
When running under the `pytest` harness, override
the config dir to the per-test-tmp dir "passed down"
the actor tree via `tractor`'s runtime-vars
inheritance mechanism.
See the `tractor_runtime_overrides` usage in our
`tests.conftest._open_test_pikerd()` as well as
`.service._actor_runtime.open_piker_runtime()` for
the root-actor's pre-loading of the var state.
NOTE: this must be checked lazily at config-path
access time (NOT import time) since sub-actors only
receive runtime-vars once their `tractor` runtime
has fully booted.
'''
global _config_dir
import tractor
actor = tractor.current_actor(
err_on_no_runtime=False,
)
if actor is None:
return
rvs: dict = tractor.runtime._state._runtime_vars
pvars: dict|None = rvs.get('piker_vars')
if (
pvars
and
(testdir := pvars.get('piker_test_dir'))
):
testdirpath = Path(testdir)
assert testdirpath.exists(), (
f'piker test harness might be borked!?\n'
f'testdirpath: {testdirpath!r}\n'
)
if _config_dir != testdirpath:
_override_config_dir(testdirpath)
def _conf_fn_w_ext(
name: str,
) -> str:
@ -201,6 +243,7 @@ def get_conf_dir() -> Path:
on the local filesystem.
'''
_maybe_use_test_dir()
return _config_dir
@ -226,7 +269,7 @@ def get_conf_path(
assert str(conf_name) in _conf_names
fn = _conf_fn_w_ext(conf_name)
return _config_dir / Path(fn)
return get_conf_dir() / Path(fn)
def repodir() -> Path:
@ -271,8 +314,9 @@ def load(
'''
# create the $HOME/.config/piker dir if dne
if not _config_dir.is_dir():
_config_dir.mkdir(
conf_dir: Path = get_conf_dir()
if not conf_dir.is_dir():
conf_dir.mkdir(
parents=True,
exist_ok=True,
)

View File

@ -0,0 +1,300 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Data-daemon-actor "endpoint-hooks": the service task entry
points for `datad.<brokername>`, the per-provider real-time
and historical market-data feed daemon.
The (data vs. broker)d-split mirrors the ep-groupings in
`piker.data.validate._eps`: this daemon hosts all `'datad'`
eps (live quotes, history loading, symbology search) while
its `brokerd.<brokername>` sibling hosts only the live
order-control (and thus credentialed) `'brokerd'` eps.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from typing import (
TYPE_CHECKING,
AsyncContextManager,
)
import tractor
import trio
from piker.log import (
get_logger,
get_console_log,
)
from . import _util
if TYPE_CHECKING:
from .feed import _FeedsBus
log = get_logger(name=__name__)
# `datad`-actor-always-enabled mods: the data-side successor
# to the old `piker.brokers._daemon._data_mods` set.
# NOTE: keeping this list as small as possible is part of
# our caps-sec model and should be treated with utmost care!
_datad_service_mods: list[str] = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling',
'piker.data._daemon',
]
@tractor.context
async def _setup_persistent_datad(
ctx: tractor.Context,
brokername: str,
loglevel: str|None = None,
) -> None:
'''
Allocate an actor-wide service nursery in this
`datad.<brokername>` actor such that data-feed tasks
(shm writer-samplers, history backfillers, symbology
loaders) can be run in the background persistently by
the provider backend as needed.
'''
# NOTE: we only need to setup logging once (and only)
# here since all hosted daemon tasks will reference
# this same log instance's (actor local) state and thus
# don't require any further (level) configuration on
# their own B)
actor: tractor.Actor = tractor.current_actor()
tll: str = actor.loglevel
log = get_console_log(
level=loglevel or tll,
name=f'{_util.subsys}.{brokername}',
with_tractor_log=bool(tll),
)
assert log.name == _util.subsys
from piker.data import feed
assert not feed._bus
# allocate a nursery to the bus for spawning background
# tasks which service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `datad` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
async with (
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active
# until the parent actor decides to tear it down
await trio.sleep_forever()
def datad_init(
brokername: str,
loglevel: str|None = None,
**start_actor_kwargs,
) -> tuple[
ModuleType,
dict,
AsyncContextManager,
]:
'''
Given an input broker name, load all named arguments
which can be passed for daemon endpoint + context spawn
as required in every `datad.<brokername>` (actor)
service.
This includes:
- load the appropriate <brokername>.py pkg module,
- reads any declared `_datad_mods: list[str]` (falling
back to the full `__enable_modules__` set for
not-yet-split backends) which will be passed to
`tractor.ActorNursery.start_actor(enable_modules=)`
at actor start time,
- deliver a reference to the daemon lifetime fixture,
which for now is always the
`_setup_persistent_datad()` context defined above.
'''
from piker.brokers import get_brokermod
from .validate import get_eps
brokermod = get_brokermod(brokername)
modpath: str = brokermod.__name__
# warn (but don't bail) when the backend is missing
# some/all of the `datad` ep contract defined by
# `piker.data.validate._eps`.
datad_eps: dict = get_eps(brokermod, 'datad')
if not datad_eps:
log.warning(
f'Backend {brokername!r} offers NO `datad` '
f'(data-feed) eps!?\n'
f'Most feed/chart functionality will be '
f'broken for this provider..\n'
)
start_actor_kwargs['name'] = f'datad.{brokername}'
# XXX CRITICAL: include any backend-declared spawn
# kwargs, eg. `{'infect_asyncio': True}` required by
# `ib`'s embedded `asyncio`-mode `ib_async` usage!
start_actor_kwargs.update(
getattr(
brokermod,
'_spawn_kwargs',
{},
)
)
# lookup actor-enabled modules declared by the backend
# offering the `datad` endpoint(s).
enabled: list[str]
enabled = start_actor_kwargs['enable_modules'] = [
__name__, # so that eps from THIS mod can be invoked
modpath,
]
for submodname in getattr(
brokermod,
'_datad_mods',
# fallback for (flat, less mature) backends which
# don't yet declare a daemon-kind mod split.
getattr(
brokermod,
'__enable_modules__',
[],
),
):
subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath)
return (
brokermod,
start_actor_kwargs, # to `ActorNursery.start_actor()`
# XXX see impl above; contains all (actor global)
# setup/teardown expected in all `datad` actor
# instances.
_setup_persistent_datad,
)
async def spawn_datad(
brokername: str,
loglevel: str|None = None,
**tractor_kwargs,
) -> bool:
log.info(
f'Spawning data-daemon,\n'
f'backend: {brokername!r}'
)
(
brokermod,
tractor_kwargs,
daemon_fixture_ep,
) = datad_init(
brokername,
loglevel,
**tractor_kwargs,
)
# ask `pikerd` to spawn a new sub-actor and manage it
# under its actor nursery
from piker.service import Services
dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}'
enable_mods: list[str] = list(dict.fromkeys(
_datad_service_mods
+
tractor_kwargs.pop('enable_modules')
))
portal = await Services.actor_n.start_actor(
dname,
enable_modules=enable_mods,
debug_mode=Services.debug_mode,
**tractor_kwargs,
)
# NOTE: the service mngr expects an already spawned
# actor + its portal ref in order to do non-blocking
# setup of the `datad` service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
)
return True
@acm
async def maybe_spawn_datad(
brokername: str,
loglevel: str|None = None,
**pikerd_kwargs,
) -> tractor.Portal:
'''
Helper to spawn a datad service *from* a client who
wishes to use the sub-actor-daemon but is fine with
re-using any existing and contactable `datad`.
Mas o menos, acts as a cached-actor-getter factory.
'''
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
service_name=f'datad.{brokername}',
service_task_target=spawn_datad,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,
**pikerd_kwargs,
) as portal:
yield portal

View File

@ -17,7 +17,7 @@
'''
Data feed apis and infra.
This module is enabled for ``brokerd`` daemons and includes mostly
This module is enabled for ``datad`` daemons and includes mostly
endpoints and middleware to support our real-time, provider agnostic,
live market quotes layer. Historical data loading and processing is also
initiated in parts of the feed bus startup but business logic and
@ -54,8 +54,11 @@ from piker.accounting import (
)
from piker.types import Struct
from piker.brokers import get_brokermod
from piker.service import (
maybe_spawn_brokerd,
# NOTE: must be a "relative-direct" import (NOT via
# `piker.service`) to avoid a partial-init cycle when this
# mod is loaded as part of `piker.service.__init__`.
from ._daemon import (
maybe_spawn_datad,
)
from piker.calc import humanize
from ._util import (
@ -110,7 +113,7 @@ class _FeedsBus(Struct):
'''
Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time
This is a datad side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. A bus is
associated one-to-one with a particular broker backend where the
"bus" refers so a multi-symbol bus where quotes are interleaved in
@ -249,7 +252,7 @@ async def allocate_persistent_feed(
'''
Create and maintain a "feed bus" which allocates tasks for real-time
streaming and optional historical data storage per broker/data provider
backend; this normally task runs *in* a `brokerd` actor.
backend; this normally task runs *in* a `datad` actor.
If none exists, this allocates a ``_FeedsBus`` which manages the
lifetimes of streaming tasks created for each requested symbol.
@ -318,8 +321,8 @@ async def allocate_persistent_feed(
# at max capacity.
# - the same ideas ^ but when a local core is maxxed out (like how
# binance does often with hft XD
# - if a brokerd is non-local then we can't just allocate a mem
# channel here and have the brokerd write it, we instead need
# - if a datad is non-local then we can't just allocate a mem
# channel here and have the datad write it, we instead need
# a small streaming machine around the remote feed which can then
# do the normal work of sampling and writing shm buffers
# (depending on if we want sampling done on the far end or not?)
@ -499,7 +502,7 @@ async def open_feed_bus(
# (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are
servicename = tractor.current_actor().name
assert 'brokerd' in servicename
assert 'datad' in servicename
assert brokername in servicename
bus: _FeedsBus = get_feed_bus(brokername)
@ -509,12 +512,12 @@ async def open_feed_bus(
for symbol in symbols:
# if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in
# datad yet, start persistent stream and shm writer task in
# service nursery
flume = bus.feeds.get(symbol)
if flume is None:
# allocate a new actor-local stream bus which
# will persist for this `brokerd`'s service lifetime.
# will persist for this `datad`'s service lifetime.
async with bus.task_lock:
await bus.nursery.start(
partial(
@ -721,7 +724,7 @@ class Feed(Struct):
mods = {name: self.mods[name] for name in brokers}
if len(mods) == 1:
# just pass the brokerd stream directly if only one provider
# just pass the datad stream directly if only one provider
# was detected.
stream = self.streams[list(brokers)[0]]
async with stream.subscribe() as bstream:
@ -763,7 +766,7 @@ class Feed(Struct):
@acm
async def install_brokerd_search(
async def install_datad_search(
portal: tractor.Portal,
brokermod: ModuleType,
@ -812,7 +815,7 @@ async def maybe_open_feed(
ReceiveChannel[dict[str, Any]],
):
'''
Maybe open a data to a ``brokerd`` daemon only if there is no
Maybe open a data feed to a ``datad`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver.
@ -885,14 +888,14 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod
# one actor per brokerd for now
brokerd_ctxs = []
# one actor per datad for now
datad_ctxs = []
for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn
# if no `datad` for this backend exists yet we spawn
# a daemon actor for it.
brokerd_ctxs.append(
maybe_spawn_brokerd(
datad_ctxs.append(
maybe_spawn_datad(
brokermod.name,
loglevel=loglevel
)
@ -900,7 +903,7 @@ async def open_feed(
portals: tuple[tractor.Portal]
async with trionics.gather_contexts(
brokerd_ctxs,
datad_ctxs,
) as portals:
bus_ctxs: list[AsyncContextManager] = []
@ -937,9 +940,9 @@ async def open_feed(
tick_throttle=tick_throttle,
# XXX: super important to avoid
# the brokerd from some other
# the datad from some other
# backend overruning the task here
# bc some other brokerd took longer
# bc some other datad took longer
# to startup before we hit the `.open_stream()`
# loop below XD .. really we should try to do each
# of these stream open sequences sequentially per
@ -1008,7 +1011,7 @@ async def open_feed(
assert stream
feed.streams[brokermod.name] = stream
# apply `brokerd`-common stream to each flume
# apply `datad`-common stream to each flume
# tracking a live market feed from that provider.
for fqme, flume in feed.flumes.items():
if brokermod.name == flume.mkt.broker:

View File

@ -91,6 +91,24 @@ _eps: dict[str, list[str]] = {
}
def get_eps(
mod: ModuleType,
kind: str, # 'middleware' | 'datad' | 'brokerd'
) -> dict[str, Callable]:
'''
Return the daemon-kind's ep funcs defined by the backend
`mod`, keyed by ep name; any eps from `_eps[kind]` not
implemented by the backend are excluded.
'''
return {
name: ep
for name in _eps[kind]
if (ep := getattr(mod, name, None))
}
def validate_backend(
mod: ModuleType,
syms: list[str],

View File

@ -56,3 +56,7 @@ from ..brokers._daemon import (
spawn_brokerd as spawn_brokerd,
maybe_spawn_brokerd as maybe_spawn_brokerd,
)
from ..data._daemon import (
spawn_datad as spawn_datad,
maybe_spawn_datad as maybe_spawn_datad,
)

View File

@ -157,6 +157,7 @@ _root_modules: list[str] = [
__name__,
'piker.service._daemon',
'piker.brokers._daemon',
'piker.data._daemon',
'piker.clearing._ems',
'piker.clearing._client',
@ -214,7 +215,13 @@ async def open_pikerd(
trio.open_nursery() as service_tn,
):
for addr in reg_addrs:
uaddr: tuple = addr.unwrap()
# normalize to a wrapped `tractor` addr-type;
# entries may be raw `tuple`s when passed in
# from (test) client code.
wladdr = tractor.discovery._addr.wrap_address(
addr,
)
uaddr: tuple = wladdr.unwrap()
if (
uaddr not in root_actor.accept_addrs
):

View File

@ -225,10 +225,13 @@ async def check_for_service(
'''
async with (
open_registry(ensure_exists=False) as reg_addr,
open_registry(
addrs=Registry.addrs,
ensure_exists=False,
) as reg_addrs,
tractor.query_actor(
service_name,
arbiter_sockaddr=reg_addr,
) as sockaddr,
regaddr=reg_addrs[0],
) as (sockaddr, _),
):
return sockaddr

View File

@ -32,12 +32,12 @@ from . import _event
from . import _search
from ..accounting import unpack_fqme
from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search
from ..data.feed import install_datad_search
from ..log import (
get_logger,
get_console_log,
)
from ..service import maybe_spawn_brokerd
from ..service import maybe_spawn_datad
from ._exec import run_qtractor
log = get_logger(__name__)
@ -50,16 +50,16 @@ async def load_provider_search(
) -> None:
name = brokermod.name
log.info(f'loading brokerd for {name}..')
log.info(f'loading datad for {name}..')
async with (
maybe_spawn_brokerd(
maybe_spawn_datad(
name,
loglevel=loglevel
) as portal,
install_brokerd_search(
install_datad_search(
portal,
brokermod,
),

View File

@ -27,7 +27,7 @@ from ..cli import (
load_trans_eps,
)
from .. import watchlists as wl
from ..service import maybe_spawn_brokerd
from ..service import maybe_spawn_datad
_config_dir = click.get_app_dir('piker')
@ -69,7 +69,7 @@ def monitor(config, rate, name, dhost, test, tl):
from .kivy.monitor import _async_main
async def main():
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
brokername=brokermod.name,
loglevel=loglevel
) as portal:
@ -118,7 +118,7 @@ def optschain(
from .kivy.option_chain import _async_main
async def main():
async with maybe_spawn_brokerd(
async with maybe_spawn_datad(
loglevel=loglevel
):
# run app "main"

View File

@ -495,7 +495,7 @@ async def _async_main(
async with trio.open_nursery() as nursery:
# get a portal to the data feed daemon
async with tractor.wait_for_actor('brokerd') as portal:
async with tractor.wait_for_actor('datad') as portal:
# set up a pager view for large ticker lists
chain = await new_chain_ui(

View File

@ -77,6 +77,7 @@ dependencies = [
"exchange-calendars>=4.13.1",
"ib-async>=2.1.0",
"aeventkit>=2.1.0", # XXX, imports as eventkit?
"xonsh>=0.23.8",
]
# ------ dependencies ------
# NOTE, by default we ship only a "headless" deps set bc
@ -132,8 +133,8 @@ repl = [
"greenback >=1.1.1, <2.0.0",
# @goodboy's preferred console toolz
"xonsh>=0.22.2",
"prompt-toolkit ==3.0.40",
"xonsh>=0.23.0",
"prompt-toolkit>=3.0.50",
"pyperclip>=1.9.0",
# for @claude's `snippets/claude_debug_helper.py` it uses to do
@ -206,8 +207,8 @@ pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# xonsh = { git = 'https://github.com/xonsh/xonsh.git', branch = 'main' }
# XXX since, we're like, always hacking new shite all-the-time. Bp
# tractor = { git = "https://github.com/goodboy/tractor.git", branch ="main" }
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="main" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }
# ------ goodboy ------
# hackin dev-envs, usually there's something new he's hackin in..
tractor = { path = "../tractor", editable = true }
# tractor = { path = "../tractor", editable = true }

View File

@ -151,7 +151,7 @@ def load_and_check_pos(
# is the same the fqme.
pp: Position = table.pps[ppmsg.symbol]
assert ppmsg.size == pp.size
assert ppmsg.size == pp.cumsize
assert ppmsg.avg_price == pp.ppu
yield pp
@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re:
assert re.type is ModuleNotFoundError
assert re.boxed_type is ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme)

View File

@ -53,11 +53,12 @@ def test_runtime_boot(
tractor.wait_for_actor(
'pikerd',
arbiter_sockaddr=daemon_addr,
registry_addr=daemon_addr,
) as portal,
):
assert pikerd_portal.channel.raddr == daemon_addr
assert pikerd_portal.channel.raddr == portal.channel.raddr
uw_raddr: tuple = pikerd_portal.chan.raddr.unwrap()
assert uw_raddr == daemon_addr
assert uw_raddr == portal.chan.raddr.unwrap()
# no service tasks should be started
assert not services.service_tasks
@ -65,6 +66,41 @@ def test_runtime_boot(
trio.run(main)
def test_datad_spawn(
open_test_pikerd: AsyncContextManager,
loglevel: str,
) -> None:
'''
Verify the new (data-feed-only) `datad.<broker>` daemon
can be spawned/registered as a `pikerd` sub-service via
the `maybe_spawn_datad()` factory.
'''
from piker.service import maybe_spawn_datad
backend: str = 'kraken'
datad_name: str = f'datad.{backend}'
async def main():
async with (
open_test_pikerd() as (_, _, _, services),
maybe_spawn_datad(
backend,
loglevel=loglevel,
) as portal,
):
assert portal
async with ensure_service(datad_name):
assert (
datad_name in services.service_tasks
)
trio.run(main)
def test_ensure_datafeed_actors(
open_test_pikerd: AsyncContextManager,
loglevel: str,
@ -72,14 +108,14 @@ def test_ensure_datafeed_actors(
) -> None:
'''
Verify that booting a data feed starts a `brokerd`
Verify that booting a data feed starts a `datad`
actor and a singleton global `samplerd` and opening
an order mode in paper opens the `paperboi` service.
'''
actor_name: str = 'brokerd'
actor_name: str = 'datad'
backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}'
datad_name: str = f'{actor_name}.{backend}'
async def main():
async with (
@ -94,7 +130,7 @@ def test_ensure_datafeed_actors(
await feed.pause()
async with (
ensure_service(brokerd_name),
ensure_service(datad_name),
ensure_service('samplerd'),
):
await trio.sleep(0.1)
@ -108,7 +144,7 @@ async def ensure_service(
sockaddr: tuple[str, int] | None = None,
) -> None:
async with find_service(name) as portal:
remote_sockaddr = portal.channel.raddr
remote_sockaddr: tuple = portal.chan.raddr.unwrap()
print(f'FOUND `{name}` @ {remote_sockaddr}')
if sockaddr:
@ -131,40 +167,49 @@ def run_test_w_cancel_method(
"was remotely cancelled by remote actor (\'pikerd\'")
if cancel_method == 'sigint':
with pytest.raises(
# XXX: with modern `tractor` the (single-exc)
# group is collapsed so a bare KBI normally
# propagates; tolerate either form.
with pytest.raises((
KeyboardInterrupt,
BaseExceptionGroup,
) as exc_info:
)) as exc_info:
trio.run(main)
multi = exc_info.value
err = exc_info.value
match err:
case BaseExceptionGroup():
for suberr in err.exceptions:
match suberr:
# ensure we receive a remote
# cancellation error caused by the
# pikerd root actor.
case tractor.ContextCancelled():
assert (
cancelled_msg
in
suberr.args[0]
)
for suberr in multi.exceptions:
match suberr:
# ensure we receive a remote cancellation error caused
# by the pikerd root actor since we used the
# `.cancel_service()` API above B)
case tractor.ContextCancelled():
assert cancelled_msg in suberr.args[0]
case KeyboardInterrupt():
pass
case KeyboardInterrupt():
pass
case _:
pytest.fail(
f'Unexpected error {suberr}'
)
case _:
pytest.fail(f'Unexpected error {suberr}')
case KeyboardInterrupt():
pass
elif cancel_method == 'services':
# XXX NOTE: oddly, when you pass --pdb to pytest, i think since
# we also use that to enable the underlying tractor debug mode,
# it causes this to not raise for some reason? So if you see
# that while changing this test.. it's prolly that.
with pytest.raises(
tractor.ContextCancelled
) as exc_info:
trio.run(main)
assert cancelled_msg in exc_info.value.args[0]
# XXX: cancelling our own sub-service via
# `Services.cancel_service()` is a *self*
# requested cancel: modern `tractor` absorbs the
# resulting `ContextCancelled` (canceller is our
# own actor) so the runtime tears down gracefully
# with NO error raised to the opener.
trio.run(main)
else:
pytest.fail(f'Test is broken due to {cancel_method}')
@ -182,9 +227,9 @@ def test_ensure_ems_in_paper_actors(
) -> None:
actor_name: str = 'brokerd'
backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}'
datad_name: str = f'datad.{backend}'
brokerd_name: str = f'brokerd.{backend}'
async def main():
@ -197,7 +242,9 @@ def test_ensure_ems_in_paper_actors(
# ensure we timeout after is startup is too slow.
# TODO: something like this should be our start point for
# benchmarking end-to-end startup B)
with trio.fail_after(9):
# NOTE: includes a live (kraken) symbology fetch so
# the budget needs some headroom for net latency..
with trio.fail_after(19):
async with (
open_test_pikerd() as (_, _, _, services),
@ -226,15 +273,25 @@ def test_ensure_ems_in_paper_actors(
async with (
ensure_service('emsd'),
ensure_service(brokerd_name),
ensure_service(datad_name),
ensure_service(f'paperboi.{backend}'),
):
for name in pikerd_subservices:
assert name in services.service_tasks
# brokerd.kraken actor should have been started
# implicitly by the ems.
assert brokerd_name in services.service_tasks
# datad.kraken actor should have been
# started implicitly by the feed layer.
assert datad_name in services.service_tasks
# XXX: paper-mode sessions should NEVER
# boot a (live, credentialed) `brokerd`;
# only emsd's `open_brokerd_dialog()`
# live-ep path is allowed to spawn it!
assert (
brokerd_name
not in
services.service_tasks
)
print('ALL SERVICES STARTED, cancelling runtime with:\n'
f'-> {cancel_method}')

73
uv.lock
View File

@ -1111,6 +1111,7 @@ dependencies = [
{ name = "trio-websocket" },
{ name = "typer" },
{ name = "websockets" },
{ name = "xonsh" },
]
[package.dev-dependencies]
@ -1181,13 +1182,14 @@ requires-dist = [
{ name = "tomli", specifier = ">=2.0.1,<3.0.0" },
{ name = "tomli-w", specifier = ">=1.0.0,<2.0.0" },
{ name = "tomlkit", git = "https://github.com/pikers/tomlkit.git?branch=piker_pin" },
{ name = "tractor", editable = "../tractor" },
{ name = "tractor", git = "https://github.com/goodboy/tractor.git?branch=main" },
{ name = "trio", specifier = ">=0.27" },
{ name = "trio-typing", specifier = ">=0.10.0" },
{ name = "trio-util", specifier = ">=0.7.0,<0.8.0" },
{ name = "trio-websocket", specifier = ">=0.10.3,<0.11.0" },
{ name = "typer", specifier = ">=0.9.0,<1.0.0" },
{ name = "websockets", specifier = "==12.0" },
{ name = "xonsh", specifier = ">=0.23.8" },
]
[package.metadata.requires-dev]
@ -1199,23 +1201,23 @@ dev = [
{ name = "i3ipc", specifier = ">=2.2.1" },
{ name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
{ name = "pexpect", specifier = ">=4.9.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pyqt6", specifier = ">=6.7.0,<7.0.0" },
{ name = "pyqtgraph", specifier = ">=0.14.0" },
{ name = "pytest" },
{ name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" },
{ name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" },
{ name = "xonsh", specifier = ">=0.22.2" },
{ name = "xonsh", specifier = ">=0.23.0" },
]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
{ name = "pexpect", specifier = ">=4.9.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.22.2" },
{ name = "xonsh", specifier = ">=0.23.0" },
]
testing = [{ name = "pytest" }]
uis = [
@ -1287,14 +1289,14 @@ wheels = [
[[package]]
name = "prompt-toolkit"
version = "3.0.40"
version = "3.0.52"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "wcwidth" },
]
sdist = { url = "https://files.pythonhosted.org/packages/99/ce/172e474a87241a69baad1ce46bc8f31eae590a770cb138b9b73812c8234d/prompt_toolkit-3.0.40.tar.gz", hash = "sha256:a371c06bb1d66cd499fecd708e50c0b6ae00acba9822ba33c586e2f16d1b739e", size = 423978, upload-time = "2023-11-10T11:22:24.529Z" }
sdist = { url = "https://files.pythonhosted.org/packages/a1/96/06e01a7b38dce6fe1db213e061a4602dd6032a8a97ef6c1a862537732421/prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855", size = 434198, upload-time = "2025-08-27T15:24:02.057Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/ed/29/cd63ff872dfc213e1cd8131f8060262db184b975868cef33302f44616c3e/prompt_toolkit-3.0.40-py3-none-any.whl", hash = "sha256:99ba3dfb23d5b5af89712f89e60a5f3d9b8b67a9482ca377c5771d0e9047a34b", size = 385150, upload-time = "2023-11-10T11:22:20.72Z" },
{ url = "https://files.pythonhosted.org/packages/84/03/0d3ce49e2505ae70cf43bc5bb3033955d2fc9f932163e84dc0779cc47f48/prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955", size = 391431, upload-time = "2025-08-27T15:23:59.498Z" },
]
[[package]]
@ -1913,7 +1915,7 @@ wheels = [
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { editable = "../tractor" }
source = { git = "https://github.com/goodboy/tractor.git?branch=main#5c98ab1fb6e2cb7781aa75845c7f323a52853c46" }
dependencies = [
{ name = "bidict" },
{ name = "cffi" },
@ -1927,49 +1929,6 @@ dependencies = [
{ name = "wrapt" },
]
[package.metadata]
requires-dist = [
{ name = "bidict", specifier = ">=0.23.1" },
{ name = "cffi", specifier = ">=1.17.1" },
{ name = "colorlog", specifier = ">=6.8.2,<7" },
{ name = "msgspec", specifier = ">=0.19.0" },
{ name = "multiaddr", specifier = ">=0.2.0" },
{ name = "pdbp", specifier = ">=1.8.2,<2" },
{ name = "platformdirs", specifier = ">=4.4.0" },
{ name = "tricycle", specifier = ">=0.4.1,<0.5" },
{ name = "trio", specifier = ">0.27" },
{ name = "wrapt", specifier = ">=1.16.0,<2" },
]
[package.metadata.requires-dev]
dev = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.22.2" },
]
devx = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" },
]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.22.2" },
]
testing = [
{ name = "pexpect", specifier = ">=4.9.0,<5" },
{ name = "pytest", specifier = ">=8.3.5" },
]
[[package]]
name = "tricycle"
version = "0.4.1"
@ -2183,13 +2142,13 @@ wheels = [
[[package]]
name = "xonsh"
version = "0.22.4"
version = "0.23.8"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/48/df/1fc9ed62b3d7c14612e1713e9eb7bd41d54f6ad1028a8fbb6b7cddebc345/xonsh-0.22.4.tar.gz", hash = "sha256:6be346563fec2db75778ba5d2caee155525e634e99d9cc8cc347626025c0b3fa", size = 826665, upload-time = "2026-02-17T07:53:39.424Z" }
sdist = { url = "https://files.pythonhosted.org/packages/8b/77/0c4c39ad866d4ea1ef553f325d16e804d1bf1eeecc591f0e81b057aa37db/xonsh-0.23.8.tar.gz", hash = "sha256:541bb976c93a81571792644403bae8737145023da5f48d4c493909ab5c04ba0f", size = 1172271, upload-time = "2026-05-30T04:47:22.53Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/00/7cbc0c1fb64365a0a317c54ce3a151c9644eea5a509d9cbaae61c9fd1426/xonsh-0.22.4-py311-none-any.whl", hash = "sha256:38b29b29fa85aa756462d9d9bbcaa1d85478c2108da3de6cc590a69a4bcd1a01", size = 654375, upload-time = "2026-02-17T07:53:37.702Z" },
{ url = "https://files.pythonhosted.org/packages/2e/c2/3dd498dc28d8f89cdd52e39950c5e591499ae423f61694c0bb4d03ed1d82/xonsh-0.22.4-py312-none-any.whl", hash = "sha256:4e538fac9f4c3d866ddbdeca068f0c0515469c997ed58d3bfee963878c6df5a5", size = 654300, upload-time = "2026-02-17T07:53:35.813Z" },
{ url = "https://files.pythonhosted.org/packages/82/7d/1f9c7147518e9f03f6ce081b5bfc4f1aceb6ec5caba849024d005e41d3be/xonsh-0.22.4-py313-none-any.whl", hash = "sha256:cc5fabf0ad0c56a2a11bed1e6a43c4ec6416a5b30f24f126b8e768547c3793e2", size = 654818, upload-time = "2026-02-17T07:53:33.477Z" },
{ url = "https://files.pythonhosted.org/packages/ca/4a/2aab8300ad218dfc7678c34d5f703f09df5681fecc6e66d48c951ef58049/xonsh-0.23.8-py311-none-any.whl", hash = "sha256:4bab3e405643df2cc78ec2cac13241471841796fe710386d2179666aae8a5f9c", size = 799846, upload-time = "2026-05-30T04:47:21.211Z" },
{ url = "https://files.pythonhosted.org/packages/87/ec/aa66ef6046f90769dd8fcb3ddca9d00282d12e3d73645abbf12f190f17cf/xonsh-0.23.8-py312-none-any.whl", hash = "sha256:c7d0f0fba0cafe0bd75bf202820aeffc74b52943fa27d98d3b4346793f6ba493", size = 799868, upload-time = "2026-05-30T04:47:19.158Z" },
{ url = "https://files.pythonhosted.org/packages/12/fe/2d757d82b57332f1c6cd3f8c168fbcf060a275895a763542255ae1c53d75/xonsh-0.23.8-py313-none-any.whl", hash = "sha256:1b7335522a6ecd63f0d84151977a7a9050874d3ecec00cf79919d0770bebb1b4", size = 800388, upload-time = "2026-05-30T04:47:18.47Z" },
]
[[package]]