Pass `loglevel` down through `.data` callstack

Add `loglevel` param propagation across the data feed and sampling
subsystems to enable proper console log setup in downstream (distibuted)
subactor tasks. This ensures sampler and history-mgmt tasks receive the
same loglevel as their parent `.data.feed` tasks.

Deats,
- add `loglevel: str|None` param to `register_with_sampler()`,
  `maybe_open_samplerd()`, and `open_sample_stream()`.
- pass `loglevel` through to `get_console_log()` in
  `register_with_sampler()` with fallback to actor `loglevel`.
- use `partial()` in `allocate_persistent_feed()` to pass
  `loglevel` to `manage_history()` at task-start.
- add `loglevel` param to `manage_history()` with default
  `'warning'` and pass through to `open_sample_stream()` from there.
- capture `loglevel` var in `brokers.cli.search()` and pass to
  `symbol_search()` call.

Also,
- drop blank lines in fn sigs for consistency with piker style.
- add debug bp in `open_feed()` when `loglevel != 'info'`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
Gud Boi 2026-02-11 19:56:14 -05:00
parent e08fb267f8
commit e01ba6e31e
4 changed files with 44 additions and 18 deletions

View File

@ -481,11 +481,12 @@ def search(
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
loglevel: str = config['loglevel']
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=config['loglevel'],
loglevel=loglevel,
debug_mode=pdb,
):
return await func()
@ -498,6 +499,7 @@ def search(
core.symbol_search,
brokermods,
pattern,
loglevel=loglevel,
),
)

View File

@ -336,10 +336,18 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
loglevel: str|None = None,
) -> set[int]:
get_console_log(tractor.current_actor().loglevel)
get_console_log(
level=(
loglevel
or
tractor.current_actor().loglevel
),
name=__name__,
)
incr_was_started: bool = False
try:
@ -476,6 +484,7 @@ async def spawn_samplerd(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
loglevel=loglevel,
)
return True
@ -484,7 +493,6 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str|None = None,
**pikerd_kwargs,
@ -513,10 +521,10 @@ async def open_sample_stream(
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
loglevel: str|None = None,
cache_key: str|None = None,
allow_new_sampler: bool = True,
# cache_key: str|None = None,
# allow_new_sampler: bool = True,
ensure_is_active: bool = False,
) -> AsyncIterator[dict[str, float]]:
@ -551,7 +559,9 @@ async def open_sample_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
maybe_open_samplerd() as portal,
maybe_open_samplerd(
loglevel=loglevel,
) as portal,
portal.open_context(
register_with_sampler,
@ -560,6 +570,7 @@ async def open_sample_stream(
'shms_by_period': shms_by_period,
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
'loglevel': loglevel,
},
) as (ctx, shm_periods)
):

View File

@ -239,7 +239,6 @@ async def allocate_persistent_feed(
brokername: str,
symstr: str,
loglevel: str,
start_stream: bool = True,
init_timeout: float = 616,
@ -348,11 +347,14 @@ async def allocate_persistent_feed(
izero_rt,
rt_shm,
) = await bus.nursery.start(
partial(
manage_history,
mod,
mkt,
some_data_ready,
feed_is_live,
mod=mod,
mkt=mkt,
some_data_ready=some_data_ready,
feed_is_live=feed_is_live,
loglevel=loglevel,
)
)
# yield back control to starting nursery once we receive either
@ -460,7 +462,6 @@ async def allocate_persistent_feed(
@tractor.context
async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbols: list[str], # normally expected to the broker-specific fqme
@ -818,6 +819,11 @@ async def maybe_open_feed(
'''
fqme = fqmes[0]
# if (
# loglevel != 'info'
# ):
# await tractor.pause()
async with trionics.maybe_open_context(
acm_func=open_feed,
kwargs={
@ -884,9 +890,12 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod
if (
loglevel != 'info'
):
await tractor.pause()
# one actor per brokerd for now
brokerd_ctxs = []
for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn

View File

@ -63,8 +63,10 @@ from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
)
from ..data._source import def_iohlcv_fields
from ..data._sampling import (
from piker.data._source import (
def_iohlcv_fields,
)
from piker.data._sampling import (
open_sample_stream,
)
@ -1322,6 +1324,7 @@ async def manage_history(
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
loglevel: str = 'warning',
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
@ -1499,6 +1502,7 @@ async def manage_history(
# data feed layer that needs to consume it).
open_index_stream=True,
sub_for_broadcasts=False,
loglevel=loglevel,
) as sample_stream:
# register 1s and 1m buffers with the global