diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py index 7c251568..bb01d33e 100644 --- a/piker/brokers/_daemon.py +++ b/piker/brokers/_daemon.py @@ -69,9 +69,15 @@ async def _setup_persistent_brokerd( ) -> None: ''' - Allocate a actor-wide service nursery in ``brokerd`` - such that feeds can be run in the background persistently by - the broker backend as needed. + Trading-only daemon (lifetime) fixture: console logging + setup and a pinned-open context for service mgmt. + + All data-feed-bus state now lives in the (data-feed-only) + `datad.` sibling daemon, see + `piker.data._daemon._setup_persistent_datad()`; this + actor hosts only the backend's `open_trade_dialog()` + (live order-control) ep-task(s) which manage their own + task trees per `tractor.Context`. ''' # NOTE: we only need to setup logging once (and only) here @@ -87,46 +93,12 @@ async def _setup_persistent_brokerd( ) assert log.name == _util.subsys - # further, set the log level on any broker broker specific - # logger instance. + # unblock caller + await ctx.started() - from piker.data import feed - assert not feed._bus - - # allocate a nursery to the bus for spawning background - # tasks to service client IPC requests, normally - # `tractor.Context` connections to explicitly required - # `brokerd` endpoints such as: - # - `stream_quotes()`, - # - `manage_history()`, - # - `allocate_persistent_feed()`, - # - `open_symbol_search()` - # NOTE: see ep invocation details inside `.data.feed`. - try: - async with ( - # tractor.trionics.collapse_eg(), - trio.open_nursery() as service_nursery - ): - bus: _FeedsBus = feed.get_feed_bus( - brokername, - service_nursery, - ) - assert bus is feed._bus - - # unblock caller - await ctx.started() - - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - - except eg.ExceptionGroup: - # TODO: likely some underlying `brokerd` IPC connection - # broke so here we handle a respawn and re-connect attempt! - # This likely should pair with development of the OCO task - # nusery in dev over @ `tractor` B) - # https://github.com/goodboy/tractor/pull/363 - raise + # we pin this task to keep the daemon active until the + # parent actor decides to tear it down + await trio.sleep_forever() def broker_init( diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 45c5c41c..a81b3ff1 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -35,7 +35,7 @@ from piker.log import ( get_logger, ) from ..service import ( - maybe_spawn_brokerd, + maybe_spawn_datad, maybe_open_pikerd, ) from ..brokers import ( @@ -187,7 +187,7 @@ def brokercheck(config, broker): ''' async def bcheck_main(): - async with maybe_spawn_brokerd(broker) as portal: + async with maybe_spawn_datad(broker) as portal: await portal.run(run_test, broker) await portal.cancel_actor() @@ -317,7 +317,7 @@ def record(config, rate, name, dhost, filename): return async def main(tries): - async with maybe_spawn_brokerd( + async with maybe_spawn_datad( tries=tries, loglevel=loglevel ) as portal: # run app "main" diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 60623f85..937f384c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -30,7 +30,7 @@ import trio from piker.log import get_logger from . import get_brokermod -from ..service import maybe_spawn_brokerd +from ..service import maybe_spawn_datad from . import open_cached_client from ..accounting import MktPair @@ -172,7 +172,7 @@ async def symbol_search( # await tractor.devx._debug.maybe_init_greenback() # tractor.pause_from_sync() - async with maybe_spawn_brokerd( + async with maybe_spawn_datad( mod.name, infect_asyncio=getattr( mod, diff --git a/piker/data/feed.py b/piker/data/feed.py index f10d66e7..d31d58a3 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -17,7 +17,7 @@ ''' Data feed apis and infra. -This module is enabled for ``brokerd`` daemons and includes mostly +This module is enabled for ``datad`` daemons and includes mostly endpoints and middleware to support our real-time, provider agnostic, live market quotes layer. Historical data loading and processing is also initiated in parts of the feed bus startup but business logic and @@ -54,8 +54,11 @@ from piker.accounting import ( ) from piker.types import Struct from piker.brokers import get_brokermod -from piker.service import ( - maybe_spawn_brokerd, +# NOTE: must be a "relative-direct" import (NOT via +# `piker.service`) to avoid a partial-init cycle when this +# mod is loaded as part of `piker.service.__init__`. +from ._daemon import ( + maybe_spawn_datad, ) from piker.calc import humanize from ._util import ( @@ -110,7 +113,7 @@ class _FeedsBus(Struct): ''' Data feeds broadcaster and persistence management. - This is a brokerd side api used to manager persistent real-time + This is a datad side api used to manager persistent real-time streams that can be allocated and left alive indefinitely. A bus is associated one-to-one with a particular broker backend where the "bus" refers so a multi-symbol bus where quotes are interleaved in @@ -249,7 +252,7 @@ async def allocate_persistent_feed( ''' Create and maintain a "feed bus" which allocates tasks for real-time streaming and optional historical data storage per broker/data provider - backend; this normally task runs *in* a `brokerd` actor. + backend; this normally task runs *in* a `datad` actor. If none exists, this allocates a ``_FeedsBus`` which manages the lifetimes of streaming tasks created for each requested symbol. @@ -318,8 +321,8 @@ async def allocate_persistent_feed( # at max capacity. # - the same ideas ^ but when a local core is maxxed out (like how # binance does often with hft XD - # - if a brokerd is non-local then we can't just allocate a mem - # channel here and have the brokerd write it, we instead need + # - if a datad is non-local then we can't just allocate a mem + # channel here and have the datad write it, we instead need # a small streaming machine around the remote feed which can then # do the normal work of sampling and writing shm buffers # (depending on if we want sampling done on the far end or not?) @@ -499,7 +502,7 @@ async def open_feed_bus( # (after we also group them in a nice `/dev/shm/piker/` subdir). # ensure we are who we think we are servicename = tractor.current_actor().name - assert 'brokerd' in servicename + assert 'datad' in servicename assert brokername in servicename bus: _FeedsBus = get_feed_bus(brokername) @@ -509,12 +512,12 @@ async def open_feed_bus( for symbol in symbols: # if no cached feed for this symbol has been created for this - # brokerd yet, start persistent stream and shm writer task in + # datad yet, start persistent stream and shm writer task in # service nursery flume = bus.feeds.get(symbol) if flume is None: # allocate a new actor-local stream bus which - # will persist for this `brokerd`'s service lifetime. + # will persist for this `datad`'s service lifetime. async with bus.task_lock: await bus.nursery.start( partial( @@ -721,7 +724,7 @@ class Feed(Struct): mods = {name: self.mods[name] for name in brokers} if len(mods) == 1: - # just pass the brokerd stream directly if only one provider + # just pass the datad stream directly if only one provider # was detected. stream = self.streams[list(brokers)[0]] async with stream.subscribe() as bstream: @@ -763,7 +766,7 @@ class Feed(Struct): @acm -async def install_brokerd_search( +async def install_datad_search( portal: tractor.Portal, brokermod: ModuleType, @@ -812,7 +815,7 @@ async def maybe_open_feed( ReceiveChannel[dict[str, Any]], ): ''' - Maybe open a data to a ``brokerd`` daemon only if there is no + Maybe open a data feed to a ``datad`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped in a tractor broadcast receiver. @@ -885,14 +888,14 @@ async def open_feed( providers.setdefault(mod, []).append(bs_fqme) feed.mods[mod.name] = mod - # one actor per brokerd for now - brokerd_ctxs = [] + # one actor per datad for now + datad_ctxs = [] for brokermod, bfqmes in providers.items(): - # if no `brokerd` for this backend exists yet we spawn + # if no `datad` for this backend exists yet we spawn # a daemon actor for it. - brokerd_ctxs.append( - maybe_spawn_brokerd( + datad_ctxs.append( + maybe_spawn_datad( brokermod.name, loglevel=loglevel ) @@ -900,7 +903,7 @@ async def open_feed( portals: tuple[tractor.Portal] async with trionics.gather_contexts( - brokerd_ctxs, + datad_ctxs, ) as portals: bus_ctxs: list[AsyncContextManager] = [] @@ -937,9 +940,9 @@ async def open_feed( tick_throttle=tick_throttle, # XXX: super important to avoid - # the brokerd from some other + # the datad from some other # backend overruning the task here - # bc some other brokerd took longer + # bc some other datad took longer # to startup before we hit the `.open_stream()` # loop below XD .. really we should try to do each # of these stream open sequences sequentially per @@ -1008,7 +1011,7 @@ async def open_feed( assert stream feed.streams[brokermod.name] = stream - # apply `brokerd`-common stream to each flume + # apply `datad`-common stream to each flume # tracking a live market feed from that provider. for fqme, flume in feed.flumes.items(): if brokermod.name == flume.mkt.broker: diff --git a/piker/ui/_app.py b/piker/ui/_app.py index e47a9313..5dae0dd7 100644 --- a/piker/ui/_app.py +++ b/piker/ui/_app.py @@ -32,12 +32,12 @@ from . import _event from . import _search from ..accounting import unpack_fqme from ..data._symcache import open_symcache -from ..data.feed import install_brokerd_search +from ..data.feed import install_datad_search from ..log import ( get_logger, get_console_log, ) -from ..service import maybe_spawn_brokerd +from ..service import maybe_spawn_datad from ._exec import run_qtractor log = get_logger(__name__) @@ -50,16 +50,16 @@ async def load_provider_search( ) -> None: name = brokermod.name - log.info(f'loading brokerd for {name}..') + log.info(f'loading datad for {name}..') async with ( - maybe_spawn_brokerd( + maybe_spawn_datad( name, loglevel=loglevel ) as portal, - install_brokerd_search( + install_datad_search( portal, brokermod, ), diff --git a/piker/ui/cli.py b/piker/ui/cli.py index fec29afe..edeba8e8 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -27,7 +27,7 @@ from ..cli import ( load_trans_eps, ) from .. import watchlists as wl -from ..service import maybe_spawn_brokerd +from ..service import maybe_spawn_datad _config_dir = click.get_app_dir('piker') @@ -69,7 +69,7 @@ def monitor(config, rate, name, dhost, test, tl): from .kivy.monitor import _async_main async def main(): - async with maybe_spawn_brokerd( + async with maybe_spawn_datad( brokername=brokermod.name, loglevel=loglevel ) as portal: @@ -118,7 +118,7 @@ def optschain( from .kivy.option_chain import _async_main async def main(): - async with maybe_spawn_brokerd( + async with maybe_spawn_datad( loglevel=loglevel ): # run app "main" diff --git a/piker/ui/kivy/option_chain.py b/piker/ui/kivy/option_chain.py index 55161339..1cd372fd 100644 --- a/piker/ui/kivy/option_chain.py +++ b/piker/ui/kivy/option_chain.py @@ -495,7 +495,7 @@ async def _async_main( async with trio.open_nursery() as nursery: # get a portal to the data feed daemon - async with tractor.wait_for_actor('brokerd') as portal: + async with tractor.wait_for_actor('datad') as portal: # set up a pager view for large ticker lists chain = await new_chain_ui( diff --git a/tests/test_services.py b/tests/test_services.py index abbe6c6b..345bc017 100644 --- a/tests/test_services.py +++ b/tests/test_services.py @@ -108,14 +108,14 @@ def test_ensure_datafeed_actors( ) -> None: ''' - Verify that booting a data feed starts a `brokerd` + Verify that booting a data feed starts a `datad` actor and a singleton global `samplerd` and opening an order mode in paper opens the `paperboi` service. ''' - actor_name: str = 'brokerd' + actor_name: str = 'datad' backend: str = 'kraken' - brokerd_name: str = f'{actor_name}.{backend}' + datad_name: str = f'{actor_name}.{backend}' async def main(): async with ( @@ -130,7 +130,7 @@ def test_ensure_datafeed_actors( await feed.pause() async with ( - ensure_service(brokerd_name), + ensure_service(datad_name), ensure_service('samplerd'), ): await trio.sleep(0.1) @@ -227,9 +227,9 @@ def test_ensure_ems_in_paper_actors( ) -> None: - actor_name: str = 'brokerd' backend: str = 'kraken' - brokerd_name: str = f'{actor_name}.{backend}' + datad_name: str = f'datad.{backend}' + brokerd_name: str = f'brokerd.{backend}' async def main(): @@ -273,15 +273,25 @@ def test_ensure_ems_in_paper_actors( async with ( ensure_service('emsd'), - ensure_service(brokerd_name), + ensure_service(datad_name), ensure_service(f'paperboi.{backend}'), ): for name in pikerd_subservices: assert name in services.service_tasks - # brokerd.kraken actor should have been started - # implicitly by the ems. - assert brokerd_name in services.service_tasks + # datad.kraken actor should have been + # started implicitly by the feed layer. + assert datad_name in services.service_tasks + + # XXX: paper-mode sessions should NEVER + # boot a (live, credentialed) `brokerd`; + # only emsd's `open_brokerd_dialog()` + # live-ep path is allowed to spawn it! + assert ( + brokerd_name + not in + services.service_tasks + ) print('ALL SERVICES STARTED, cancelling runtime with:\n' f'-> {cancel_method}')