Compare commits
7 Commits
14e6a319ed
...
4dc50e1d1b
| Author | SHA1 | Date |
|---|---|---|
|
|
4dc50e1d1b | |
|
|
d7f1d70b61 | |
|
|
3548893337 | |
|
|
7de661c03e | |
|
|
66957ffdb0 | |
|
|
fa8d413c84 | |
|
|
fa98290808 |
|
|
@ -324,10 +324,13 @@ def load_ledger(
|
|||
ldir: Path = (
|
||||
dirpath
|
||||
or
|
||||
config._config_dir / 'accounting' / 'ledgers'
|
||||
config.get_conf_dir() / 'accounting' / 'ledgers'
|
||||
)
|
||||
if not ldir.is_dir():
|
||||
ldir.mkdir()
|
||||
ldir.mkdir(
|
||||
parents=True,
|
||||
exist_ok=True,
|
||||
)
|
||||
|
||||
fname = f'trades_{brokername}_{acctid}.toml'
|
||||
fpath: Path = ldir / fname
|
||||
|
|
|
|||
|
|
@ -785,9 +785,16 @@ def load_account(
|
|||
legacy_fn: str = f'pps.{brokername}.{acctid}.toml'
|
||||
fn: str = f'account.{brokername}.{acctid}.toml'
|
||||
|
||||
dirpath: Path = dirpath or (config._config_dir / 'accounting')
|
||||
dirpath: Path = (
|
||||
dirpath
|
||||
or
|
||||
(config.get_conf_dir() / 'accounting')
|
||||
)
|
||||
if not dirpath.is_dir():
|
||||
dirpath.mkdir()
|
||||
dirpath.mkdir(
|
||||
parents=True,
|
||||
exist_ok=True,
|
||||
)
|
||||
|
||||
conf, path = config.load(
|
||||
path=dirpath / fn,
|
||||
|
|
|
|||
|
|
@ -143,12 +143,15 @@ def sync(
|
|||
# (what the EMS normally does internall) B)
|
||||
open_brokerd_dialog(
|
||||
brokermod,
|
||||
portal,
|
||||
exec_mode=(
|
||||
'paper'
|
||||
if account == 'paper'
|
||||
else 'live'
|
||||
),
|
||||
# use our own ad-hoc-spawned actor,
|
||||
# do NOT (spawn and) use the
|
||||
# `brokerd.<broker>` service daemon!
|
||||
portal=portal,
|
||||
loglevel=loglevel,
|
||||
) as (
|
||||
brokerd_stream,
|
||||
|
|
|
|||
|
|
@ -25,10 +25,8 @@ from contextlib import (
|
|||
)
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AsyncContextManager,
|
||||
)
|
||||
import exceptiongroup as eg
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
|
@ -40,27 +38,20 @@ from piker.log import (
|
|||
from . import _util
|
||||
from . import get_brokermod
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from ..data import _FeedsBus
|
||||
|
||||
log = get_logger(name=__name__)
|
||||
|
||||
# `brokerd` enabled modules
|
||||
# TODO: move this def to the `.data` subpkg..
|
||||
# `brokerd`-actor-always-enabled mods.
|
||||
# NOTE: keeping this list as small as possible is part of our caps-sec
|
||||
# model and should be treated with utmost care!
|
||||
_data_mods: str = [
|
||||
'piker.brokers.core',
|
||||
'piker.brokers.data',
|
||||
# model and should be treated with utmost care! In particular NO
|
||||
# `piker.data.*` feed mods should be enabled in this (live,
|
||||
# credentialed) trading actor; all data-feed serving is the
|
||||
# domain of the `datad.<broker>` sibling daemon, see
|
||||
# `piker.data._daemon._datad_service_mods`.
|
||||
_brokerd_service_mods: list[str] = [
|
||||
'piker.brokers._daemon',
|
||||
'piker.data',
|
||||
'piker.data.feed',
|
||||
'piker.data._sampling'
|
||||
]
|
||||
|
||||
|
||||
# TODO: we should rename the daemon to datad prolly once we split up
|
||||
# broker vs. data tasks into separate actors?
|
||||
@tractor.context
|
||||
async def _setup_persistent_brokerd(
|
||||
ctx: tractor.Context,
|
||||
|
|
@ -69,9 +60,15 @@ async def _setup_persistent_brokerd(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Allocate a actor-wide service nursery in ``brokerd``
|
||||
such that feeds can be run in the background persistently by
|
||||
the broker backend as needed.
|
||||
Trading-only daemon (lifetime) fixture: console logging
|
||||
setup and a pinned-open context for service mgmt.
|
||||
|
||||
All data-feed-bus state now lives in the (data-feed-only)
|
||||
`datad.<brokername>` sibling daemon, see
|
||||
`piker.data._daemon._setup_persistent_datad()`; this
|
||||
actor hosts only the backend's `open_trade_dialog()`
|
||||
(live order-control) ep-task(s) which manage their own
|
||||
task trees per `tractor.Context`.
|
||||
|
||||
'''
|
||||
# NOTE: we only need to setup logging once (and only) here
|
||||
|
|
@ -87,46 +84,12 @@ async def _setup_persistent_brokerd(
|
|||
)
|
||||
assert log.name == _util.subsys
|
||||
|
||||
# further, set the log level on any broker broker specific
|
||||
# logger instance.
|
||||
# unblock caller
|
||||
await ctx.started()
|
||||
|
||||
from piker.data import feed
|
||||
assert not feed._bus
|
||||
|
||||
# allocate a nursery to the bus for spawning background
|
||||
# tasks to service client IPC requests, normally
|
||||
# `tractor.Context` connections to explicitly required
|
||||
# `brokerd` endpoints such as:
|
||||
# - `stream_quotes()`,
|
||||
# - `manage_history()`,
|
||||
# - `allocate_persistent_feed()`,
|
||||
# - `open_symbol_search()`
|
||||
# NOTE: see ep invocation details inside `.data.feed`.
|
||||
try:
|
||||
async with (
|
||||
# tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as service_nursery
|
||||
):
|
||||
bus: _FeedsBus = feed.get_feed_bus(
|
||||
brokername,
|
||||
service_nursery,
|
||||
)
|
||||
assert bus is feed._bus
|
||||
|
||||
# unblock caller
|
||||
await ctx.started()
|
||||
|
||||
# we pin this task to keep the feeds manager active until the
|
||||
# parent actor decides to tear it down
|
||||
await trio.sleep_forever()
|
||||
|
||||
except eg.ExceptionGroup:
|
||||
# TODO: likely some underlying `brokerd` IPC connection
|
||||
# broke so here we handle a respawn and re-connect attempt!
|
||||
# This likely should pair with development of the OCO task
|
||||
# nusery in dev over @ `tractor` B)
|
||||
# https://github.com/goodboy/tractor/pull/363
|
||||
raise
|
||||
# we pin this task to keep the daemon active until the
|
||||
# parent actor decides to tear it down
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
def broker_init(
|
||||
|
|
@ -147,8 +110,10 @@ def broker_init(
|
|||
|
||||
This includes:
|
||||
- load the appropriate <brokername>.py pkg module,
|
||||
- reads any declared `__enable_modules__: listr[str]` which will be
|
||||
passed to `tractor.ActorNursery.start_actor(enabled_modules=<this>)`
|
||||
- reads any declared `_brokerd_mods: list[str]` (falling
|
||||
back to the full `__enable_modules__` set for
|
||||
not-yet-split backends) which will be passed to
|
||||
`tractor.ActorNursery.start_actor(enable_modules=)`
|
||||
at actor start time,
|
||||
- deliver a references to the daemon lifetime fixture, which
|
||||
for now is always the `_setup_persistent_brokerd()` context defined
|
||||
|
|
@ -183,8 +148,14 @@ def broker_init(
|
|||
]
|
||||
for submodname in getattr(
|
||||
brokermod,
|
||||
'__enable_modules__',
|
||||
[],
|
||||
'_brokerd_mods',
|
||||
# fallback for (flat, less mature) backends which
|
||||
# don't yet declare a daemon-kind mod split.
|
||||
getattr(
|
||||
brokermod,
|
||||
'__enable_modules__',
|
||||
[],
|
||||
),
|
||||
):
|
||||
subpath: str = f'{modpath}.{submodname}'
|
||||
enabled.append(subpath)
|
||||
|
|
@ -212,6 +183,22 @@ async def spawn_brokerd(
|
|||
f'backend: {brokername!r}'
|
||||
)
|
||||
|
||||
# fail fast on (data-only) backends which don't offer
|
||||
# ANY live order-control eps; the caller should instead
|
||||
# be using paper-mode (and thus never spawning us)!
|
||||
from ..data.validate import get_eps
|
||||
brokerd_eps: dict = get_eps(
|
||||
get_brokermod(brokername),
|
||||
'brokerd',
|
||||
)
|
||||
if not brokerd_eps:
|
||||
raise RuntimeError(
|
||||
f'Backend {brokername!r} offers NO `brokerd` '
|
||||
f'(live order-control) eps!?\n'
|
||||
f'It is likely a datad-only provider, use '
|
||||
f'paper-mode for clearing instead.\n'
|
||||
)
|
||||
|
||||
(
|
||||
brokermode,
|
||||
tractor_kwargs,
|
||||
|
|
@ -233,7 +220,11 @@ async def spawn_brokerd(
|
|||
dname: str = tractor_kwargs.pop('name') # f'brokerd.{brokername}'
|
||||
portal = await Services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=_data_mods + tractor_kwargs.pop('enable_modules'),
|
||||
enable_modules=list(dict.fromkeys(
|
||||
_brokerd_service_mods
|
||||
+
|
||||
tractor_kwargs.pop('enable_modules')
|
||||
)),
|
||||
debug_mode=Services.debug_mode,
|
||||
**tractor_kwargs
|
||||
)
|
||||
|
|
|
|||
|
|
@ -52,9 +52,25 @@ __all__ = [
|
|||
]
|
||||
|
||||
|
||||
# `brokerd` modules
|
||||
__enable_modules__: list[str] = [
|
||||
# per-daemon-kind (sub)mod groups: declares which of our
|
||||
# submods host the eps run by each daemon-actor kind as
|
||||
# defined by `piker.data.validate._eps`.
|
||||
# NOTE: `get_mkt_info` and `open_symbol_search` both live
|
||||
# in `.feed` for this backend (no `symbols.py`).
|
||||
_brokerd_mods: list[str] = [
|
||||
'api',
|
||||
'feed',
|
||||
'broker',
|
||||
]
|
||||
|
||||
_datad_mods: list[str] = [
|
||||
'api',
|
||||
'feed',
|
||||
]
|
||||
|
||||
|
||||
# tractor RPC enable arg
|
||||
__enable_modules__: list[str] = list(dict.fromkeys(
|
||||
_brokerd_mods
|
||||
+
|
||||
_datad_mods
|
||||
))
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ from piker.log import (
|
|||
get_logger,
|
||||
)
|
||||
from ..service import (
|
||||
maybe_spawn_brokerd,
|
||||
maybe_spawn_datad,
|
||||
maybe_open_pikerd,
|
||||
)
|
||||
from ..brokers import (
|
||||
|
|
@ -187,7 +187,7 @@ def brokercheck(config, broker):
|
|||
|
||||
'''
|
||||
async def bcheck_main():
|
||||
async with maybe_spawn_brokerd(broker) as portal:
|
||||
async with maybe_spawn_datad(broker) as portal:
|
||||
await portal.run(run_test, broker)
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
|
@ -317,7 +317,7 @@ def record(config, rate, name, dhost, filename):
|
|||
return
|
||||
|
||||
async def main(tries):
|
||||
async with maybe_spawn_brokerd(
|
||||
async with maybe_spawn_datad(
|
||||
tries=tries, loglevel=loglevel
|
||||
) as portal:
|
||||
# run app "main"
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ import trio
|
|||
|
||||
from piker.log import get_logger
|
||||
from . import get_brokermod
|
||||
from ..service import maybe_spawn_brokerd
|
||||
from ..service import maybe_spawn_datad
|
||||
from . import open_cached_client
|
||||
from ..accounting import MktPair
|
||||
|
||||
|
|
@ -172,7 +172,7 @@ async def symbol_search(
|
|||
# await tractor.devx._debug.maybe_init_greenback()
|
||||
# tractor.pause_from_sync()
|
||||
|
||||
async with maybe_spawn_brokerd(
|
||||
async with maybe_spawn_datad(
|
||||
mod.name,
|
||||
infect_asyncio=getattr(
|
||||
mod,
|
||||
|
|
|
|||
|
|
@ -47,13 +47,25 @@ __all__ = [
|
|||
]
|
||||
|
||||
|
||||
# tractor RPC enable arg
|
||||
__enable_modules__: list[str] = [
|
||||
# per-daemon-kind (sub)mod groups: declares which of our
|
||||
# submods host the eps run by each daemon-actor kind as
|
||||
# defined by `piker.data.validate._eps`.
|
||||
# NOTE: datad-only backend (no `broker.py` yet)!
|
||||
_brokerd_mods: list[str] = []
|
||||
|
||||
_datad_mods: list[str] = [
|
||||
'api',
|
||||
'feed',
|
||||
# 'broker',
|
||||
]
|
||||
|
||||
|
||||
# tractor RPC enable arg
|
||||
__enable_modules__: list[str] = list(dict.fromkeys(
|
||||
_brokerd_mods
|
||||
+
|
||||
_datad_mods
|
||||
))
|
||||
|
||||
# passed to ``tractor.ActorNursery.start_actor()``
|
||||
_spawn_kwargs = {
|
||||
'infect_asyncio': True,
|
||||
|
|
|
|||
|
|
@ -37,7 +37,7 @@ from rapidfuzz import process as fuzzy
|
|||
import numpy as np
|
||||
from tractor.trionics import (
|
||||
broadcast_receiver,
|
||||
maybe_open_context
|
||||
maybe_open_context,
|
||||
collapse_eg,
|
||||
)
|
||||
from tractor import to_asyncio
|
||||
|
|
|
|||
|
|
@ -65,17 +65,18 @@ _brokerd_mods: list[str] = [
|
|||
]
|
||||
|
||||
_datad_mods: list[str] = [
|
||||
'api',
|
||||
'feed',
|
||||
'symbols',
|
||||
]
|
||||
|
||||
|
||||
# tractor RPC enable arg
|
||||
__enable_modules__: list[str] = (
|
||||
__enable_modules__: list[str] = list(dict.fromkeys(
|
||||
_brokerd_mods
|
||||
+
|
||||
_datad_mods
|
||||
)
|
||||
))
|
||||
|
||||
# passed to ``tractor.ActorNursery.start_actor()``
|
||||
_spawn_kwargs = {
|
||||
|
|
|
|||
|
|
@ -1340,6 +1340,21 @@ async def load_aio_clients(
|
|||
ib = None
|
||||
client = None
|
||||
|
||||
# XXX: post (brokerd vs. datad)-split BOTH per-broker
|
||||
# daemons connect to the same API gw/tws endpoint(s); to
|
||||
# avoid `clientId` collisions (and the long conn-timeout
|
||||
# retry cycle they cause) we offset the data-daemon's
|
||||
# default id-range to be disjoint from `brokerd.ib`'s
|
||||
# (which also retries with `client_id + i` increments).
|
||||
if client_id == 6116: # the default from above
|
||||
aname: str = tractor.current_actor().name
|
||||
if 'datad' in aname:
|
||||
client_id += 16
|
||||
# ad-hoc (test/cli) actors get their own range to
|
||||
# avoid clashing with any live daemon-tree's conns.
|
||||
elif 'brokerd' not in aname:
|
||||
client_id += 32
|
||||
|
||||
# attempt to get connection info from config; if no .toml entry
|
||||
# exists, we try to load from a default localhost connection.
|
||||
localhost = '127.0.0.1'
|
||||
|
|
|
|||
|
|
@ -66,10 +66,24 @@ __all__ = [
|
|||
]
|
||||
|
||||
|
||||
# tractor RPC enable arg
|
||||
__enable_modules__: list[str] = [
|
||||
# per-daemon-kind (sub)mod groups: declares which of our
|
||||
# submods host the eps run by each daemon-actor kind as
|
||||
# defined by `piker.data.validate._eps`.
|
||||
_brokerd_mods: list[str] = [
|
||||
'api',
|
||||
'broker',
|
||||
]
|
||||
|
||||
_datad_mods: list[str] = [
|
||||
'api',
|
||||
'feed',
|
||||
'symbols',
|
||||
]
|
||||
|
||||
|
||||
# tractor RPC enable arg
|
||||
__enable_modules__: list[str] = list(dict.fromkeys(
|
||||
_brokerd_mods
|
||||
+
|
||||
_datad_mods
|
||||
))
|
||||
|
|
|
|||
|
|
@ -73,10 +73,12 @@ from piker.log import (
|
|||
get_logger,
|
||||
)
|
||||
from piker.data import open_symcache
|
||||
from . import api
|
||||
from .feed import (
|
||||
from piker.data._web_bs import (
|
||||
open_autorecon_ws,
|
||||
NoBsWs,
|
||||
)
|
||||
from . import api
|
||||
from .feed import (
|
||||
stream_messages,
|
||||
)
|
||||
from .ledger import (
|
||||
|
|
|
|||
|
|
@ -335,9 +335,14 @@ class TradesRelay(Struct):
|
|||
@acm
|
||||
async def open_brokerd_dialog(
|
||||
brokermod: ModuleType,
|
||||
portal: tractor.Portal,
|
||||
exec_mode: str,
|
||||
fqme: str|None = None,
|
||||
|
||||
# XXX: explicit (already spawned) trading-actor override,
|
||||
# currently only used by the `piker ledger` cli which
|
||||
# boots its own ad-hoc `brokerd`-like actor; normally we
|
||||
# (lazily) spawn/find the `brokerd.<broker>` daemon here.
|
||||
portal: tractor.Portal|None = None,
|
||||
loglevel: str|None = None,
|
||||
|
||||
) -> tuple[
|
||||
|
|
@ -351,6 +356,10 @@ async def open_brokerd_dialog(
|
|||
paper engine instance depending on live trading support for the
|
||||
broker backend, configuration, or client code usage.
|
||||
|
||||
NOTE: this is now the ONE place where a (live, credentialed)
|
||||
`brokerd.<broker>` daemon-actor gets (lazily) booted; pure
|
||||
data/paper sessions should never spawn one!
|
||||
|
||||
'''
|
||||
get_console_log(
|
||||
level=loglevel,
|
||||
|
|
@ -416,16 +425,29 @@ async def open_brokerd_dialog(
|
|||
)
|
||||
exec_mode: str = 'paper'
|
||||
|
||||
if (
|
||||
trades_endpoint is not None
|
||||
or
|
||||
exec_mode != 'paper'
|
||||
):
|
||||
# open live brokerd trades endpoint
|
||||
open_trades_endpoint = portal.open_context(
|
||||
trades_endpoint,
|
||||
loglevel=loglevel,
|
||||
@acm
|
||||
async def acquire_live_portal():
|
||||
'''
|
||||
Deliver a portal to the (live, credentialed) trading
|
||||
actor hosting the backend's `open_trade_dialog()` ep:
|
||||
either the caller-provided override or the (maybe
|
||||
lazily spawned) `brokerd.<broker>` service daemon.
|
||||
|
||||
'''
|
||||
if portal is not None:
|
||||
yield portal
|
||||
return
|
||||
|
||||
# XXX: the ONE (normal) place a `brokerd.<broker>`
|
||||
# daemon-actor gets booted in the runtime B)
|
||||
from piker.brokers._daemon import (
|
||||
maybe_spawn_brokerd,
|
||||
)
|
||||
async with maybe_spawn_brokerd(
|
||||
brokermod.name,
|
||||
loglevel=loglevel,
|
||||
) as live_portal:
|
||||
yield live_portal
|
||||
|
||||
@acm
|
||||
async def maybe_open_paper_ep():
|
||||
|
|
@ -437,7 +459,14 @@ async def open_brokerd_dialog(
|
|||
return
|
||||
|
||||
# open trades-dialog endpoint with backend broker
|
||||
async with open_trades_endpoint as msg:
|
||||
async with (
|
||||
acquire_live_portal() as live_portal,
|
||||
|
||||
live_portal.open_context(
|
||||
trades_endpoint,
|
||||
loglevel=loglevel,
|
||||
) as msg,
|
||||
):
|
||||
ctx, first = msg
|
||||
|
||||
# runtime indication that the backend can't support live
|
||||
|
|
@ -581,7 +610,6 @@ class Router(Struct):
|
|||
async def maybe_open_brokerd_dialog(
|
||||
self,
|
||||
brokermod: ModuleType,
|
||||
portal: tractor.Portal,
|
||||
exec_mode: str,
|
||||
fqme: str,
|
||||
loglevel: str,
|
||||
|
|
@ -606,7 +634,6 @@ class Router(Struct):
|
|||
|
||||
async with open_brokerd_dialog(
|
||||
brokermod=brokermod,
|
||||
portal=portal,
|
||||
exec_mode=exec_mode,
|
||||
fqme=fqme,
|
||||
loglevel=loglevel,
|
||||
|
|
@ -668,7 +695,6 @@ class Router(Struct):
|
|||
brokername, _, _, _ = unpack_fqme(fqme)
|
||||
brokermod = feed.mods[brokername]
|
||||
broker = brokermod.name
|
||||
portal = feed.portals[brokermod]
|
||||
|
||||
# XXX: this should be initial price quote from target provider
|
||||
flume = feed.flumes[fqme]
|
||||
|
|
@ -682,7 +708,6 @@ class Router(Struct):
|
|||
|
||||
async with self.maybe_open_brokerd_dialog(
|
||||
brokermod=brokermod,
|
||||
portal=portal,
|
||||
exec_mode=exec_mode,
|
||||
fqme=fqme,
|
||||
loglevel=loglevel,
|
||||
|
|
|
|||
|
|
@ -250,7 +250,6 @@ def cli(
|
|||
|
||||
# TODO: load endpoints from `conf::[network].pikerd`
|
||||
# - pikerd vs. regd, separate registry daemon?
|
||||
# - expose datad vs. brokerd?
|
||||
# - bind emsd with certain perms on public iface?
|
||||
regaddrs: list[tuple[str, int]] = regaddr or [(
|
||||
_default_registry_host,
|
||||
|
|
|
|||
|
|
@ -188,6 +188,48 @@ def _override_config_dir(
|
|||
_config_dir = path
|
||||
|
||||
|
||||
def _maybe_use_test_dir() -> None:
|
||||
'''
|
||||
When running under the `pytest` harness, override
|
||||
the config dir to the per-test-tmp dir "passed down"
|
||||
the actor tree via `tractor`'s runtime-vars
|
||||
inheritance mechanism.
|
||||
|
||||
See the `tractor_runtime_overrides` usage in our
|
||||
`tests.conftest._open_test_pikerd()` as well as
|
||||
`.service._actor_runtime.open_piker_runtime()` for
|
||||
the root-actor's pre-loading of the var state.
|
||||
|
||||
NOTE: this must be checked lazily at config-path
|
||||
access time (NOT import time) since sub-actors only
|
||||
receive runtime-vars once their `tractor` runtime
|
||||
has fully booted.
|
||||
|
||||
'''
|
||||
global _config_dir
|
||||
import tractor
|
||||
actor = tractor.current_actor(
|
||||
err_on_no_runtime=False,
|
||||
)
|
||||
if actor is None:
|
||||
return
|
||||
|
||||
rvs: dict = tractor.runtime._state._runtime_vars
|
||||
pvars: dict|None = rvs.get('piker_vars')
|
||||
if (
|
||||
pvars
|
||||
and
|
||||
(testdir := pvars.get('piker_test_dir'))
|
||||
):
|
||||
testdirpath = Path(testdir)
|
||||
assert testdirpath.exists(), (
|
||||
f'piker test harness might be borked!?\n'
|
||||
f'testdirpath: {testdirpath!r}\n'
|
||||
)
|
||||
if _config_dir != testdirpath:
|
||||
_override_config_dir(testdirpath)
|
||||
|
||||
|
||||
def _conf_fn_w_ext(
|
||||
name: str,
|
||||
) -> str:
|
||||
|
|
@ -201,6 +243,7 @@ def get_conf_dir() -> Path:
|
|||
on the local filesystem.
|
||||
|
||||
'''
|
||||
_maybe_use_test_dir()
|
||||
return _config_dir
|
||||
|
||||
|
||||
|
|
@ -226,7 +269,7 @@ def get_conf_path(
|
|||
assert str(conf_name) in _conf_names
|
||||
|
||||
fn = _conf_fn_w_ext(conf_name)
|
||||
return _config_dir / Path(fn)
|
||||
return get_conf_dir() / Path(fn)
|
||||
|
||||
|
||||
def repodir() -> Path:
|
||||
|
|
@ -271,8 +314,9 @@ def load(
|
|||
|
||||
'''
|
||||
# create the $HOME/.config/piker dir if dne
|
||||
if not _config_dir.is_dir():
|
||||
_config_dir.mkdir(
|
||||
conf_dir: Path = get_conf_dir()
|
||||
if not conf_dir.is_dir():
|
||||
conf_dir.mkdir(
|
||||
parents=True,
|
||||
exist_ok=True,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,300 @@
|
|||
# piker: trading gear for hackers
|
||||
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Data-daemon-actor "endpoint-hooks": the service task entry
|
||||
points for `datad.<brokername>`, the per-provider real-time
|
||||
and historical market-data feed daemon.
|
||||
|
||||
The (data vs. broker)d-split mirrors the ep-groupings in
|
||||
`piker.data.validate._eps`: this daemon hosts all `'datad'`
|
||||
eps (live quotes, history loading, symbology search) while
|
||||
its `brokerd.<brokername>` sibling hosts only the live
|
||||
order-control (and thus credentialed) `'brokerd'` eps.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from types import ModuleType
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
AsyncContextManager,
|
||||
)
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
from piker.log import (
|
||||
get_logger,
|
||||
get_console_log,
|
||||
)
|
||||
from . import _util
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .feed import _FeedsBus
|
||||
|
||||
log = get_logger(name=__name__)
|
||||
|
||||
# `datad`-actor-always-enabled mods: the data-side successor
|
||||
# to the old `piker.brokers._daemon._data_mods` set.
|
||||
# NOTE: keeping this list as small as possible is part of
|
||||
# our caps-sec model and should be treated with utmost care!
|
||||
_datad_service_mods: list[str] = [
|
||||
'piker.brokers.core',
|
||||
'piker.brokers.data',
|
||||
'piker.data',
|
||||
'piker.data.feed',
|
||||
'piker.data._sampling',
|
||||
'piker.data._daemon',
|
||||
]
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def _setup_persistent_datad(
|
||||
ctx: tractor.Context,
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Allocate an actor-wide service nursery in this
|
||||
`datad.<brokername>` actor such that data-feed tasks
|
||||
(shm writer-samplers, history backfillers, symbology
|
||||
loaders) can be run in the background persistently by
|
||||
the provider backend as needed.
|
||||
|
||||
'''
|
||||
# NOTE: we only need to setup logging once (and only)
|
||||
# here since all hosted daemon tasks will reference
|
||||
# this same log instance's (actor local) state and thus
|
||||
# don't require any further (level) configuration on
|
||||
# their own B)
|
||||
actor: tractor.Actor = tractor.current_actor()
|
||||
tll: str = actor.loglevel
|
||||
log = get_console_log(
|
||||
level=loglevel or tll,
|
||||
name=f'{_util.subsys}.{brokername}',
|
||||
with_tractor_log=bool(tll),
|
||||
)
|
||||
assert log.name == _util.subsys
|
||||
|
||||
from piker.data import feed
|
||||
assert not feed._bus
|
||||
|
||||
# allocate a nursery to the bus for spawning background
|
||||
# tasks which service client IPC requests, normally
|
||||
# `tractor.Context` connections to explicitly required
|
||||
# `datad` endpoints such as:
|
||||
# - `stream_quotes()`,
|
||||
# - `manage_history()`,
|
||||
# - `allocate_persistent_feed()`,
|
||||
# - `open_symbol_search()`
|
||||
# NOTE: see ep invocation details inside `.data.feed`.
|
||||
async with (
|
||||
trio.open_nursery() as service_nursery
|
||||
):
|
||||
bus: _FeedsBus = feed.get_feed_bus(
|
||||
brokername,
|
||||
service_nursery,
|
||||
)
|
||||
assert bus is feed._bus
|
||||
|
||||
# unblock caller
|
||||
await ctx.started()
|
||||
|
||||
# we pin this task to keep the feeds manager active
|
||||
# until the parent actor decides to tear it down
|
||||
await trio.sleep_forever()
|
||||
|
||||
|
||||
def datad_init(
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
**start_actor_kwargs,
|
||||
|
||||
) -> tuple[
|
||||
ModuleType,
|
||||
dict,
|
||||
AsyncContextManager,
|
||||
]:
|
||||
'''
|
||||
Given an input broker name, load all named arguments
|
||||
which can be passed for daemon endpoint + context spawn
|
||||
as required in every `datad.<brokername>` (actor)
|
||||
service.
|
||||
|
||||
This includes:
|
||||
- load the appropriate <brokername>.py pkg module,
|
||||
- reads any declared `_datad_mods: list[str]` (falling
|
||||
back to the full `__enable_modules__` set for
|
||||
not-yet-split backends) which will be passed to
|
||||
`tractor.ActorNursery.start_actor(enable_modules=)`
|
||||
at actor start time,
|
||||
- deliver a reference to the daemon lifetime fixture,
|
||||
which for now is always the
|
||||
`_setup_persistent_datad()` context defined above.
|
||||
|
||||
'''
|
||||
from piker.brokers import get_brokermod
|
||||
from .validate import get_eps
|
||||
brokermod = get_brokermod(brokername)
|
||||
modpath: str = brokermod.__name__
|
||||
|
||||
# warn (but don't bail) when the backend is missing
|
||||
# some/all of the `datad` ep contract defined by
|
||||
# `piker.data.validate._eps`.
|
||||
datad_eps: dict = get_eps(brokermod, 'datad')
|
||||
if not datad_eps:
|
||||
log.warning(
|
||||
f'Backend {brokername!r} offers NO `datad` '
|
||||
f'(data-feed) eps!?\n'
|
||||
f'Most feed/chart functionality will be '
|
||||
f'broken for this provider..\n'
|
||||
)
|
||||
|
||||
start_actor_kwargs['name'] = f'datad.{brokername}'
|
||||
|
||||
# XXX CRITICAL: include any backend-declared spawn
|
||||
# kwargs, eg. `{'infect_asyncio': True}` required by
|
||||
# `ib`'s embedded `asyncio`-mode `ib_async` usage!
|
||||
start_actor_kwargs.update(
|
||||
getattr(
|
||||
brokermod,
|
||||
'_spawn_kwargs',
|
||||
{},
|
||||
)
|
||||
)
|
||||
|
||||
# lookup actor-enabled modules declared by the backend
|
||||
# offering the `datad` endpoint(s).
|
||||
enabled: list[str]
|
||||
enabled = start_actor_kwargs['enable_modules'] = [
|
||||
__name__, # so that eps from THIS mod can be invoked
|
||||
modpath,
|
||||
]
|
||||
for submodname in getattr(
|
||||
brokermod,
|
||||
'_datad_mods',
|
||||
# fallback for (flat, less mature) backends which
|
||||
# don't yet declare a daemon-kind mod split.
|
||||
getattr(
|
||||
brokermod,
|
||||
'__enable_modules__',
|
||||
[],
|
||||
),
|
||||
):
|
||||
subpath: str = f'{modpath}.{submodname}'
|
||||
enabled.append(subpath)
|
||||
|
||||
return (
|
||||
brokermod,
|
||||
start_actor_kwargs, # to `ActorNursery.start_actor()`
|
||||
|
||||
# XXX see impl above; contains all (actor global)
|
||||
# setup/teardown expected in all `datad` actor
|
||||
# instances.
|
||||
_setup_persistent_datad,
|
||||
)
|
||||
|
||||
|
||||
async def spawn_datad(
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
**tractor_kwargs,
|
||||
|
||||
) -> bool:
|
||||
|
||||
log.info(
|
||||
f'Spawning data-daemon,\n'
|
||||
f'backend: {brokername!r}'
|
||||
)
|
||||
|
||||
(
|
||||
brokermod,
|
||||
tractor_kwargs,
|
||||
daemon_fixture_ep,
|
||||
) = datad_init(
|
||||
brokername,
|
||||
loglevel,
|
||||
**tractor_kwargs,
|
||||
)
|
||||
|
||||
# ask `pikerd` to spawn a new sub-actor and manage it
|
||||
# under its actor nursery
|
||||
from piker.service import Services
|
||||
|
||||
dname: str = tractor_kwargs.pop('name') # f'datad.{brokername}'
|
||||
enable_mods: list[str] = list(dict.fromkeys(
|
||||
_datad_service_mods
|
||||
+
|
||||
tractor_kwargs.pop('enable_modules')
|
||||
))
|
||||
portal = await Services.actor_n.start_actor(
|
||||
dname,
|
||||
enable_modules=enable_mods,
|
||||
debug_mode=Services.debug_mode,
|
||||
**tractor_kwargs,
|
||||
)
|
||||
|
||||
# NOTE: the service mngr expects an already spawned
|
||||
# actor + its portal ref in order to do non-blocking
|
||||
# setup of the `datad` service nursery.
|
||||
await Services.start_service_task(
|
||||
dname,
|
||||
portal,
|
||||
|
||||
# signature of target root-task endpoint
|
||||
daemon_fixture_ep,
|
||||
brokername=brokername,
|
||||
loglevel=loglevel,
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_spawn_datad(
|
||||
brokername: str,
|
||||
loglevel: str|None = None,
|
||||
|
||||
**pikerd_kwargs,
|
||||
|
||||
) -> tractor.Portal:
|
||||
'''
|
||||
Helper to spawn a datad service *from* a client who
|
||||
wishes to use the sub-actor-daemon but is fine with
|
||||
re-using any existing and contactable `datad`.
|
||||
|
||||
Mas o menos, acts as a cached-actor-getter factory.
|
||||
|
||||
'''
|
||||
from piker.service import maybe_spawn_daemon
|
||||
|
||||
async with maybe_spawn_daemon(
|
||||
service_name=f'datad.{brokername}',
|
||||
service_task_target=spawn_datad,
|
||||
spawn_args={
|
||||
'brokername': brokername,
|
||||
},
|
||||
loglevel=loglevel,
|
||||
|
||||
**pikerd_kwargs,
|
||||
|
||||
) as portal:
|
||||
yield portal
|
||||
|
|
@ -17,7 +17,7 @@
|
|||
'''
|
||||
Data feed apis and infra.
|
||||
|
||||
This module is enabled for ``brokerd`` daemons and includes mostly
|
||||
This module is enabled for ``datad`` daemons and includes mostly
|
||||
endpoints and middleware to support our real-time, provider agnostic,
|
||||
live market quotes layer. Historical data loading and processing is also
|
||||
initiated in parts of the feed bus startup but business logic and
|
||||
|
|
@ -54,8 +54,11 @@ from piker.accounting import (
|
|||
)
|
||||
from piker.types import Struct
|
||||
from piker.brokers import get_brokermod
|
||||
from piker.service import (
|
||||
maybe_spawn_brokerd,
|
||||
# NOTE: must be a "relative-direct" import (NOT via
|
||||
# `piker.service`) to avoid a partial-init cycle when this
|
||||
# mod is loaded as part of `piker.service.__init__`.
|
||||
from ._daemon import (
|
||||
maybe_spawn_datad,
|
||||
)
|
||||
from piker.calc import humanize
|
||||
from ._util import (
|
||||
|
|
@ -110,7 +113,7 @@ class _FeedsBus(Struct):
|
|||
'''
|
||||
Data feeds broadcaster and persistence management.
|
||||
|
||||
This is a brokerd side api used to manager persistent real-time
|
||||
This is a datad side api used to manager persistent real-time
|
||||
streams that can be allocated and left alive indefinitely. A bus is
|
||||
associated one-to-one with a particular broker backend where the
|
||||
"bus" refers so a multi-symbol bus where quotes are interleaved in
|
||||
|
|
@ -249,7 +252,7 @@ async def allocate_persistent_feed(
|
|||
'''
|
||||
Create and maintain a "feed bus" which allocates tasks for real-time
|
||||
streaming and optional historical data storage per broker/data provider
|
||||
backend; this normally task runs *in* a `brokerd` actor.
|
||||
backend; this normally task runs *in* a `datad` actor.
|
||||
|
||||
If none exists, this allocates a ``_FeedsBus`` which manages the
|
||||
lifetimes of streaming tasks created for each requested symbol.
|
||||
|
|
@ -318,8 +321,8 @@ async def allocate_persistent_feed(
|
|||
# at max capacity.
|
||||
# - the same ideas ^ but when a local core is maxxed out (like how
|
||||
# binance does often with hft XD
|
||||
# - if a brokerd is non-local then we can't just allocate a mem
|
||||
# channel here and have the brokerd write it, we instead need
|
||||
# - if a datad is non-local then we can't just allocate a mem
|
||||
# channel here and have the datad write it, we instead need
|
||||
# a small streaming machine around the remote feed which can then
|
||||
# do the normal work of sampling and writing shm buffers
|
||||
# (depending on if we want sampling done on the far end or not?)
|
||||
|
|
@ -499,7 +502,7 @@ async def open_feed_bus(
|
|||
# (after we also group them in a nice `/dev/shm/piker/` subdir).
|
||||
# ensure we are who we think we are
|
||||
servicename = tractor.current_actor().name
|
||||
assert 'brokerd' in servicename
|
||||
assert 'datad' in servicename
|
||||
assert brokername in servicename
|
||||
|
||||
bus: _FeedsBus = get_feed_bus(brokername)
|
||||
|
|
@ -509,12 +512,12 @@ async def open_feed_bus(
|
|||
for symbol in symbols:
|
||||
|
||||
# if no cached feed for this symbol has been created for this
|
||||
# brokerd yet, start persistent stream and shm writer task in
|
||||
# datad yet, start persistent stream and shm writer task in
|
||||
# service nursery
|
||||
flume = bus.feeds.get(symbol)
|
||||
if flume is None:
|
||||
# allocate a new actor-local stream bus which
|
||||
# will persist for this `brokerd`'s service lifetime.
|
||||
# will persist for this `datad`'s service lifetime.
|
||||
async with bus.task_lock:
|
||||
await bus.nursery.start(
|
||||
partial(
|
||||
|
|
@ -721,7 +724,7 @@ class Feed(Struct):
|
|||
mods = {name: self.mods[name] for name in brokers}
|
||||
|
||||
if len(mods) == 1:
|
||||
# just pass the brokerd stream directly if only one provider
|
||||
# just pass the datad stream directly if only one provider
|
||||
# was detected.
|
||||
stream = self.streams[list(brokers)[0]]
|
||||
async with stream.subscribe() as bstream:
|
||||
|
|
@ -763,7 +766,7 @@ class Feed(Struct):
|
|||
|
||||
|
||||
@acm
|
||||
async def install_brokerd_search(
|
||||
async def install_datad_search(
|
||||
|
||||
portal: tractor.Portal,
|
||||
brokermod: ModuleType,
|
||||
|
|
@ -812,7 +815,7 @@ async def maybe_open_feed(
|
|||
ReceiveChannel[dict[str, Any]],
|
||||
):
|
||||
'''
|
||||
Maybe open a data to a ``brokerd`` daemon only if there is no
|
||||
Maybe open a data feed to a ``datad`` daemon only if there is no
|
||||
local one for the broker-symbol pair, if one is cached use it wrapped
|
||||
in a tractor broadcast receiver.
|
||||
|
||||
|
|
@ -885,14 +888,14 @@ async def open_feed(
|
|||
providers.setdefault(mod, []).append(bs_fqme)
|
||||
feed.mods[mod.name] = mod
|
||||
|
||||
# one actor per brokerd for now
|
||||
brokerd_ctxs = []
|
||||
# one actor per datad for now
|
||||
datad_ctxs = []
|
||||
for brokermod, bfqmes in providers.items():
|
||||
|
||||
# if no `brokerd` for this backend exists yet we spawn
|
||||
# if no `datad` for this backend exists yet we spawn
|
||||
# a daemon actor for it.
|
||||
brokerd_ctxs.append(
|
||||
maybe_spawn_brokerd(
|
||||
datad_ctxs.append(
|
||||
maybe_spawn_datad(
|
||||
brokermod.name,
|
||||
loglevel=loglevel
|
||||
)
|
||||
|
|
@ -900,7 +903,7 @@ async def open_feed(
|
|||
|
||||
portals: tuple[tractor.Portal]
|
||||
async with trionics.gather_contexts(
|
||||
brokerd_ctxs,
|
||||
datad_ctxs,
|
||||
) as portals:
|
||||
|
||||
bus_ctxs: list[AsyncContextManager] = []
|
||||
|
|
@ -937,9 +940,9 @@ async def open_feed(
|
|||
tick_throttle=tick_throttle,
|
||||
|
||||
# XXX: super important to avoid
|
||||
# the brokerd from some other
|
||||
# the datad from some other
|
||||
# backend overruning the task here
|
||||
# bc some other brokerd took longer
|
||||
# bc some other datad took longer
|
||||
# to startup before we hit the `.open_stream()`
|
||||
# loop below XD .. really we should try to do each
|
||||
# of these stream open sequences sequentially per
|
||||
|
|
@ -1008,7 +1011,7 @@ async def open_feed(
|
|||
assert stream
|
||||
feed.streams[brokermod.name] = stream
|
||||
|
||||
# apply `brokerd`-common stream to each flume
|
||||
# apply `datad`-common stream to each flume
|
||||
# tracking a live market feed from that provider.
|
||||
for fqme, flume in feed.flumes.items():
|
||||
if brokermod.name == flume.mkt.broker:
|
||||
|
|
|
|||
|
|
@ -91,6 +91,24 @@ _eps: dict[str, list[str]] = {
|
|||
}
|
||||
|
||||
|
||||
def get_eps(
|
||||
mod: ModuleType,
|
||||
kind: str, # 'middleware' | 'datad' | 'brokerd'
|
||||
|
||||
) -> dict[str, Callable]:
|
||||
'''
|
||||
Return the daemon-kind's ep funcs defined by the backend
|
||||
`mod`, keyed by ep name; any eps from `_eps[kind]` not
|
||||
implemented by the backend are excluded.
|
||||
|
||||
'''
|
||||
return {
|
||||
name: ep
|
||||
for name in _eps[kind]
|
||||
if (ep := getattr(mod, name, None))
|
||||
}
|
||||
|
||||
|
||||
def validate_backend(
|
||||
mod: ModuleType,
|
||||
syms: list[str],
|
||||
|
|
|
|||
|
|
@ -56,3 +56,7 @@ from ..brokers._daemon import (
|
|||
spawn_brokerd as spawn_brokerd,
|
||||
maybe_spawn_brokerd as maybe_spawn_brokerd,
|
||||
)
|
||||
from ..data._daemon import (
|
||||
spawn_datad as spawn_datad,
|
||||
maybe_spawn_datad as maybe_spawn_datad,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -157,6 +157,7 @@ _root_modules: list[str] = [
|
|||
__name__,
|
||||
'piker.service._daemon',
|
||||
'piker.brokers._daemon',
|
||||
'piker.data._daemon',
|
||||
|
||||
'piker.clearing._ems',
|
||||
'piker.clearing._client',
|
||||
|
|
@ -214,7 +215,13 @@ async def open_pikerd(
|
|||
trio.open_nursery() as service_tn,
|
||||
):
|
||||
for addr in reg_addrs:
|
||||
uaddr: tuple = addr.unwrap()
|
||||
# normalize to a wrapped `tractor` addr-type;
|
||||
# entries may be raw `tuple`s when passed in
|
||||
# from (test) client code.
|
||||
wladdr = tractor.discovery._addr.wrap_address(
|
||||
addr,
|
||||
)
|
||||
uaddr: tuple = wladdr.unwrap()
|
||||
if (
|
||||
uaddr not in root_actor.accept_addrs
|
||||
):
|
||||
|
|
|
|||
|
|
@ -225,10 +225,13 @@ async def check_for_service(
|
|||
|
||||
'''
|
||||
async with (
|
||||
open_registry(ensure_exists=False) as reg_addr,
|
||||
open_registry(
|
||||
addrs=Registry.addrs,
|
||||
ensure_exists=False,
|
||||
) as reg_addrs,
|
||||
tractor.query_actor(
|
||||
service_name,
|
||||
arbiter_sockaddr=reg_addr,
|
||||
) as sockaddr,
|
||||
regaddr=reg_addrs[0],
|
||||
) as (sockaddr, _),
|
||||
):
|
||||
return sockaddr
|
||||
|
|
|
|||
|
|
@ -32,12 +32,12 @@ from . import _event
|
|||
from . import _search
|
||||
from ..accounting import unpack_fqme
|
||||
from ..data._symcache import open_symcache
|
||||
from ..data.feed import install_brokerd_search
|
||||
from ..data.feed import install_datad_search
|
||||
from ..log import (
|
||||
get_logger,
|
||||
get_console_log,
|
||||
)
|
||||
from ..service import maybe_spawn_brokerd
|
||||
from ..service import maybe_spawn_datad
|
||||
from ._exec import run_qtractor
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
|
@ -50,16 +50,16 @@ async def load_provider_search(
|
|||
) -> None:
|
||||
|
||||
name = brokermod.name
|
||||
log.info(f'loading brokerd for {name}..')
|
||||
log.info(f'loading datad for {name}..')
|
||||
|
||||
async with (
|
||||
|
||||
maybe_spawn_brokerd(
|
||||
maybe_spawn_datad(
|
||||
name,
|
||||
loglevel=loglevel
|
||||
) as portal,
|
||||
|
||||
install_brokerd_search(
|
||||
install_datad_search(
|
||||
portal,
|
||||
brokermod,
|
||||
),
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ from ..cli import (
|
|||
load_trans_eps,
|
||||
)
|
||||
from .. import watchlists as wl
|
||||
from ..service import maybe_spawn_brokerd
|
||||
from ..service import maybe_spawn_datad
|
||||
|
||||
|
||||
_config_dir = click.get_app_dir('piker')
|
||||
|
|
@ -69,7 +69,7 @@ def monitor(config, rate, name, dhost, test, tl):
|
|||
from .kivy.monitor import _async_main
|
||||
|
||||
async def main():
|
||||
async with maybe_spawn_brokerd(
|
||||
async with maybe_spawn_datad(
|
||||
brokername=brokermod.name,
|
||||
loglevel=loglevel
|
||||
) as portal:
|
||||
|
|
@ -118,7 +118,7 @@ def optschain(
|
|||
from .kivy.option_chain import _async_main
|
||||
|
||||
async def main():
|
||||
async with maybe_spawn_brokerd(
|
||||
async with maybe_spawn_datad(
|
||||
loglevel=loglevel
|
||||
):
|
||||
# run app "main"
|
||||
|
|
|
|||
|
|
@ -495,7 +495,7 @@ async def _async_main(
|
|||
|
||||
async with trio.open_nursery() as nursery:
|
||||
# get a portal to the data feed daemon
|
||||
async with tractor.wait_for_actor('brokerd') as portal:
|
||||
async with tractor.wait_for_actor('datad') as portal:
|
||||
|
||||
# set up a pager view for large ticker lists
|
||||
chain = await new_chain_ui(
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ def load_and_check_pos(
|
|||
# is the same the fqme.
|
||||
pp: Position = table.pps[ppmsg.symbol]
|
||||
|
||||
assert ppmsg.size == pp.size
|
||||
assert ppmsg.size == pp.cumsize
|
||||
assert ppmsg.avg_price == pp.ppu
|
||||
|
||||
yield pp
|
||||
|
|
@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
|
|||
# NOTE: emsd should error on the actor's enabled modules
|
||||
# import phase, when looking for a backend named `doggy`.
|
||||
except tractor.RemoteActorError as re:
|
||||
assert re.type is ModuleNotFoundError
|
||||
assert re.boxed_type is ModuleNotFoundError
|
||||
|
||||
run_and_tollerate_cancels(load_bad_fqme)
|
||||
|
||||
|
|
|
|||
|
|
@ -53,11 +53,12 @@ def test_runtime_boot(
|
|||
|
||||
tractor.wait_for_actor(
|
||||
'pikerd',
|
||||
arbiter_sockaddr=daemon_addr,
|
||||
registry_addr=daemon_addr,
|
||||
) as portal,
|
||||
):
|
||||
assert pikerd_portal.channel.raddr == daemon_addr
|
||||
assert pikerd_portal.channel.raddr == portal.channel.raddr
|
||||
uw_raddr: tuple = pikerd_portal.chan.raddr.unwrap()
|
||||
assert uw_raddr == daemon_addr
|
||||
assert uw_raddr == portal.chan.raddr.unwrap()
|
||||
|
||||
# no service tasks should be started
|
||||
assert not services.service_tasks
|
||||
|
|
@ -65,6 +66,41 @@ def test_runtime_boot(
|
|||
trio.run(main)
|
||||
|
||||
|
||||
def test_datad_spawn(
|
||||
open_test_pikerd: AsyncContextManager,
|
||||
loglevel: str,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Verify the new (data-feed-only) `datad.<broker>` daemon
|
||||
can be spawned/registered as a `pikerd` sub-service via
|
||||
the `maybe_spawn_datad()` factory.
|
||||
|
||||
'''
|
||||
from piker.service import maybe_spawn_datad
|
||||
|
||||
backend: str = 'kraken'
|
||||
datad_name: str = f'datad.{backend}'
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
open_test_pikerd() as (_, _, _, services),
|
||||
|
||||
maybe_spawn_datad(
|
||||
backend,
|
||||
loglevel=loglevel,
|
||||
) as portal,
|
||||
):
|
||||
assert portal
|
||||
|
||||
async with ensure_service(datad_name):
|
||||
assert (
|
||||
datad_name in services.service_tasks
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
|
||||
def test_ensure_datafeed_actors(
|
||||
open_test_pikerd: AsyncContextManager,
|
||||
loglevel: str,
|
||||
|
|
@ -72,14 +108,14 @@ def test_ensure_datafeed_actors(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Verify that booting a data feed starts a `brokerd`
|
||||
Verify that booting a data feed starts a `datad`
|
||||
actor and a singleton global `samplerd` and opening
|
||||
an order mode in paper opens the `paperboi` service.
|
||||
|
||||
'''
|
||||
actor_name: str = 'brokerd'
|
||||
actor_name: str = 'datad'
|
||||
backend: str = 'kraken'
|
||||
brokerd_name: str = f'{actor_name}.{backend}'
|
||||
datad_name: str = f'{actor_name}.{backend}'
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
|
|
@ -94,7 +130,7 @@ def test_ensure_datafeed_actors(
|
|||
await feed.pause()
|
||||
|
||||
async with (
|
||||
ensure_service(brokerd_name),
|
||||
ensure_service(datad_name),
|
||||
ensure_service('samplerd'),
|
||||
):
|
||||
await trio.sleep(0.1)
|
||||
|
|
@ -108,7 +144,7 @@ async def ensure_service(
|
|||
sockaddr: tuple[str, int] | None = None,
|
||||
) -> None:
|
||||
async with find_service(name) as portal:
|
||||
remote_sockaddr = portal.channel.raddr
|
||||
remote_sockaddr: tuple = portal.chan.raddr.unwrap()
|
||||
print(f'FOUND `{name}` @ {remote_sockaddr}')
|
||||
|
||||
if sockaddr:
|
||||
|
|
@ -131,40 +167,49 @@ def run_test_w_cancel_method(
|
|||
"was remotely cancelled by remote actor (\'pikerd\'")
|
||||
|
||||
if cancel_method == 'sigint':
|
||||
with pytest.raises(
|
||||
# XXX: with modern `tractor` the (single-exc)
|
||||
# group is collapsed so a bare KBI normally
|
||||
# propagates; tolerate either form.
|
||||
with pytest.raises((
|
||||
KeyboardInterrupt,
|
||||
BaseExceptionGroup,
|
||||
) as exc_info:
|
||||
)) as exc_info:
|
||||
trio.run(main)
|
||||
|
||||
multi = exc_info.value
|
||||
err = exc_info.value
|
||||
match err:
|
||||
case BaseExceptionGroup():
|
||||
for suberr in err.exceptions:
|
||||
match suberr:
|
||||
# ensure we receive a remote
|
||||
# cancellation error caused by the
|
||||
# pikerd root actor.
|
||||
case tractor.ContextCancelled():
|
||||
assert (
|
||||
cancelled_msg
|
||||
in
|
||||
suberr.args[0]
|
||||
)
|
||||
|
||||
for suberr in multi.exceptions:
|
||||
match suberr:
|
||||
# ensure we receive a remote cancellation error caused
|
||||
# by the pikerd root actor since we used the
|
||||
# `.cancel_service()` API above B)
|
||||
case tractor.ContextCancelled():
|
||||
assert cancelled_msg in suberr.args[0]
|
||||
case KeyboardInterrupt():
|
||||
pass
|
||||
|
||||
case KeyboardInterrupt():
|
||||
pass
|
||||
case _:
|
||||
pytest.fail(
|
||||
f'Unexpected error {suberr}'
|
||||
)
|
||||
|
||||
case _:
|
||||
pytest.fail(f'Unexpected error {suberr}')
|
||||
case KeyboardInterrupt():
|
||||
pass
|
||||
|
||||
elif cancel_method == 'services':
|
||||
|
||||
# XXX NOTE: oddly, when you pass --pdb to pytest, i think since
|
||||
# we also use that to enable the underlying tractor debug mode,
|
||||
# it causes this to not raise for some reason? So if you see
|
||||
# that while changing this test.. it's prolly that.
|
||||
|
||||
with pytest.raises(
|
||||
tractor.ContextCancelled
|
||||
) as exc_info:
|
||||
trio.run(main)
|
||||
|
||||
assert cancelled_msg in exc_info.value.args[0]
|
||||
# XXX: cancelling our own sub-service via
|
||||
# `Services.cancel_service()` is a *self*
|
||||
# requested cancel: modern `tractor` absorbs the
|
||||
# resulting `ContextCancelled` (canceller is our
|
||||
# own actor) so the runtime tears down gracefully
|
||||
# with NO error raised to the opener.
|
||||
trio.run(main)
|
||||
|
||||
else:
|
||||
pytest.fail(f'Test is broken due to {cancel_method}')
|
||||
|
|
@ -182,9 +227,9 @@ def test_ensure_ems_in_paper_actors(
|
|||
|
||||
) -> None:
|
||||
|
||||
actor_name: str = 'brokerd'
|
||||
backend: str = 'kraken'
|
||||
brokerd_name: str = f'{actor_name}.{backend}'
|
||||
datad_name: str = f'datad.{backend}'
|
||||
brokerd_name: str = f'brokerd.{backend}'
|
||||
|
||||
async def main():
|
||||
|
||||
|
|
@ -197,7 +242,9 @@ def test_ensure_ems_in_paper_actors(
|
|||
# ensure we timeout after is startup is too slow.
|
||||
# TODO: something like this should be our start point for
|
||||
# benchmarking end-to-end startup B)
|
||||
with trio.fail_after(9):
|
||||
# NOTE: includes a live (kraken) symbology fetch so
|
||||
# the budget needs some headroom for net latency..
|
||||
with trio.fail_after(19):
|
||||
async with (
|
||||
open_test_pikerd() as (_, _, _, services),
|
||||
|
||||
|
|
@ -226,15 +273,25 @@ def test_ensure_ems_in_paper_actors(
|
|||
|
||||
async with (
|
||||
ensure_service('emsd'),
|
||||
ensure_service(brokerd_name),
|
||||
ensure_service(datad_name),
|
||||
ensure_service(f'paperboi.{backend}'),
|
||||
):
|
||||
for name in pikerd_subservices:
|
||||
assert name in services.service_tasks
|
||||
|
||||
# brokerd.kraken actor should have been started
|
||||
# implicitly by the ems.
|
||||
assert brokerd_name in services.service_tasks
|
||||
# datad.kraken actor should have been
|
||||
# started implicitly by the feed layer.
|
||||
assert datad_name in services.service_tasks
|
||||
|
||||
# XXX: paper-mode sessions should NEVER
|
||||
# boot a (live, credentialed) `brokerd`;
|
||||
# only emsd's `open_brokerd_dialog()`
|
||||
# live-ep path is allowed to spawn it!
|
||||
assert (
|
||||
brokerd_name
|
||||
not in
|
||||
services.service_tasks
|
||||
)
|
||||
|
||||
print('ALL SERVICES STARTED, cancelling runtime with:\n'
|
||||
f'-> {cancel_method}')
|
||||
|
|
|
|||
Loading…
Reference in New Issue