From aa403bd390d07cf60537d461a1b80c0788d080cf Mon Sep 17 00:00:00 2001
From: goodboy
Date: Wed, 11 Feb 2026 19:56:14 -0500
Subject: [PATCH] Pass `loglevel` down through `.data` callstack

Add `loglevel` param propagation across the data feed and sampling
subsystems to enable proper console log setup in downstream
(distributed) subactor tasks. This ensures sampler and history-mgmt
tasks receive the same loglevel as their parent `.data.feed` tasks.

Deats,
- add `loglevel: str|None` param to `register_with_sampler()`,
  `maybe_open_samplerd()`, and `open_sample_stream()`.
- pass `loglevel` through to `get_console_log()` in
  `register_with_sampler()` with fallback to actor `loglevel`.
- use `partial()` in `allocate_persistent_feed()` to pass `loglevel`
  to `manage_history()` at task-start.
- add `loglevel` param to `manage_history()` with default `'warning'`
  and pass through to `open_sample_stream()` from there.
- capture `loglevel` var in `brokers.cli.search()` and pass to
  `symbol_search()` call.

Also,
- drop blank lines in fn sigs for consistency with piker style.
- add debug bp in `open_feed()` when `loglevel != 'info'`.

(this commit msg was generated in some part by
[`claude-code`][claude-code-gh])

[claude-code-gh]: https://github.com/anthropics/claude-code
---
 piker/brokers/cli.py    |  4 +++-
 piker/data/_sampling.py | 23 +++++++++++++++++------
 piker/data/feed.py      | 27 ++++++++++++++++++---------
 piker/tsp/_history.py   |  8 ++++++--
 4 files changed, 44 insertions(+), 18 deletions(-)
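
The net effect of the hunks below is that any caller of `open_sample_stream()`
can now choose the console log level used inside the `samplerd` subactor,
rather than it always falling back to that actor's own setting. A minimal
caller-side sketch, assuming `piker` is importable and a `loglevel` value is
already in scope; the wrapper name `watch_samples()` and the `period_s=1`
literal are illustrative only:

    from piker.data._sampling import open_sample_stream

    async def watch_samples(loglevel: str = 'info') -> None:
        # kwargs match those visible in the `manage_history()` hunk below;
        # only `loglevel` is new plumbing.
        async with open_sample_stream(
            period_s=1,
            open_index_stream=True,
            sub_for_broadcasts=False,
            loglevel=loglevel,  # forwarded to `register_with_sampler()` in `samplerd`
        ) as sample_stream:
            # the sampler-side `get_console_log()` is now configured with
            # `loglevel or tractor.current_actor().loglevel`
            ...

Inside `register_with_sampler()` an explicitly passed `loglevel` wins and the
spawning actor's own level remains the fallback (see the `_sampling.py` hunks).
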
diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py
index 96f4e2de..45c5c41c 100644
--- a/piker/brokers/cli.py
+++ b/piker/brokers/cli.py
@@ -481,11 +481,12 @@ def search(
     # the `piker --pdb` XD ..
     # -[ ] pull from the parent click ctx's values..dumdum
     # assert pdb
+    loglevel: str = config['loglevel']
 
     # define tractor entrypoint
     async def main(func):
         async with maybe_open_pikerd(
-            loglevel=config['loglevel'],
+            loglevel=loglevel,
            debug_mode=pdb,
         ):
             return await func()
@@ -498,6 +499,7 @@ def search(
             core.symbol_search,
             brokermods,
             pattern,
+            loglevel=loglevel,
         ),
     )
 
diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index f8a0ec27..74ecf114 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -336,10 +336,18 @@ async def register_with_sampler(
 
     open_index_stream: bool = True,  # open a 2way stream for sample step msgs?
     sub_for_broadcasts: bool = True,  # sampler side to send step updates?
 
+    loglevel: str|None = None,
 ) -> set[int]:
 
-    get_console_log(tractor.current_actor().loglevel)
+    get_console_log(
+        level=(
+            loglevel
+            or
+            tractor.current_actor().loglevel
+        ),
+        name=__name__,
+    )
     incr_was_started: bool = False
 
     try:
@@ -476,6 +484,7 @@ async def spawn_samplerd(
             register_with_sampler,
             period_s=1,
             sub_for_broadcasts=False,
+            loglevel=loglevel,
         )
         return True
 
@@ -484,7 +493,6 @@ async def maybe_open_samplerd(
-
     loglevel: str|None = None,
 
     **pikerd_kwargs,
 
@@ -513,10 +521,10 @@ async def open_sample_stream(
     shms_by_period: dict[float, dict]|None = None,
     open_index_stream: bool = True,
     sub_for_broadcasts: bool = True,
+    loglevel: str|None = None,
 
-    cache_key: str|None = None,
-    allow_new_sampler: bool = True,
-
+    # cache_key: str|None = None,
+    # allow_new_sampler: bool = True,
     ensure_is_active: bool = False,
 
 ) -> AsyncIterator[dict[str, float]]:
@@ -551,7 +559,9 @@
         # XXX: this should be singleton on a host,
         # a lone broker-daemon per provider should be
         # created for all practical purposes
-        maybe_open_samplerd() as portal,
+        maybe_open_samplerd(
+            loglevel=loglevel,
+        ) as portal,
 
         portal.open_context(
             register_with_sampler,
@@ -560,6 +570,7 @@
                 'shms_by_period': shms_by_period,
                 'open_index_stream': open_index_stream,
                 'sub_for_broadcasts': sub_for_broadcasts,
+                'loglevel': loglevel,
             },
         ) as (ctx, shm_periods)
     ):
 
diff --git a/piker/data/feed.py b/piker/data/feed.py
index 89b360ba..f096ff5d 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -239,7 +239,6 @@ async def allocate_persistent_feed(
     brokername: str,
     symstr: str,
-
     loglevel: str,
 
     start_stream: bool = True,
     init_timeout: float = 616,
@@ -348,11 +347,14 @@ async def allocate_persistent_feed(
         izero_rt,
         rt_shm,
     ) = await bus.nursery.start(
-        manage_history,
-        mod,
-        mkt,
-        some_data_ready,
-        feed_is_live,
+        partial(
+            manage_history,
+            mod=mod,
+            mkt=mkt,
+            some_data_ready=some_data_ready,
+            feed_is_live=feed_is_live,
+            loglevel=loglevel,
+        )
     )
 
     # yield back control to starting nursery once we receive either
@@ -460,7 +462,6 @@ async def open_feed_bus(
-
     ctx: tractor.Context,
     brokername: str,
     symbols: list[str],  # normally expected to the broker-specific fqme
@@ -802,7 +803,7 @@ async def install_brokerd_search(
 @acm
 async def maybe_open_feed(
     fqmes: list[str],
-    loglevel: str | None = None,
+    loglevel: str|None = None,
 
     **kwargs,
 
@@ -818,6 +819,11 @@ async def maybe_open_feed(
 
     '''
     fqme = fqmes[0]
+    # if (
+    #     loglevel != 'info'
+    # ):
+    #     await tractor.pause()
+
     async with trionics.maybe_open_context(
         acm_func=open_feed,
         kwargs={
@@ -884,9 +890,12 @@ async def open_feed(
         providers.setdefault(mod, []).append(bs_fqme)
         feed.mods[mod.name] = mod
 
+    if (
+        loglevel != 'info'
+    ):
+        await tractor.pause()
     # one actor per brokerd for now
     brokerd_ctxs = []
-
     for brokermod, bfqmes in providers.items():
 
         # if no `brokerd` for this backend exists yet we spawn
diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py
index 86f60610..99261342 100644
--- a/piker/tsp/_history.py
+++ b/piker/tsp/_history.py
@@ -63,8 +63,10 @@
 from ..data._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
 )
-from ..data._source import def_iohlcv_fields
-from ..data._sampling import (
+from piker.data._source import (
+    def_iohlcv_fields,
+)
+from piker.data._sampling import (
     open_sample_stream,
 )
@@ -1322,6 +1324,7 @@ async def manage_history(
     mkt: MktPair,
     some_data_ready: trio.Event,
     feed_is_live: trio.Event,
+    loglevel: str = 'warning',
 
     timeframe: float = 60,  # in seconds
     wait_for_live_timeout: float = 0.5,
@@ -1499,6 +1502,7 @@ async def manage_history(
             # data feed layer that needs to consume it).
             open_index_stream=True,
             sub_for_broadcasts=False,
+            loglevel=loglevel,
 
         ) as sample_stream:
             # register 1s and 1m buffers with the global
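
The `partial()` wrapping in the `feed.py` hunk above is needed because
`trio`'s `Nursery.start()` only forwards positional arguments to the spawned
task function, so keyword params like `loglevel` must be bound before the
callable is handed over. A runnable toy sketch of that pattern; `worker()`
and its values are hypothetical stand-ins for `manage_history()`, not piker
code:

    from functools import partial

    import trio

    async def worker(
        *,
        loglevel: str = 'warning',
        task_status=trio.TASK_STATUS_IGNORED,
    ) -> None:
        # `nursery.start()` can't pass kwargs itself, so `loglevel`
        # arrives pre-bound via `partial()` below.
        task_status.started(loglevel)
        await trio.sleep(0)

    async def main() -> None:
        async with trio.open_nursery() as nursery:
            # same shape as `bus.nursery.start(partial(manage_history, ...))`
            level = await nursery.start(
                partial(worker, loglevel='info'),
            )
            print(level)  # -> 'info'

    trio.run(main)
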