diff --git a/piker/_daemon.py b/piker/_daemon.py index f4acf9f3..f1ced6e9 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -41,6 +41,9 @@ from .log import ( ) from .brokers import get_brokermod +from pprint import pformat +from functools import partial + log = get_logger(__name__) @@ -313,6 +316,9 @@ async def open_piker_runtime( @acm async def open_pikerd( + tsdb: bool, + es: bool, + loglevel: str | None = None, # XXX: you should pretty much never want debug mode @@ -349,12 +355,54 @@ async def open_pikerd( ): assert root_actor.accept_addr == reg_addr + if tsdb: + from piker.data._ahab import start_ahab + from piker.data.marketstore import start_marketstore + + log.info('Spawning `marketstore` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + start_ahab, + 'marketstored', + start_marketstore, + + ) + log.info( + f'`marketstored` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + + if es: + from piker.data._ahab import start_ahab + from piker.data.elastic import start_elasticsearch + + log.info('Spawning `elasticsearch` supervisor') + ctn_ready, config, (cid, pid) = await service_nursery.start( + partial( + start_ahab, + 'elasticsearch', + start_elasticsearch, + start_timeout=30.0 + ) + ) + + log.info( + f'`elasticsearch` up!\n' + f'pid: {pid}\n' + f'container id: {cid[:12]}\n' + f'config: {pformat(config)}' + ) + # assign globally for future daemon/task creation Services.actor_n = actor_nursery Services.service_n = service_nursery Services.debug_mode = debug_mode + + try: yield Services + finally: # TODO: is this more clever/efficient? # if 'samplerd' in Services.service_tasks: @@ -388,6 +436,8 @@ async def maybe_open_runtime( @acm async def maybe_open_pikerd( + tsdb: bool = False, + es: bool = False, loglevel: Optional[str] = None, registry_addr: None | tuple = None, @@ -436,6 +486,8 @@ async def maybe_open_pikerd( # presume pikerd role since no daemon could be found at # configured address async with open_pikerd( + tsdb=tsdb, + es=es, loglevel=loglevel, debug_mode=kwargs.get('debug_mode', False), registry_addr=registry_addr, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 5bc6b2f4..9b6f225c 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -90,6 +90,8 @@ def pikerd( async def main(): async with ( open_pikerd( + tsdb=tsdb, + es=es, loglevel=loglevel, debug_mode=pdb, registry_addr=reg_addr, @@ -97,44 +99,6 @@ def pikerd( ), # normally delivers a ``Services`` handle trio.open_nursery() as n, ): - if tsdb: - from piker.data._ahab import start_ahab - from piker.data.marketstore import start_marketstore - - log.info('Spawning `marketstore` supervisor') - ctn_ready, config, (cid, pid) = await n.start( - start_ahab, - 'marketstored', - start_marketstore, - - ) - log.info( - f'`marketstored` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) - - if es: - from piker.data._ahab import start_ahab - from piker.data.elasticsearch import start_elasticsearch - - log.info('Spawning `elasticsearch` supervisor') - ctn_ready, config, (cid, pid) = await n.start( - partial( - start_ahab, - 'elasticsearch', - start_elasticsearch, - start_timeout=30.0 - ) - ) - - log.info( - f'`elasticsearch` up!\n' - f'pid: {pid}\n' - f'container id: {cid[:12]}\n' - f'config: {pformat(config)}' - ) await trio.sleep_forever() @@ -241,7 +205,7 @@ def services(config, tl, ports): def _load_clis() -> None: from ..data import marketstore # noqa - from ..data import elasticsearch + from ..data import elastic from ..data import cli # noqa from ..brokers import cli # noqa from ..ui import cli # noqa