Compare commits

...

7 Commits

Author SHA1 Message Date
Gud Boi 14e6a319ed brokerd: slim RPC caps + `ib` client-id offset
Caps-sec tightening now that `brokerd` is trading-only: NO
`piker.data.*` (feed) mods are RPC-enabled in the (live,
credentialed) trading actor anymore.

Deats,
- drop `_data_mods` for a minimal `_brokerd_service_mods`
  (just `piker.brokers._daemon`); dedup-compose with the
  backend's set in `spawn_brokerd()`.
- `broker_init()` reads the backend's `_brokerd_mods`
  (fallback: `__enable_modules__` for flat backends).
- fail fast in `spawn_brokerd()` via `validate.get_eps()`
  when a backend offers NO live order-ctl eps (eg.
  `kucoin`, `deribit`) -> tells the caller to use
  paper-mode instead of booting a dead actor; analogous
  warning in `datad_init()` for datad-ep-less backends.
- offset `ib`'s default `client_id` per daemon-kind in
  `load_aio_clients()`: post-split BOTH `datad.ib` and
  `brokerd.ib` connect to the same gw/tws endpoint and the
  shared default (6116 + linear retry incrs) would collide
  and burn the full conn-timeout retry cycle; datad gets
  +16, ad-hoc (test/cli) actors +32.
- drop the import-cleanup leftovers (`exceptiongroup`,
  `_FeedsBus` type-only import) and the now-resolved
  "expose datad" TODO in `.cli`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
Gud Boi cefbc74671 .data: cut feed layer over to `datad` actors
The topology flip: all data-feed consumers now route to the
new `datad.<broker>` sibling daemon; `brokerd` becomes
trading-only and is ONLY ever booted lazily by `emsd`'s
`open_brokerd_dialog()` (see prior commit). Chart-only and
paper sessions run with zero (live, credentialed) `brokerd`
procs B)

Deats,
- `open_feed()` -> `maybe_spawn_datad()` (NB: imported
  relative-direct from `._daemon` to dodge a partial-init
  cycle via `piker.service`); flip the `open_feed_bus()`
  actor-name assert to `'datad'`; comment sweep.
- slim `_setup_persistent_brokerd()` to a trading-only
  fixture: console logging + pinned-open ctx; the feed-bus
  alloc moves to `_setup_persistent_datad()` and backend
  `open_trade_dialog()` ctxs own their own task trees.
  (the `piker ledger` ad-hoc actor enters this same slimmed
  fixture - exactly what it needs.)
- repoint data-flavoured spawn sites to `maybe_spawn_datad`:
  `.ui._app` symbol-search (+ rename
  `install_brokerd_search` -> `install_datad_search`),
  `.brokers.core.symbol_search()`, `.brokers.cli`
  `brokercheck`/`record`, legacy kivy `.ui.cli` +
  `option_chain`'s `wait_for_actor()`.
- invert `tests.test_services` expectations: feed/EMS-paper
  flows must spawn `datad.kraken` and `paperboi.kraken`
  with an explicit negative assert that NO `brokerd.kraken`
  service task exists.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
Gud Boi 233a53ea48 .clearing: lazily spawn `brokerd` from `emsd`
Drop the ONE coupling that forces feed + trading eps into
the same actor: `Router.open_trade_relays()` pulling its
trades-dialog portal from `feed.portals[brokermod]`.
Instead `open_brokerd_dialog()` now (maybe) spawns/finds
`brokerd.<broker>` itself via `maybe_spawn_brokerd()` and
ONLY when a live trades-ep will actually be opened; the
paper-mode short-circuit never touches it, so post
feed-cutover paper sessions will run with zero `brokerd`
procs.

Pre-cutover this is a pure refactor: the registry lookup
just finds the same feed-spawned daemon.

Deats,
- new `open_brokerd_dialog()` sig: portal acquisition moves
  inside via an `acquire_live_portal()` helper; keep an
  explicit `portal: Portal|None` override for the
  `piker ledger` cli which boots its own ad-hoc actor.
- `Router.maybe_open_brokerd_dialog()` drops its `portal`
  param; `open_trade_relays()` drops the `feed.portals`
  lookup entirely.
- `.accounting.cli`: pass `portal=` by keyword.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
Gud Boi abb211219f Add `datad` daemon machinery to `.data`
First half of the `brokerd` split: a new per-provider
data-feed-only daemon-actor `datad.<broker>` to (soon) host
all `validate._eps['datad']` eps (live quotes, history
loading, symbology search) leaving `brokerd` for live order
ctl only. Purely additive; nothing routes through it yet.

Deats,
- new `piker.data._daemon` mod mirroring the
  `.brokers._daemon` conventions (and the `samplerd`
  sub-daemon precedent):
  - `_setup_persistent_datad()` lifetime fixture owning the
    actor-global `_FeedsBus` alloc.
  - `datad_init()` building `enable_modules` from the
    backend's `_datad_mods` (falling back to
    `__enable_modules__` for not-yet-split backends) and
    copying `_spawn_kwargs` (critical for `ib`'s
    `infect_asyncio`).
  - `spawn_datad()`/`maybe_spawn_datad()` wrapping
    `Services` + `maybe_spawn_daemon()`.
- add `piker.data._daemon` to `_root_modules` so `pikerd`
  can run `spawn_datad()` requests.
- re-export the spawn eps from `piker.service`.
- add `test_datad_spawn` verifying actor boot + service
  registration via `ensure_service('datad.kraken')`.

