diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index c7ba675d..b679a632 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -19,7 +19,7 @@ CLI commons. ''' import os -from contextlib import AsyncExitStack +# from contextlib import AsyncExitStack from types import ModuleType import click @@ -43,88 +43,159 @@ log = get_logger('piker.cli') @click.command() -@click.option('--loglevel', '-l', default='warning', help='Logging level') -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--pdb', is_flag=True, help='Enable tractor debug mode') -@click.option('--host', '-h', default=None, help='Host addr to bind') -@click.option('--port', '-p', default=None, help='Port number to bind') @click.option( - '--tsdb', - is_flag=True, - help='Enable local ``marketstore`` instance' + '--loglevel', + '-l', + default='warning', + help='Logging level', ) @click.option( - '--es', + '--tl', is_flag=True, - help='Enable local ``elasticsearch`` instance' + help='Enable tractor-runtime logs', ) +@click.option( + '--pdb', + is_flag=True, + help='Enable tractor debug mode', +) +@click.option( + '--maddr', + '-m', + default=None, + help='Multiaddrs to bind or contact', +) +# @click.option( +# '--tsdb', +# is_flag=True, +# help='Enable local ``marketstore`` instance' +# ) +# @click.option( +# '--es', +# is_flag=True, +# help='Enable local ``elasticsearch`` instance' +# ) def pikerd( + maddr: str | None, loglevel: str, - host: str, - port: int, tl: bool, pdb: bool, - tsdb: bool, - es: bool, + # tsdb: bool, + # es: bool, ): ''' Spawn the piker broker-daemon. ''' - log = get_console_log(loglevel, name='cli') + from cornerboi._debug import open_crash_handler + with open_crash_handler(): + log = get_console_log(loglevel, name='cli') - if pdb: - log.warning(( - "\n" - "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" - "When a `piker` daemon crashes it will block the " - "task-thread until resumed from console!\n" - "\n" - )) + if pdb: + log.warning(( + "\n" + "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n" + "When a `piker` daemon crashes it will block the " + "task-thread until resumed from console!\n" + "\n" + )) - reg_addr: None | tuple[str, int] = None - if host or port: - reg_addr = ( - host or _default_registry_host, - int(port) or _default_registry_port, + # service-actor registry endpoint socket-address + regaddrs: list[tuple[str, int]] | None = None + + conf, _ = config.load( + conf_name='conf', ) + network: dict = conf.get('network') + if network is None: + regaddrs = [( + _default_registry_host, + _default_registry_port, + )] - from .. import service + from .. import service + from ..service._multiaddr import parse_addr - async def main(): - service_mngr: service.Services + # transport-oriented endpoint multi-addresses + eps: dict[ + str, # service name, eg. `pikerd`, `emsd`.. - async with ( - service.open_pikerd( - loglevel=loglevel, - debug_mode=pdb, - registry_addr=reg_addr, + # libp2p style multi-addresses parsed into prot layers + list[dict[str, str | int]] + ] = {} - ) as service_mngr, # normally delivers a ``Services`` handle - - AsyncExitStack() as stack, + if ( + not maddr + and network ): - if tsdb: - dname, conf = await stack.enter_async_context( - service.marketstore.start_ahab_daemon( - service_mngr, - loglevel=loglevel, - ) - ) - log.info(f'TSDB `{dname}` up with conf:\n{conf}') + # load network section and (attempt to) connect all endpoints + # which are reachable B) + for key, maddrs in network.items(): + match key: - if es: - dname, conf = await stack.enter_async_context( - service.elastic.start_ahab_daemon( - service_mngr, - loglevel=loglevel, - ) - ) - log.info(f'DB `{dname}` up with conf:\n{conf}') + # TODO: resolve table across multiple discov + # prots Bo + case 'resolv': + pass - await trio.sleep_forever() + case 'pikerd': + dname: str = key + for maddr in maddrs: + layers: dict = parse_addr(maddr) + eps.setdefault( + dname, + [], + ).append(layers) - trio.run(main) + else: + # presume user is manually specifying the root actor ep. + eps['pikerd'] = [parse_addr(maddr)] + + regaddrs: list[tuple[str, int]] = [] + for layers in eps['pikerd']: + regaddrs.append(( + layers['ipv4']['addr'], + layers['tcp']['port'], + )) + + async def main(): + service_mngr: service.Services + + async with ( + service.open_pikerd( + registry_addrs=regaddrs, + loglevel=loglevel, + debug_mode=pdb, + + ) as service_mngr, # normally delivers a ``Services`` handle + + # AsyncExitStack() as stack, + ): + # TODO: spawn all other sub-actor daemons according to + # multiaddress endpoint spec defined by user config + assert service_mngr + + # if tsdb: + # dname, conf = await stack.enter_async_context( + # service.marketstore.start_ahab_daemon( + # service_mngr, + # loglevel=loglevel, + # ) + # ) + # log.info(f'TSDB `{dname}` up with conf:\n{conf}') + + # if es: + # dname, conf = await stack.enter_async_context( + # service.elastic.start_ahab_daemon( + # service_mngr, + # loglevel=loglevel, + # ) + # ) + # log.info(f'DB `{dname}` up with conf:\n{conf}') + + await trio.sleep_forever() + + trio.run(main) @click.group(context_settings=config._context_defaults) @@ -137,8 +208,8 @@ def pikerd( @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--configdir', '-c', help='Configuration directory') -@click.option('--host', '-h', default=None, help='Host addr to bind') -@click.option('--port', '-p', default=None, help='Port number to bind') +@click.option('--maddr', '-m', default=None, help='Multiaddr to bind') +@click.option('--raddr', '-r', default=None, help='Registrar addr to contact') @click.pass_context def cli( ctx: click.Context, @@ -146,8 +217,10 @@ def cli( loglevel: str, tl: bool, configdir: str, - host: str, - port: int, + + # TODO: make these list[str] with multiple -m maddr0 -m maddr1 + maddr: str, + raddr: str, ) -> None: if configdir is not None: @@ -168,12 +241,10 @@ def cli( } assert brokermods - reg_addr: None | tuple[str, int] = None - if host or port: - reg_addr = ( - host or _default_registry_host, - int(port) or _default_registry_port, - ) + regaddr: tuple[str, int] = ( + _default_registry_host, + _default_registry_port, + ) ctx.obj.update({ 'brokers': brokers, @@ -183,7 +254,7 @@ def cli( 'log': get_console_log(loglevel), 'confdir': config._config_dir, 'wl_path': config._watchlists_data_path, - 'registry_addr': reg_addr, + 'registry_addr': regaddr, }) # allow enabling same loglevel in ``tractor`` machinery @@ -230,7 +301,7 @@ def services(config, tl, ports): def _load_clis() -> None: - from ..service import elastic # noqa + # from ..service import elastic # noqa from ..brokers import cli # noqa from ..ui import cli # noqa from ..watchlists import cli # noqa diff --git a/piker/service/__init__.py b/piker/service/__init__.py index e6a17da0..95e39450 100644 --- a/piker/service/__init__.py +++ b/piker/service/__init__.py @@ -39,7 +39,7 @@ from ._actor_runtime import ( open_piker_runtime, maybe_open_pikerd, open_pikerd, - get_tractor_runtime_kwargs, + get_runtime_vars, ) from ..brokers._daemon import ( spawn_brokerd, @@ -58,5 +58,5 @@ __all__ = [ 'open_piker_runtime', 'maybe_open_pikerd', 'open_pikerd', - 'get_tractor_runtime_kwargs', + 'get_runtime_vars', ] diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py index 78938a5f..3fcfbb01 100644 --- a/piker/service/_actor_runtime.py +++ b/piker/service/_actor_runtime.py @@ -45,7 +45,7 @@ from ._registry import ( # noqa ) -def get_tractor_runtime_kwargs() -> dict[str, Any]: +def get_runtime_vars() -> dict[str, Any]: ''' Deliver ``tractor`` related runtime variables in a `dict`. @@ -56,6 +56,8 @@ def get_tractor_runtime_kwargs() -> dict[str, Any]: @acm async def open_piker_runtime( name: str, + registry_addrs: list[tuple[str, int]], + enable_modules: list[str] = [], loglevel: Optional[str] = None, @@ -63,8 +65,6 @@ async def open_piker_runtime( # for data daemons when running in production. debug_mode: bool = False, - registry_addr: None | tuple[str, int] = None, - # TODO: once we have `rsyscall` support we will read a config # and spawn the service tree distributed per that. start_method: str = 'trio', @@ -74,7 +74,7 @@ async def open_piker_runtime( ) -> tuple[ tractor.Actor, - tuple[str, int], + list[tuple[str, int]], ]: ''' Start a piker actor who's runtime will automatically sync with @@ -90,15 +90,19 @@ async def open_piker_runtime( except tractor._exceptions.NoRuntime: tractor._state._runtime_vars[ - 'piker_vars'] = tractor_runtime_overrides + 'piker_vars' + ] = tractor_runtime_overrides - registry_addr = registry_addr or _default_reg_addr + registry_addrs = ( + registry_addrs + or [_default_reg_addr] + ) async with ( tractor.open_root_actor( # passed through to ``open_root_actor`` - arbiter_addr=registry_addr, + registry_addrs=registry_addrs, name=name, loglevel=loglevel, debug_mode=debug_mode, @@ -112,22 +116,27 @@ async def open_piker_runtime( **tractor_kwargs, ) as _, - open_registry(registry_addr, ensure_exists=False) as addr, + open_registry( + registry_addrs, + ensure_exists=False, + ) as addrs, ): yield ( tractor.current_actor(), - addr, + addrs, ) else: - async with open_registry(registry_addr) as addr: + async with open_registry( + registry_addrs + ) as addrs: yield ( actor, - addr, + addrs, ) -_root_dname = 'pikerd' -_root_modules = [ +_root_dname: str = 'pikerd' +_root_modules: list[str] = [ __name__, 'piker.service._daemon', 'piker.brokers._daemon', @@ -141,13 +150,13 @@ _root_modules = [ @acm async def open_pikerd( + registry_addrs: list[tuple[str, int]], loglevel: str | None = None, # XXX: you should pretty much never want debug mode # for data daemons when running in production. debug_mode: bool = False, - registry_addr: None | tuple[str, int] = None, **kwargs, @@ -169,19 +178,23 @@ async def open_pikerd( enable_modules=_root_modules, loglevel=loglevel, debug_mode=debug_mode, - registry_addr=registry_addr, + registry_addrs=registry_addrs, **kwargs, - ) as (root_actor, reg_addr), + ) as ( + root_actor, + reg_addrs, + ), tractor.open_nursery() as actor_nursery, trio.open_nursery() as service_nursery, ): - if root_actor.accept_addr != reg_addr: - raise RuntimeError( - f'`pikerd` failed to bind on {reg_addr}!\n' - 'Maybe you have another daemon already running?' - ) + for addr in reg_addrs: + if addr not in root_actor.accept_addrs: + raise RuntimeError( + f'`pikerd` failed to bind on {addr}!\n' + 'Maybe you have another daemon already running?' + ) # assign globally for future daemon/task creation Services.actor_n = actor_nursery @@ -225,9 +238,9 @@ async def open_pikerd( @acm async def maybe_open_pikerd( - loglevel: Optional[str] = None, - registry_addr: None | tuple = None, + registry_addrs: list[tuple[str, int]] | None = None, + loglevel: str | None = None, **kwargs, ) -> tractor._portal.Portal | ClassVar[Services]: @@ -253,17 +266,20 @@ async def maybe_open_pikerd( # async with open_portal(chan) as arb_portal: # yield arb_portal + registry_addrs = registry_addrs or [_default_reg_addr] + async with ( open_piker_runtime( name=query_name, - registry_addr=registry_addr, + registry_addrs=registry_addrs, loglevel=loglevel, **kwargs, ) as _, tractor.find_actor( _root_dname, - arbiter_sockaddr=registry_addr, + registry_addrs=registry_addrs, + only_first=True, ) as portal ): # connect to any existing daemon presuming @@ -278,7 +294,7 @@ async def maybe_open_pikerd( # configured address async with open_pikerd( loglevel=loglevel, - registry_addr=registry_addr, + registry_addrs=registry_addrs, # passthrough to ``tractor`` init **kwargs, diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py index df94a992..1e7ff096 100644 --- a/piker/service/_daemon.py +++ b/piker/service/_daemon.py @@ -70,7 +70,10 @@ async def maybe_spawn_daemon( lock = Services.locks[service_name] await lock.acquire() - async with find_service(service_name) as portal: + async with find_service( + service_name, + registry_addrs=[('127.0.0.1', 6116)], + ) as portal: if portal is not None: lock.release() yield portal diff --git a/piker/service/_multiaddr.py b/piker/service/_multiaddr.py new file mode 100644 index 00000000..04e6f473 --- /dev/null +++ b/piker/service/_multiaddr.py @@ -0,0 +1,142 @@ +# piker: trading gear for hackers +# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Multiaddress parser and utils according the spec(s) defined by +`libp2p` and used in dependent project such as `ipfs`: + +- https://docs.libp2p.io/concepts/fundamentals/addressing/ +- https://github.com/libp2p/specs/blob/master/addressing/README.md + +''' +from typing import Iterator + +from bidict import bidict + +# TODO: see if we can leverage libp2p ecosys projects instead of +# rolling our own (parser) impls of the above addressing specs: +# - https://github.com/libp2p/py-libp2p +# - https://docs.libp2p.io/concepts/nat/circuit-relay/#relay-addresses +# prots: bidict[int, str] = bidict({ +prots: bidict[int, str] = { + 'ipv4': 3, + 'ipv6': 3, + 'wg': 3, + + 'tcp': 4, + 'udp': 4, + + # TODO: support the next-gen shite Bo + # 'quic': 4, + # 'ssh': 7, # via rsyscall bootstrapping +} + +prot_params: dict[str, tuple[str]] = { + 'ipv4': ('addr',), + 'ipv6': ('addr',), + 'wg': ('addr', 'port', 'pubkey'), + + 'tcp': ('port',), + 'udp': ('port',), + + # 'quic': ('port',), + # 'ssh': ('port',), +} + + +def iter_prot_layers( + multiaddr: str, +) -> Iterator[ + tuple[ + int, + list[str] + ] +]: + ''' + Unpack a libp2p style "multiaddress" into multiple "segments" + for each "layer" of the protocoll stack (in OSI terms). + + ''' + tokens: list[str] = multiaddr.split('/') + root, tokens = tokens[0], tokens[1:] + assert not root # there is a root '/' on LHS + itokens = iter(tokens) + + prot: str | None = None + params: list[str] = [] + for token in itokens: + # every prot path should start with a known + # key-str. + if token in prots: + if prot is None: + prot: str = token + else: + yield prot, params + prot = token + + params = [] + + elif token not in prots: + params.append(token) + + else: + yield prot, params + + +def parse_addr( + multiaddr: str, +) -> dict[str, str | int | dict]: + ''' + Parse a libp2p style "multiaddress" into it's distinct protocol + segments where each segment: + + `..////../` + + is loaded into a layers `dict[str, dict[str, Any]` which holds + each prot segment of the path as a separate entry sortable by + it's approx OSI "layer number". + + Any `paramN` in the path must be distinctly defined in order + according to the (global) `prot_params` table in this module. + + ''' + layers: dict[str, str | int | dict] = {} + for ( + prot_key, + params, + ) in iter_prot_layers(multiaddr): + + layer: int = prots[prot_key] # OSI layer used for sorting + ep: dict[str, int | str] = {'layer': layer} + layers[prot_key] = ep + + # TODO; validation and resolving of names: + # - each param via a validator provided as part of the + # prot_params def? (also see `"port"` case below..) + # - do a resolv step that will check addrs against + # any loaded network.resolv: dict[str, str] + rparams: list = list(reversed(params)) + for key in prot_params[prot_key]: + val: str | int = rparams.pop() + + # TODO: UGHH, dunno what we should do for validation + # here, put it in the params spec somehow? + if key == 'port': + val = int(val) + + ep[key] = val + + return layers diff --git a/piker/service/_registry.py b/piker/service/_registry.py index 7ae11937..7d98d0ba 100644 --- a/piker/service/_registry.py +++ b/piker/service/_registry.py @@ -46,7 +46,9 @@ _registry: Registry | None = None class Registry: - addr: None | tuple[str, int] = None + # TODO: should this be a set or should we complain + # on duplicates? + addrs: list[tuple[str, int]] = [] # TODO: table of uids to sockaddrs peers: dict[ @@ -60,69 +62,90 @@ _tractor_kwargs: dict[str, Any] = {} @acm async def open_registry( - addr: None | tuple[str, int] = None, + addrs: list[tuple[str, int]], ensure_exists: bool = True, -) -> tuple[str, int]: +) -> list[tuple[str, int]]: global _tractor_kwargs actor = tractor.current_actor() uid = actor.uid + preset_reg_addrs: list[tuple[str, int]] = Registry.addrs if ( - Registry.addr is not None - and addr + preset_reg_addrs + and addrs ): - raise RuntimeError( - f'`{uid}` registry addr already bound @ {_registry.sockaddr}' - ) + if preset_reg_addrs != addrs: + raise RuntimeError( + f'`{uid}` has non-matching registrar addresses?\n' + f'request: {addrs}\n' + f'already set: {preset_reg_addrs}' + ) was_set: bool = False if ( not tractor.is_root_process() - and Registry.addr is None + and not Registry.addrs ): - Registry.addr = actor._arb_addr + Registry.addrs.extend(actor._reg_addrs) if ( ensure_exists - and Registry.addr is None + and not Registry.addrs ): raise RuntimeError( - f"`{uid}` registry should already exist bug doesn't?" + f"`{uid}` registry should already exist but doesn't?" ) if ( - Registry.addr is None + not Registry.addrs ): was_set = True - Registry.addr = addr or _default_reg_addr + Registry.addrs = addrs or [_default_reg_addr] - _tractor_kwargs['arbiter_addr'] = Registry.addr + # NOTE: only spot this seems currently used is inside + # `.ui._exec` which is the (eventual qtloops) bootstrapping + # with guest mode. + _tractor_kwargs['registry_addrs'] = Registry.addrs try: - yield Registry.addr + yield Registry.addrs finally: # XXX: always clear the global addr if we set it so that the # next (set of) calls will apply whatever new one is passed # in. if was_set: - Registry.addr = None + Registry.addrs = None @acm async def find_service( service_name: str, + registry_addrs: list[tuple[str, int]], + + first_only: bool = True, + ) -> tractor.Portal | None: - async with open_registry() as reg_addr: + reg_addrs: list[tuple[str, int]] + async with open_registry( + addrs=registry_addrs, + ) as reg_addrs: log.info(f'Scanning for service `{service_name}`') # attach to existing daemon by name if possible async with tractor.find_actor( service_name, - arbiter_sockaddr=reg_addr, - ) as maybe_portal: - yield maybe_portal + registry_addrs=reg_addrs, + ) as maybe_portals: + if not maybe_portals: + yield None + return + + if first_only: + yield maybe_portals[0] + else: + yield maybe_portals[0] async def check_for_service(