Note the `Services`-based impl style deliberately mirrors
`spawn_brokerd()` so the eventual `tractor.hilevel`
`ServiceMngr` port (see the `service_mng_to_tractor`
branch's d8c21d44 prep work) lands symmetrically on both.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
Gud Boi 523e95d8ba Declare per-daemon-kind backend mod groups
Prep for the `brokerd` -> (`datad` + `brokerd`) actor split
by having each (split-style) backend declare which of its
submods host which daemon-kind's eps, exactly per the
`piker.data.validate._eps` groupings; `ib` already had
`_brokerd_mods`/`_datad_mods` so extend the convention to
`kraken`, `binance` and `deribit` (and add `'api'` to ib's
datad set since both kinds need the `Client` layer).

`__enable_modules__` stays as the (deduped) union so this
is a ZERO behavior change; flat backends (`kucoin` etc.)
just don't declare the split yet.

Also,
- add `validate.get_eps()` returning a backend's defined
  eps per daemon-kind for spawn-time introspection.
- import `NoBsWs`/`open_autorecon_ws` from
  `piker.data._web_bs` directly in `.kraken.broker` (they
  were only re-exported via `.kraken.feed`) so the trading
  mod doesn't depend on the feed mod for ws primitives.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
Gud Boi 47a7cf5502 Port service+tests to latest `tractor` APIs
Continue the `repair_tests`-branch mission (already merged
in this stack's ancestry, see f4c4f1e2 which ported
`conftest.py`) by fixing the remaining drift breakage vs.
`tractor` git `main`; without these NOTHING boots since the
`tractor.Address` port in 604e5fcf.

Deats,
- normalize reg addrs via `wrap_address()` in
  `open_pikerd()` before `.unwrap()`-ing; entries may be
  raw `tuple`s when passed in from (test) client code.
- port `check_for_service()` to `query_actor(regaddr=)`
  (was `arbiter_sockaddr=`) incl. its 2-tuple yield and
  the now-required `open_registry(addrs=)` arg.
- `wait_for_actor(registry_addr=)` + `.chan.raddr.unwrap()`
  raw-tuple compares in `test_runtime_boot` and
  `ensure_service()`.
- update `run_test_w_cancel_method()` for modern `tractor`
  cancel semantics: self-requested sub-service cancels are
  absorbed (no `ContextCancelled` raised to the opener) and
  single-exc groups collapse to a bare KBI.
- `RemoteActorError.boxed_type` (was `.type`) and
  `Position.cumsize` (was `.size`) renames in tests.
- bump the paper-EMS startup budget 9 -> 19s; it includes
  a live (kraken) symbology fetch so needs net headroom.
- woops, add the missing comma in `.deribit.api`'s
  `tractor.trionics` import tuple..

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
Gud Boi 06c0a4856e Fix `pytest` config-dir isolation in subactors
The old (commented-out) `get_app_dir()` override gated on
`'pytest' in sys.modules` which can NEVER work in spawned
subactors (fresh procs, no pytest import); as a result test
`paperboi`/daemon actors were writing into the user's REAL
`~/.config/piker/accounting/` files.. friggin yikes.

Deats,
- add `config._maybe_use_test_dir()` which lazily (at
  conf-path access time, NOT import time) reads the
  `piker_test_dir` entry from
  `tractor.runtime._state._runtime_vars['piker_vars']` as
  pre-loaded by `open_piker_runtime()` from the
  `tests.conftest._open_test_pikerd()` overrides.
- hook it in `get_conf_dir()` and route `get_conf_path()`
  + `load()`'s mkdir through `get_conf_dir()`.
- route `.accounting._ledger` / `._pos` dir derivation
  through `config.get_conf_dir()` (was reading the
  `_config_dir` global directly, bypassing the override);
  also `mkdir(parents=True, exist_ok=True)` for nested
  tmp-dir creation.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
2026-06-09 17:25:09 -04:00
27 changed files with 707 additions and 183 deletions

View File

@ -324,10 +324,13 @@ def load_ledger(
ldir: Path = ( ldir: Path = (
dirpath dirpath
or or
config._config_dir / 'accounting' / 'ledgers' config.get_conf_dir() / 'accounting' / 'ledgers'
) )
if not ldir.is_dir(): if not ldir.is_dir():
ldir.mkdir() ldir.mkdir(
parents=True,
exist_ok=True,
)
fname = f'trades_{brokername}_{acctid}.toml' fname = f'trades_{brokername}_{acctid}.toml'
fpath: Path = ldir / fname fpath: Path = ldir / fname

View File

@ -785,9 +785,16 @@ def load_account(
legacy_fn: str = f'pps.{brokername}.{acctid}.toml' legacy_fn: str = f'pps.{brokername}.{acctid}.toml'
fn: str = f'account.{brokername}.{acctid}.toml' fn: str = f'account.{brokername}.{acctid}.toml'
dirpath: Path = dirpath or (config._config_dir / 'accounting') dirpath: Path = (
dirpath
or
(config.get_conf_dir() / 'accounting')
)
if not dirpath.is_dir(): if not dirpath.is_dir():
dirpath.mkdir() dirpath.mkdir(
parents=True,
exist_ok=True,
)
conf, path = config.load( conf, path = config.load(
path=dirpath / fn, path=dirpath / fn,

View File

@ -143,12 +143,15 @@ def sync(
# (what the EMS normally does internall) B) # (what the EMS normally does internall) B)
open_brokerd_dialog( open_brokerd_dialog(
brokermod, brokermod,
portal,
exec_mode=( exec_mode=(
'paper' 'paper'
if account == 'paper' if account == 'paper'
else 'live' else 'live'
), ),
# use our own ad-hoc-spawned actor,
# do NOT (spawn and) use the
# `brokerd.<broker>` service daemon!
portal=portal,
loglevel=loglevel, loglevel=loglevel,
) as ( ) as (
brokerd_stream, brokerd_stream,

View File

@ -25,10 +25,8 @@ from contextlib import (
) )
from types import ModuleType from types import ModuleType
from typing import ( from typing import (
TYPE_CHECKING,
AsyncContextManager, AsyncContextManager,
) )
import exceptiongroup as eg
import tractor import tractor
import trio import trio
@ -40,27 +38,20 @@ from piker.log import (
from . import _util from . import _util
from . import get_brokermod from . import get_brokermod
if TYPE_CHECKING:
from ..data import _FeedsBus
log = get_logger(name=__name__) log = get_logger(name=__name__)
# `brokerd` enabled modules # `brokerd`-actor-always-enabled mods.
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec # NOTE: keeping this list as small as possible is part of our caps-sec
# model and should be treated with utmost care! # model and should be treated with utmost care! In particular NO
_data_mods: str = [ # `piker.data.*` feed mods should be enabled in this (live,
'piker.brokers.core', # credentialed) trading actor; all data-feed serving is the
'piker.brokers.data', # domain of the `datad.<broker>` sibling daemon, see
# `piker.data._daemon._datad_service_mods`.
_brokerd_service_mods: list[str] = [
'piker.brokers._daemon', 'piker.brokers._daemon',
'piker.data',
'piker.data.feed',
'piker.data._sampling'
] ]
# TODO: we should rename the daemon to datad prolly once we split up
# broker vs. data tasks into separate actors?
@tractor.context @tractor.context
async def _setup_persistent_brokerd( async def _setup_persistent_brokerd(
ctx: tractor.Context, ctx: tractor.Context,
@ -69,9 +60,15 @@ async def _setup_persistent_brokerd(
) -> None: ) -> None:
''' '''
Allocate a actor-wide service nursery in ``brokerd`` Trading-only daemon (lifetime) fixture: console logging
such that feeds can be run in the background persistently by setup and a pinned-open context for service mgmt.
the broker backend as needed.
All data-feed-bus state now lives in the (data-feed-only)
`datad.<brokername>` sibling daemon, see
`piker.data._daemon._setup_persistent_datad()`; this
actor hosts only the backend's `open_trade_dialog()`
(live order-control) ep-task(s) which manage their own
task trees per `tractor.Context`.
''' '''
# NOTE: we only need to setup logging once (and only) here # NOTE: we only need to setup logging once (and only) here
@ -87,46 +84,12 @@ async def _setup_persistent_brokerd(
) )
assert log.name == _util.subsys assert log.name == _util.subsys
# further, set the log level on any broker broker specific # unblock caller
# logger instance. await ctx.started()
from piker.data import feed # we pin this task to keep the daemon active until the
assert not feed._bus # parent actor decides to tear it down
await trio.sleep_forever()
# allocate a nursery to the bus for spawning background
# tasks to service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `brokerd` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
# tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active until the
# parent actor decides to tear it down
await trio.sleep_forever()
except eg.ExceptionGroup:
# TODO: likely some underlying `brokerd` IPC connection
# broke so here we handle a respawn and re-connect attempt!
# This likely should pair with development of the OCO task
# nusery in dev over @ `tractor` B)
# https://github.com/goodboy/tractor/pull/363
raise
def broker_init( def broker_init(
@ -147,8 +110,10 @@ def broker_init(
This includes: This includes:
- load the appropriate <brokername>.py pkg module, - load the appropriate <brokername>.py pkg module,
- reads any declared `__enable_modules__: listr[str]` which will be - reads any declared `_brokerd_mods: list[str]` (falling
passed to `tractor.ActorNursery.start_actor(enabled_modules=<this>)` back to the full `__enable_modules__` set for
not-yet-split backends) which will be passed to
`tractor.ActorNursery.start_actor(enable_modules=)`
at actor start time, at actor start time,
- deliver a references to the daemon lifetime fixture, which - deliver a references to the daemon lifetime fixture, which
for now is always the `_setup_persistent_brokerd()` context defined for now is always the `_setup_persistent_brokerd()` context defined
@ -183,8 +148,14 @@ def broker_init(
] ]
for submodname in getattr( for submodname in getattr(
brokermod, brokermod,
'__enable_modules__', '_brokerd_mods',
[], # fallback for (flat, less mature) backends which
# don't yet declare a daemon-kind mod split.
getattr(
brokermod,
'__enable_modules__',
[],
),
): ):
subpath: str = f'{modpath}.{submodname}' subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath) enabled.append(subpath)
@ -212,6 +183,22 @@ async def spawn_brokerd(
f'backend: {brokername!r}' f'backend: {brokername!r}'
) )
# fail fast on (data-only) backends which don't offer
# ANY live order-control eps; the caller should instead
# be using paper-mode (and thus never spawning us)!
from ..data.validate import get_eps
brokerd_eps: dict = get_eps(
get_brokermod(brokername),
'brokerd',
)
if not brokerd_eps:
raise RuntimeError(
f'Backend {brokername!r} offers NO `brokerd` '
f'(live order-control) eps!?\n'
f'It is likely a datad-only provider, use '
f'paper-mode for clearing instead.\n'
)
( (
brokermode, brokermode,
tractor_kwargs, tractor_kwargs,
@ -233,7 +220,11 @@ async def spawn_brokerd(
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}' dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
portal = await Services.actor_n.start_actor( portal = await Services.actor_n.start_actor(
dname, dname,
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'), enable_modules=list(dict.fromkeys(
_brokerd_service_mods
+
tractor_kwargs.pop('enable_modules')
)),
debug_mode=Services.debug_mode, debug_mode=Services.debug_mode,
**tractor_kwargs **tractor_kwargs
) )

View File

@ -52,9 +52,25 @@ __all__ = [
] ]
# `brokerd` modules # per-daemon-kind (sub)mod groups: declares which of our
__enable_modules__: list[str] = [ # submods host the eps run by each daemon-actor kind as
# defined by `piker.data.validate._eps`.
# NOTE: `get_mkt_info` and `open_symbol_search` both live
# in `.feed` for this backend (no `symbols.py`).
_brokerd_mods: list[str] = [
'api', 'api',
'feed',
'broker', 'broker',
] ]
_datad_mods: list[str] = [
'api',
'feed',
]
# tractor RPC enable arg
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
))

View File

@ -35,7 +35,7 @@ from piker.log import (
get_logger, get_logger,
) )
from ..service import ( from ..service import (
maybe_spawn_brokerd, maybe_spawn_datad,
maybe_open_pikerd, maybe_open_pikerd,
) )
from ..brokers import ( from ..brokers import (
@ -187,7 +187,7 @@ def brokercheck(config, broker):
''' '''
async def bcheck_main(): async def bcheck_main():
async with maybe_spawn_brokerd(broker) as portal: async with maybe_spawn_datad(broker) as portal:
await portal.run(run_test, broker) await portal.run(run_test, broker)
await portal.cancel_actor() await portal.cancel_actor()
@ -317,7 +317,7 @@ def record(config, rate, name, dhost, filename):
return return
async def main(tries): async def main(tries):
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
tries=tries, loglevel=loglevel tries=tries, loglevel=loglevel
) as portal: ) as portal:
# run app "main" # run app "main"

View File

@ -30,7 +30,7 @@ import trio
from piker.log import get_logger from piker.log import get_logger
from . import get_brokermod from . import get_brokermod
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_datad
from . import open_cached_client from . import open_cached_client
from ..accounting import MktPair from ..accounting import MktPair
@ -172,7 +172,7 @@ async def symbol_search(
# await tractor.devx._debug.maybe_init_greenback() # await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync() # tractor.pause_from_sync()
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
mod.name, mod.name,
infect_asyncio=getattr( infect_asyncio=getattr(
mod, mod,

View File

@ -47,13 +47,25 @@ __all__ = [
] ]
# tractor RPC enable arg # per-daemon-kind (sub)mod groups: declares which of our
__enable_modules__: list[str] = [ # submods host the eps run by each daemon-actor kind as
# defined by `piker.data.validate._eps`.
# NOTE: datad-only backend (no `broker.py` yet)!
_brokerd_mods: list[str] = []
_datad_mods: list[str] = [
'api', 'api',
'feed', 'feed',
# 'broker',
] ]
# tractor RPC enable arg
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
))
# passed to ``tractor.ActorNursery.start_actor()`` # passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = { _spawn_kwargs = {
'infect_asyncio': True, 'infect_asyncio': True,

View File

@ -37,7 +37,7 @@ from rapidfuzz import process as fuzzy
import numpy as np import numpy as np
from tractor.trionics import ( from tractor.trionics import (
broadcast_receiver, broadcast_receiver,
maybe_open_context maybe_open_context,
collapse_eg, collapse_eg,
) )
from tractor import to_asyncio from tractor import to_asyncio

View File

@ -65,17 +65,18 @@ _brokerd_mods: list[str] = [
] ]
_datad_mods: list[str] = [ _datad_mods: list[str] = [
'api',
'feed', 'feed',
'symbols', 'symbols',
] ]
# tractor RPC enable arg # tractor RPC enable arg
__enable_modules__: list[str] = ( __enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods _brokerd_mods
+ +
_datad_mods _datad_mods
) ))
# passed to ``tractor.ActorNursery.start_actor()`` # passed to ``tractor.ActorNursery.start_actor()``
_spawn_kwargs = { _spawn_kwargs = {

View File

@ -1340,6 +1340,21 @@ async def load_aio_clients(
ib = None ib = None
client = None client = None
# XXX: post (brokerd vs. datad)-split BOTH per-broker
# daemons connect to the same API gw/tws endpoint(s); to
# avoid `clientId` collisions (and the long conn-timeout
# retry cycle they cause) we offset the data-daemon's
# default id-range to be disjoint from `brokerd.ib`'s
# (which also retries with `client_id + i` increments).
if client_id == 6116: # the default from above
aname: str = tractor.current_actor().name
if 'datad' in aname:
client_id += 16
# ad-hoc (test/cli) actors get their own range to
# avoid clashing with any live daemon-tree's conns.
elif 'brokerd' not in aname:
client_id += 32
# attempt to get connection info from config; if no .toml entry # attempt to get connection info from config; if no .toml entry
# exists, we try to load from a default localhost connection. # exists, we try to load from a default localhost connection.
localhost = '127.0.0.1' localhost = '127.0.0.1'

View File

@ -66,10 +66,24 @@ __all__ = [
] ]
# tractor RPC enable arg # per-daemon-kind (sub)mod groups: declares which of our
__enable_modules__: list[str] = [ # submods host the eps run by each daemon-actor kind as
# defined by `piker.data.validate._eps`.
_brokerd_mods: list[str] = [
'api', 'api',
'broker', 'broker',
]
_datad_mods: list[str] = [
'api',
'feed', 'feed',
'symbols', 'symbols',
] ]
# tractor RPC enable arg
__enable_modules__: list[str] = list(dict.fromkeys(
_brokerd_mods
+
_datad_mods
))

View File

@ -73,10 +73,12 @@ from piker.log import (
get_logger, get_logger,
) )
from piker.data import open_symcache from piker.data import open_symcache
from . import api from piker.data._web_bs import (
from .feed import (
open_autorecon_ws, open_autorecon_ws,
NoBsWs, NoBsWs,
)
from . import api
from .feed import (
stream_messages, stream_messages,
) )
from .ledger import ( from .ledger import (

View File

@ -335,9 +335,14 @@ class TradesRelay(Struct):
@acm @acm
async def open_brokerd_dialog( async def open_brokerd_dialog(
brokermod: ModuleType, brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str, exec_mode: str,
fqme: str|None = None, fqme: str|None = None,
# XXX: explicit (already spawned) trading-actor override,
# currently only used by the `piker ledger` cli which
# boots its own ad-hoc `brokerd`-like actor; normally we
# (lazily) spawn/find the `brokerd.<broker>` daemon here.
portal: tractor.Portal|None = None,
loglevel: str|None = None, loglevel: str|None = None,
) -> tuple[ ) -> tuple[
@ -351,6 +356,10 @@ async def open_brokerd_dialog(
paper engine instance depending on live trading support for the paper engine instance depending on live trading support for the
broker backend, configuration, or client code usage. broker backend, configuration, or client code usage.
NOTE: this is now the ONE place where a (live, credentialed)
`brokerd.<broker>` daemon-actor gets (lazily) booted; pure
data/paper sessions should never spawn one!
''' '''
get_console_log( get_console_log(
level=loglevel, level=loglevel,
@ -416,16 +425,29 @@ async def open_brokerd_dialog(
) )
exec_mode: str = 'paper' exec_mode: str = 'paper'
if ( @acm
trades_endpoint is not None async def acquire_live_portal():
or '''
exec_mode != 'paper' Deliver a portal to the (live, credentialed) trading
): actor hosting the backend's `open_trade_dialog()` ep:
# open live brokerd trades endpoint either the caller-provided override or the (maybe
open_trades_endpoint = portal.open_context( lazily spawned) `brokerd.<broker>` service daemon.
trades_endpoint,
loglevel=loglevel, '''
if portal is not None:
yield portal
return
# XXX: the ONE (normal) place a `brokerd.<broker>`
# daemon-actor gets booted in the runtime B)
from piker.brokers._daemon import (
maybe_spawn_brokerd,
) )
async with maybe_spawn_brokerd(
brokermod.name,
loglevel=loglevel,
) as live_portal:
yield live_portal
@acm @acm
async def maybe_open_paper_ep(): async def maybe_open_paper_ep():
@ -437,7 +459,14 @@ async def open_brokerd_dialog(
return return
# open trades-dialog endpoint with backend broker # open trades-dialog endpoint with backend broker
async with open_trades_endpoint as msg: async with (
acquire_live_portal() as live_portal,
live_portal.open_context(
trades_endpoint,
loglevel=loglevel,
) as msg,
):
ctx, first = msg ctx, first = msg
# runtime indication that the backend can't support live # runtime indication that the backend can't support live
@ -581,7 +610,6 @@ class Router(Struct):
async def maybe_open_brokerd_dialog( async def maybe_open_brokerd_dialog(
self, self,
brokermod: ModuleType, brokermod: ModuleType,
portal: tractor.Portal,
exec_mode: str, exec_mode: str,
fqme: str, fqme: str,
loglevel: str, loglevel: str,
@ -606,7 +634,6 @@ class Router(Struct):
async with open_brokerd_dialog( async with open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
portal=portal,
exec_mode=exec_mode, exec_mode=exec_mode,
fqme=fqme, fqme=fqme,
loglevel=loglevel, loglevel=loglevel,
@ -668,7 +695,6 @@ class Router(Struct):
brokername, _, _, _ = unpack_fqme(fqme) brokername, _, _, _ = unpack_fqme(fqme)
brokermod = feed.mods[brokername] brokermod = feed.mods[brokername]
broker = brokermod.name broker = brokermod.name
portal = feed.portals[brokermod]
# XXX: this should be initial price quote from target provider # XXX: this should be initial price quote from target provider
flume = feed.flumes[fqme] flume = feed.flumes[fqme]
@ -682,7 +708,6 @@ class Router(Struct):
async with self.maybe_open_brokerd_dialog( async with self.maybe_open_brokerd_dialog(
brokermod=brokermod, brokermod=brokermod,
portal=portal,
exec_mode=exec_mode, exec_mode=exec_mode,
fqme=fqme, fqme=fqme,
loglevel=loglevel, loglevel=loglevel,

View File

@ -250,7 +250,6 @@ def cli(
# TODO: load endpoints from `conf::[network].pikerd` # TODO: load endpoints from `conf::[network].pikerd`
# - pikerd vs. regd, separate registry daemon? # - pikerd vs. regd, separate registry daemon?
# - expose datad vs. brokerd?
# - bind emsd with certain perms on public iface? # - bind emsd with certain perms on public iface?
regaddrs: list[tuple[str, int]] = regaddr or [( regaddrs: list[tuple[str, int]] = regaddr or [(
_default_registry_host, _default_registry_host,

View File

@ -188,6 +188,48 @@ def _override_config_dir(
_config_dir = path _config_dir = path
def _maybe_use_test_dir() -> None:
'''
When running under the `pytest` harness, override
the config dir to the per-test-tmp dir "passed down"
the actor tree via `tractor`'s runtime-vars
inheritance mechanism.
See the `tractor_runtime_overrides` usage in our
`tests.conftest._open_test_pikerd()` as well as
`.service._actor_runtime.open_piker_runtime()` for
the root-actor's pre-loading of the var state.
NOTE: this must be checked lazily at config-path
access time (NOT import time) since sub-actors only
receive runtime-vars once their `tractor` runtime
has fully booted.
'''
global _config_dir
import tractor
actor = tractor.current_actor(
err_on_no_runtime=False,
)
if actor is None:
return
rvs: dict = tractor.runtime._state._runtime_vars
pvars: dict|None = rvs.get('piker_vars')
if (
pvars
and
(testdir := pvars.get('piker_test_dir'))
):
testdirpath = Path(testdir)
assert testdirpath.exists(), (
f'piker test harness might be borked!?\n'
f'testdirpath: {testdirpath!r}\n'
)
if _config_dir != testdirpath:
_override_config_dir(testdirpath)
def _conf_fn_w_ext( def _conf_fn_w_ext(
name: str, name: str,
) -> str: ) -> str:
@ -201,6 +243,7 @@ def get_conf_dir() -> Path:
on the local filesystem. on the local filesystem.
''' '''
_maybe_use_test_dir()
return _config_dir return _config_dir
@ -226,7 +269,7 @@ def get_conf_path(
assert str(conf_name) in _conf_names assert str(conf_name) in _conf_names
fn = _conf_fn_w_ext(conf_name) fn = _conf_fn_w_ext(conf_name)
return _config_dir / Path(fn) return get_conf_dir() / Path(fn)
def repodir() -> Path: def repodir() -> Path:
@ -271,8 +314,9 @@ def load(
''' '''
# create the $HOME/.config/piker dir if dne # create the $HOME/.config/piker dir if dne
if not _config_dir.is_dir(): conf_dir: Path = get_conf_dir()
_config_dir.mkdir( if not conf_dir.is_dir():
conf_dir.mkdir(
parents=True, parents=True,
exist_ok=True, exist_ok=True,
) )

View File

@ -0,0 +1,300 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Data-daemon-actor "endpoint-hooks": the service task entry
points for `datad.<brokername>`, the per-provider real-time
and historical market-data feed daemon.
The (data vs. broker)d-split mirrors the ep-groupings in
`piker.data.validate._eps`: this daemon hosts all `'datad'`
eps (live quotes, history loading, symbology search) while
its `brokerd.<brokername>` sibling hosts only the live
order-control (and thus credentialed) `'brokerd'` eps.
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from types import ModuleType
from typing import (
TYPE_CHECKING,
AsyncContextManager,
)
import tractor
import trio
from piker.log import (
get_logger,
get_console_log,
)
from . import _util
if TYPE_CHECKING:
from .feed import _FeedsBus
log = get_logger(name=__name__)
# `datad`-actor-always-enabled mods: the data-side successor
# to the old `piker.brokers._daemon._data_mods` set.
# NOTE: keeping this list as small as possible is part of
# our caps-sec model and should be treated with utmost care!
_datad_service_mods: list[str] = [
'piker.brokers.core',
'piker.brokers.data',
'piker.data',
'piker.data.feed',
'piker.data._sampling',
'piker.data._daemon',
]
@tractor.context
async def _setup_persistent_datad(
ctx: tractor.Context,
brokername: str,
loglevel: str|None = None,
) -> None:
'''
Allocate an actor-wide service nursery in this
`datad.<brokername>` actor such that data-feed tasks
(shm writer-samplers, history backfillers, symbology
loaders) can be run in the background persistently by
the provider backend as needed.
'''
# NOTE: we only need to setup logging once (and only)
# here since all hosted daemon tasks will reference
# this same log instance's (actor local) state and thus
# don't require any further (level) configuration on
# their own B)
actor: tractor.Actor = tractor.current_actor()
tll: str = actor.loglevel
log = get_console_log(
level=loglevel or tll,
name=f'{_util.subsys}.{brokername}',
with_tractor_log=bool(tll),
)
assert log.name == _util.subsys
from piker.data import feed
assert not feed._bus
# allocate a nursery to the bus for spawning background
# tasks which service client IPC requests, normally
# `tractor.Context` connections to explicitly required
# `datad` endpoints such as:
# - `stream_quotes()`,
# - `manage_history()`,
# - `allocate_persistent_feed()`,
# - `open_symbol_search()`
# NOTE: see ep invocation details inside `.data.feed`.
async with (
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
brokername,
service_nursery,
)
assert bus is feed._bus
# unblock caller
await ctx.started()
# we pin this task to keep the feeds manager active
# until the parent actor decides to tear it down
await trio.sleep_forever()
def datad_init(
brokername: str,
loglevel: str|None = None,
**start_actor_kwargs,
) -> tuple[
ModuleType,
dict,
AsyncContextManager,
]:
'''
Given an input broker name, load all named arguments
which can be passed for daemon endpoint + context spawn
as required in every `datad.<brokername>` (actor)
service.
This includes:
- load the appropriate <brokername>.py pkg module,
- reads any declared `_datad_mods: list[str]` (falling
back to the full `__enable_modules__` set for
not-yet-split backends) which will be passed to
`tractor.ActorNursery.start_actor(enable_modules=)`
at actor start time,
- deliver a reference to the daemon lifetime fixture,
which for now is always the
`_setup_persistent_datad()` context defined above.
'''
from piker.brokers import get_brokermod
from .validate import get_eps
brokermod = get_brokermod(brokername)
modpath: str = brokermod.__name__
# warn (but don't bail) when the backend is missing
# some/all of the `datad` ep contract defined by
# `piker.data.validate._eps`.
datad_eps: dict = get_eps(brokermod, 'datad')
if not datad_eps:
log.warning(
f'Backend {brokername!r} offers NO `datad` '
f'(data-feed) eps!?\n'
f'Most feed/chart functionality will be '
f'broken for this provider..\n'
)
start_actor_kwargs['name'] = f'datad.{brokername}'
# XXX CRITICAL: include any backend-declared spawn
# kwargs, eg. `{'infect_asyncio': True}` required by
# `ib`'s embedded `asyncio`-mode `ib_async` usage!
start_actor_kwargs.update(
getattr(
brokermod,
'_spawn_kwargs',
{},
)
)
# lookup actor-enabled modules declared by the backend
# offering the `datad` endpoint(s).
enabled: list[str]
enabled = start_actor_kwargs['enable_modules'] = [
__name__, # so that eps from THIS mod can be invoked
modpath,
]
for submodname in getattr(
brokermod,
'_datad_mods',
# fallback for (flat, less mature) backends which
# don't yet declare a daemon-kind mod split.
getattr(
brokermod,
'__enable_modules__',
[],
),
):
subpath: str = f'{modpath}.{submodname}'
enabled.append(subpath)
return (
brokermod,
start_actor_kwargs, # to `ActorNursery.start_actor()`
# XXX see impl above; contains all (actor global)
# setup/teardown expected in all `datad` actor
# instances.
_setup_persistent_datad,
)
async def spawn_datad(
brokername: str,
loglevel: str|None = None,
**tractor_kwargs,
) -> bool:
log.info(
f'Spawning data-daemon,\n'
f'backend: {brokername!r}'
)
(
brokermod,
tractor_kwargs,
daemon_fixture_ep,
) = datad_init(
brokername,
loglevel,
**tractor_kwargs,
)
# ask `pikerd` to spawn a new sub-actor and manage it
# under its actor nursery
from piker.service import Services
dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}'
enable_mods: list[str] = list(dict.fromkeys(
_datad_service_mods
+
tractor_kwargs.pop('enable_modules')
))
portal = await Services.actor_n.start_actor(
dname,
enable_modules=enable_mods,
debug_mode=Services.debug_mode,
**tractor_kwargs,
)
# NOTE: the service mngr expects an already spawned
# actor + its portal ref in order to do non-blocking
# setup of the `datad` service nursery.
await Services.start_service_task(
dname,
portal,
# signature of target root-task endpoint
daemon_fixture_ep,
brokername=brokername,
loglevel=loglevel,
)
return True
@acm
async def maybe_spawn_datad(
brokername: str,
loglevel: str|None = None,
**pikerd_kwargs,
) -> tractor.Portal:
'''
Helper to spawn a datad service *from* a client who
wishes to use the sub-actor-daemon but is fine with
re-using any existing and contactable `datad`.
Mas o menos, acts as a cached-actor-getter factory.
'''
from piker.service import maybe_spawn_daemon
async with maybe_spawn_daemon(
service_name=f'datad.{brokername}',
service_task_target=spawn_datad,
spawn_args={
'brokername': brokername,
},
loglevel=loglevel,
**pikerd_kwargs,
) as portal:
yield portal

View File

@ -17,7 +17,7 @@
''' '''
Data feed apis and infra. Data feed apis and infra.
This module is enabled for ``brokerd`` daemons and includes mostly This module is enabled for ``datad`` daemons and includes mostly
endpoints and middleware to support our real-time, provider agnostic, endpoints and middleware to support our real-time, provider agnostic,
live market quotes layer. Historical data loading and processing is also live market quotes layer. Historical data loading and processing is also
initiated in parts of the feed bus startup but business logic and initiated in parts of the feed bus startup but business logic and
@ -54,8 +54,11 @@ from piker.accounting import (
) )
from piker.types import Struct from piker.types import Struct
from piker.brokers import get_brokermod from piker.brokers import get_brokermod
from piker.service import ( # NOTE: must be a "relative-direct" import (NOT via
maybe_spawn_brokerd, # `piker.service`) to avoid a partial-init cycle when this
# mod is loaded as part of `piker.service.__init__`.
from ._daemon import (
maybe_spawn_datad,
) )
from piker.calc import humanize from piker.calc import humanize
from ._util import ( from ._util import (
@ -110,7 +113,7 @@ class _FeedsBus(Struct):
''' '''
Data feeds broadcaster and persistence management. Data feeds broadcaster and persistence management.
This is a brokerd side api used to manager persistent real-time This is a datad side api used to manager persistent real-time
streams that can be allocated and left alive indefinitely. A bus is streams that can be allocated and left alive indefinitely. A bus is
associated one-to-one with a particular broker backend where the associated one-to-one with a particular broker backend where the
"bus" refers so a multi-symbol bus where quotes are interleaved in "bus" refers so a multi-symbol bus where quotes are interleaved in
@ -249,7 +252,7 @@ async def allocate_persistent_feed(
''' '''
Create and maintain a "feed bus" which allocates tasks for real-time Create and maintain a "feed bus" which allocates tasks for real-time
streaming and optional historical data storage per broker/data provider streaming and optional historical data storage per broker/data provider
backend; this normally task runs *in* a `brokerd` actor. backend; this normally task runs *in* a `datad` actor.
If none exists, this allocates a ``_FeedsBus`` which manages the If none exists, this allocates a ``_FeedsBus`` which manages the
lifetimes of streaming tasks created for each requested symbol. lifetimes of streaming tasks created for each requested symbol.
@ -318,8 +321,8 @@ async def allocate_persistent_feed(
# at max capacity. # at max capacity.
# - the same ideas ^ but when a local core is maxxed out (like how # - the same ideas ^ but when a local core is maxxed out (like how
# binance does often with hft XD # binance does often with hft XD
# - if a brokerd is non-local then we can't just allocate a mem # - if a datad is non-local then we can't just allocate a mem
# channel here and have the brokerd write it, we instead need # channel here and have the datad write it, we instead need
# a small streaming machine around the remote feed which can then # a small streaming machine around the remote feed which can then
# do the normal work of sampling and writing shm buffers # do the normal work of sampling and writing shm buffers
# (depending on if we want sampling done on the far end or not?) # (depending on if we want sampling done on the far end or not?)
@ -499,7 +502,7 @@ async def open_feed_bus(
# (after we also group them in a nice `/dev/shm/piker/` subdir). # (after we also group them in a nice `/dev/shm/piker/` subdir).
# ensure we are who we think we are # ensure we are who we think we are
servicename = tractor.current_actor().name servicename = tractor.current_actor().name
assert 'brokerd' in servicename assert 'datad' in servicename
assert brokername in servicename assert brokername in servicename
bus: _FeedsBus = get_feed_bus(brokername) bus: _FeedsBus = get_feed_bus(brokername)
@ -509,12 +512,12 @@ async def open_feed_bus(
for symbol in symbols: for symbol in symbols:
# if no cached feed for this symbol has been created for this # if no cached feed for this symbol has been created for this
# brokerd yet, start persistent stream and shm writer task in # datad yet, start persistent stream and shm writer task in
# service nursery # service nursery
flume = bus.feeds.get(symbol) flume = bus.feeds.get(symbol)
if flume is None: if flume is None:
# allocate a new actor-local stream bus which # allocate a new actor-local stream bus which
# will persist for this `brokerd`'s service lifetime. # will persist for this `datad`'s service lifetime.
async with bus.task_lock: async with bus.task_lock:
await bus.nursery.start( await bus.nursery.start(
partial( partial(
@ -721,7 +724,7 @@ class Feed(Struct):
mods = {name: self.mods[name] for name in brokers} mods = {name: self.mods[name] for name in brokers}
if len(mods) == 1: if len(mods) == 1:
# just pass the brokerd stream directly if only one provider # just pass the datad stream directly if only one provider
# was detected. # was detected.
stream = self.streams[list(brokers)[0]] stream = self.streams[list(brokers)[0]]
async with stream.subscribe() as bstream: async with stream.subscribe() as bstream:
@ -763,7 +766,7 @@ class Feed(Struct):
@acm @acm
async def install_brokerd_search( async def install_datad_search(
portal: tractor.Portal, portal: tractor.Portal,
brokermod: ModuleType, brokermod: ModuleType,
@ -812,7 +815,7 @@ async def maybe_open_feed(
ReceiveChannel[dict[str, Any]], ReceiveChannel[dict[str, Any]],
): ):
''' '''
Maybe open a data to a ``brokerd`` daemon only if there is no Maybe open a data feed to a ``datad`` daemon only if there is no
local one for the broker-symbol pair, if one is cached use it wrapped local one for the broker-symbol pair, if one is cached use it wrapped
in a tractor broadcast receiver. in a tractor broadcast receiver.
@ -885,14 +888,14 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme) providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod feed.mods[mod.name] = mod
# one actor per brokerd for now # one actor per datad for now
brokerd_ctxs = [] datad_ctxs = []
for brokermod, bfqmes in providers.items(): for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn # if no `datad` for this backend exists yet we spawn
# a daemon actor for it. # a daemon actor for it.
brokerd_ctxs.append( datad_ctxs.append(
maybe_spawn_brokerd( maybe_spawn_datad(
brokermod.name, brokermod.name,
loglevel=loglevel loglevel=loglevel
) )
@ -900,7 +903,7 @@ async def open_feed(
portals: tuple[tractor.Portal] portals: tuple[tractor.Portal]
async with trionics.gather_contexts( async with trionics.gather_contexts(
brokerd_ctxs, datad_ctxs,
) as portals: ) as portals:
bus_ctxs: list[AsyncContextManager] = [] bus_ctxs: list[AsyncContextManager] = []
@ -937,9 +940,9 @@ async def open_feed(
tick_throttle=tick_throttle, tick_throttle=tick_throttle,
# XXX: super important to avoid # XXX: super important to avoid
# the brokerd from some other # the datad from some other
# backend overruning the task here # backend overruning the task here
# bc some other brokerd took longer # bc some other datad took longer
# to startup before we hit the `.open_stream()` # to startup before we hit the `.open_stream()`
# loop below XD .. really we should try to do each # loop below XD .. really we should try to do each
# of these stream open sequences sequentially per # of these stream open sequences sequentially per
@ -1008,7 +1011,7 @@ async def open_feed(
assert stream assert stream
feed.streams[brokermod.name] = stream feed.streams[brokermod.name] = stream
# apply `brokerd`-common stream to each flume # apply `datad`-common stream to each flume
# tracking a live market feed from that provider. # tracking a live market feed from that provider.
for fqme, flume in feed.flumes.items(): for fqme, flume in feed.flumes.items():
if brokermod.name == flume.mkt.broker: if brokermod.name == flume.mkt.broker:

View File

@ -91,6 +91,24 @@ _eps: dict[str, list[str]] = {
} }
def get_eps(
mod: ModuleType,
kind: str, # 'middleware' | 'datad' | 'brokerd'
) -> dict[str, Callable]:
'''
Return the daemon-kind's ep funcs defined by the backend
`mod`, keyed by ep name; any eps from `_eps[kind]` not
implemented by the backend are excluded.
'''
return {
name: ep
for name in _eps[kind]
if (ep := getattr(mod, name, None))
}
def validate_backend( def validate_backend(
mod: ModuleType, mod: ModuleType,
syms: list[str], syms: list[str],

View File

@ -56,3 +56,7 @@ from ..brokers._daemon import (
spawn_brokerd as spawn_brokerd, spawn_brokerd as spawn_brokerd,
maybe_spawn_brokerd as maybe_spawn_brokerd, maybe_spawn_brokerd as maybe_spawn_brokerd,
) )
from ..data._daemon import (
spawn_datad as spawn_datad,
maybe_spawn_datad as maybe_spawn_datad,
)

View File

@ -157,6 +157,7 @@ _root_modules: list[str] = [
__name__, __name__,
'piker.service._daemon', 'piker.service._daemon',
'piker.brokers._daemon', 'piker.brokers._daemon',
'piker.data._daemon',
'piker.clearing._ems', 'piker.clearing._ems',
'piker.clearing._client', 'piker.clearing._client',
@ -214,7 +215,13 @@ async def open_pikerd(
trio.open_nursery() as service_tn, trio.open_nursery() as service_tn,
): ):
for addr in reg_addrs: for addr in reg_addrs:
uaddr: tuple = addr.unwrap() # normalize to a wrapped `tractor` addr-type;
# entries may be raw `tuple`s when passed in
# from (test) client code.
wladdr = tractor.discovery._addr.wrap_address(
addr,
)
uaddr: tuple = wladdr.unwrap()
if ( if (
uaddr not in root_actor.accept_addrs uaddr not in root_actor.accept_addrs
): ):

View File

@ -225,10 +225,13 @@ async def check_for_service(
''' '''
async with ( async with (
open_registry(ensure_exists=False) as reg_addr, open_registry(
addrs=Registry.addrs,
ensure_exists=False,
) as reg_addrs,
tractor.query_actor( tractor.query_actor(
service_name, service_name,
arbiter_sockaddr=reg_addr, regaddr=reg_addrs[0],
) as sockaddr, ) as (sockaddr, _),
): ):
return sockaddr return sockaddr

View File

@ -32,12 +32,12 @@ from . import _event
from . import _search from . import _search
from ..accounting import unpack_fqme from ..accounting import unpack_fqme
from ..data._symcache import open_symcache from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search from ..data.feed import install_datad_search
from ..log import ( from ..log import (
get_logger, get_logger,
get_console_log, get_console_log,
) )
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_datad
from ._exec import run_qtractor from ._exec import run_qtractor
log = get_logger(__name__) log = get_logger(__name__)
@ -50,16 +50,16 @@ async def load_provider_search(
) -> None: ) -> None:
name = brokermod.name name = brokermod.name
log.info(f'loading brokerd for {name}..') log.info(f'loading datad for {name}..')
async with ( async with (
maybe_spawn_brokerd( maybe_spawn_datad(
name, name,
loglevel=loglevel loglevel=loglevel
) as portal, ) as portal,
install_brokerd_search( install_datad_search(
portal, portal,
brokermod, brokermod,
), ),

View File

@ -27,7 +27,7 @@ from ..cli import (
load_trans_eps, load_trans_eps,
) )
from .. import watchlists as wl from .. import watchlists as wl
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_datad
_config_dir = click.get_app_dir('piker') _config_dir = click.get_app_dir('piker')
@ -69,7 +69,7 @@ def monitor(config, rate, name, dhost, test, tl):
from .kivy.monitor import _async_main from .kivy.monitor import _async_main
async def main(): async def main():
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
brokername=brokermod.name, brokername=brokermod.name,
loglevel=loglevel loglevel=loglevel
) as portal: ) as portal:
@ -118,7 +118,7 @@ def optschain(
from .kivy.option_chain import _async_main from .kivy.option_chain import _async_main
async def main(): async def main():
async with maybe_spawn_brokerd( async with maybe_spawn_datad(
loglevel=loglevel loglevel=loglevel
): ):
# run app "main" # run app "main"

View File

@ -495,7 +495,7 @@ async def _async_main(
async with trio.open_nursery() as nursery: async with trio.open_nursery() as nursery:
# get a portal to the data feed daemon # get a portal to the data feed daemon
async with tractor.wait_for_actor('brokerd') as portal: async with tractor.wait_for_actor('datad') as portal:
# set up a pager view for large ticker lists # set up a pager view for large ticker lists
chain = await new_chain_ui( chain = await new_chain_ui(

View File

@ -151,7 +151,7 @@ def load_and_check_pos(
# is the same the fqme. # is the same the fqme.
pp: Position = table.pps[ppmsg.symbol] pp: Position = table.pps[ppmsg.symbol]
assert ppmsg.size == pp.size assert ppmsg.size == pp.cumsize
assert ppmsg.avg_price == pp.ppu assert ppmsg.avg_price == pp.ppu
yield pp yield pp
@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules # NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`. # import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re: except tractor.RemoteActorError as re:
assert re.type is ModuleNotFoundError assert re.boxed_type is ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme) run_and_tollerate_cancels(load_bad_fqme)

View File

@ -53,11 +53,12 @@ def test_runtime_boot(
tractor.wait_for_actor( tractor.wait_for_actor(
'pikerd', 'pikerd',
arbiter_sockaddr=daemon_addr, registry_addr=daemon_addr,
) as portal, ) as portal,
): ):
assert pikerd_portal.channel.raddr == daemon_addr uw_raddr: tuple = pikerd_portal.chan.raddr.unwrap()
assert pikerd_portal.channel.raddr == portal.channel.raddr assert uw_raddr == daemon_addr
assert uw_raddr == portal.chan.raddr.unwrap()
# no service tasks should be started # no service tasks should be started
assert not services.service_tasks assert not services.service_tasks
@ -65,6 +66,41 @@ def test_runtime_boot(
trio.run(main) trio.run(main)
def test_datad_spawn(
open_test_pikerd: AsyncContextManager,
loglevel: str,
) -> None:
'''
Verify the new (data-feed-only) `datad.<broker>` daemon
can be spawned/registered as a `pikerd` sub-service via
the `maybe_spawn_datad()` factory.
'''
from piker.service import maybe_spawn_datad
backend: str = 'kraken'
datad_name: str = f'datad.{backend}'
async def main():
async with (
open_test_pikerd() as (_, _, _, services),
maybe_spawn_datad(
backend,
loglevel=loglevel,
) as portal,
):
assert portal
async with ensure_service(datad_name):
assert (
datad_name in services.service_tasks
)
trio.run(main)
def test_ensure_datafeed_actors( def test_ensure_datafeed_actors(
open_test_pikerd: AsyncContextManager, open_test_pikerd: AsyncContextManager,
loglevel: str, loglevel: str,
@ -72,14 +108,14 @@ def test_ensure_datafeed_actors(
) -> None: ) -> None:
''' '''
Verify that booting a data feed starts a `brokerd` Verify that booting a data feed starts a `datad`
actor and a singleton global `samplerd` and opening actor and a singleton global `samplerd` and opening
an order mode in paper opens the `paperboi` service. an order mode in paper opens the `paperboi` service.
''' '''
actor_name: str = 'brokerd' actor_name: str = 'datad'
backend: str = 'kraken' backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}' datad_name: str = f'{actor_name}.{backend}'
async def main(): async def main():
async with ( async with (
@ -94,7 +130,7 @@ def test_ensure_datafeed_actors(
await feed.pause() await feed.pause()
async with ( async with (
ensure_service(brokerd_name), ensure_service(datad_name),
ensure_service('samplerd'), ensure_service('samplerd'),
): ):
await trio.sleep(0.1) await trio.sleep(0.1)
@ -108,7 +144,7 @@ async def ensure_service(
sockaddr: tuple[str, int] | None = None, sockaddr: tuple[str, int] | None = None,
) -> None: ) -> None:
async with find_service(name) as portal: async with find_service(name) as portal:
remote_sockaddr = portal.channel.raddr remote_sockaddr: tuple = portal.chan.raddr.unwrap()
print(f'FOUND `{name}` @ {remote_sockaddr}') print(f'FOUND `{name}` @ {remote_sockaddr}')
if sockaddr: if sockaddr:
@ -131,40 +167,49 @@ def run_test_w_cancel_method(
"was remotely cancelled by remote actor (\'pikerd\'") "was remotely cancelled by remote actor (\'pikerd\'")
if cancel_method == 'sigint': if cancel_method == 'sigint':
with pytest.raises( # XXX: with modern `tractor` the (single-exc)
# group is collapsed so a bare KBI normally
# propagates; tolerate either form.
with pytest.raises((
KeyboardInterrupt,
BaseExceptionGroup, BaseExceptionGroup,
) as exc_info: )) as exc_info:
trio.run(main) trio.run(main)
multi = exc_info.value err = exc_info.value
match err:
case BaseExceptionGroup():
for suberr in err.exceptions:
match suberr:
# ensure we receive a remote
# cancellation error caused by the
# pikerd root actor.
case tractor.ContextCancelled():
assert (
cancelled_msg
in
suberr.args[0]
)
for suberr in multi.exceptions: case KeyboardInterrupt():
match suberr: pass
# ensure we receive a remote cancellation error caused
# by the pikerd root actor since we used the
# `.cancel_service()` API above B)
case tractor.ContextCancelled():
assert cancelled_msg in suberr.args[0]
case KeyboardInterrupt(): case _:
pass pytest.fail(
f'Unexpected error {suberr}'
)
case _: case KeyboardInterrupt():
pytest.fail(f'Unexpected error {suberr}') pass
elif cancel_method == 'services': elif cancel_method == 'services':
# XXX: cancelling our own sub-service via
# XXX NOTE: oddly, when you pass --pdb to pytest, i think since # `Services.cancel_service()` is a *self*
# we also use that to enable the underlying tractor debug mode, # requested cancel: modern `tractor` absorbs the
# it causes this to not raise for some reason? So if you see # resulting `ContextCancelled` (canceller is our
# that while changing this test.. it's prolly that. # own actor) so the runtime tears down gracefully
# with NO error raised to the opener.
with pytest.raises( trio.run(main)
tractor.ContextCancelled
) as exc_info:
trio.run(main)
assert cancelled_msg in exc_info.value.args[0]
else: else:
pytest.fail(f'Test is broken due to {cancel_method}') pytest.fail(f'Test is broken due to {cancel_method}')
@ -182,9 +227,9 @@ def test_ensure_ems_in_paper_actors(
) -> None: ) -> None:
actor_name: str = 'brokerd'
backend: str = 'kraken' backend: str = 'kraken'
brokerd_name: str = f'{actor_name}.{backend}' datad_name: str = f'datad.{backend}'
brokerd_name: str = f'brokerd.{backend}'
async def main(): async def main():
@ -197,7 +242,9 @@ def test_ensure_ems_in_paper_actors(
# ensure we timeout after is startup is too slow. # ensure we timeout after is startup is too slow.
# TODO: something like this should be our start point for # TODO: something like this should be our start point for
# benchmarking end-to-end startup B) # benchmarking end-to-end startup B)
with trio.fail_after(9): # NOTE: includes a live (kraken) symbology fetch so
# the budget needs some headroom for net latency..
with trio.fail_after(19):
async with ( async with (
open_test_pikerd() as (_, _, _, services), open_test_pikerd() as (_, _, _, services),
@ -226,15 +273,25 @@ def test_ensure_ems_in_paper_actors(
async with ( async with (
ensure_service('emsd'), ensure_service('emsd'),
ensure_service(brokerd_name), ensure_service(datad_name),
ensure_service(f'paperboi.{backend}'), ensure_service(f'paperboi.{backend}'),
): ):
for name in pikerd_subservices: for name in pikerd_subservices:
assert name in services.service_tasks assert name in services.service_tasks
# brokerd.kraken actor should have been started # datad.kraken actor should have been
# implicitly by the ems. # started implicitly by the feed layer.
assert brokerd_name in services.service_tasks assert datad_name in services.service_tasks
# XXX: paper-mode sessions should NEVER
# boot a (live, credentialed) `brokerd`;
# only emsd's `open_brokerd_dialog()`
# live-ep path is allowed to spawn it!
assert (
brokerd_name
not in
services.service_tasks
)
print('ALL SERVICES STARTED, cancelling runtime with:\n' print('ALL SERVICES STARTED, cancelling runtime with:\n'
f'-> {cancel_method}') f'-> {cancel_method}')