diff --git a/examples/basic_order_bot.py b/examples/basic_order_bot.py
index 51575cc3..7aff6966 100644
--- a/examples/basic_order_bot.py
+++ b/examples/basic_order_bot.py
@@ -121,6 +121,7 @@ async def bot_main():
             # tick_throttle=10,

         ) as feed,
+        tractor.trionics.collapse_eg(),
         trio.open_nursery() as tn,
     ):
         assert accounts
diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py
index 5952418f..a172f74c 100644
--- a/piker/accounting/_pos.py
+++ b/piker/accounting/_pos.py
@@ -365,7 +365,11 @@ class Position(Struct):
         # added: bool = False
         tid: str = t.tid
         if tid in self._events:
-            log.warning(f'{t} is already added?!')
+            log.debug(
+                f'Txn is already added?\n'
+                f'\n'
+                f'{t}\n'
+            )
             # return added

         # TODO: apparently this IS possible with a dict but not
@@ -731,7 +735,7 @@ class Account(Struct):
             else:
                 # TODO: we reallly need a diff set of
                 # loglevels/colors per subsys.
-                log.warning(
+                log.debug(
                     f'Recent position for {fqme} was closed!'
                 )
diff --git a/piker/brokers/_daemon.py b/piker/brokers/_daemon.py
index 5efb03dd..5414bfb9 100644
--- a/piker/brokers/_daemon.py
+++ b/piker/brokers/_daemon.py
@@ -96,7 +96,10 @@ async def _setup_persistent_brokerd(
     #  - `open_symbol_search()`
     # NOTE: see ep invocation details inside `.data.feed`.
     try:
-        async with trio.open_nursery() as service_nursery:
+        async with (
+            tractor.trionics.collapse_eg(),
+            trio.open_nursery() as service_nursery
+        ):
             bus: _FeedsBus = feed.get_feed_bus(
                 brokername,
                 service_nursery,
diff --git a/piker/brokers/binance/broker.py b/piker/brokers/binance/broker.py
index a13ce38f..919e8152 100644
--- a/piker/brokers/binance/broker.py
+++ b/piker/brokers/binance/broker.py
@@ -440,6 +440,7 @@ async def open_trade_dialog(
     # - ledger: TransactionLedger

     async with (
+        tractor.trionics.collapse_eg(),
         trio.open_nursery() as tn,
         ctx.open_stream() as ems_stream,
     ):
diff --git a/piker/brokers/binance/feed.py b/piker/brokers/binance/feed.py
index efe2f717..b7d68edb 100644
--- a/piker/brokers/binance/feed.py
+++ b/piker/brokers/binance/feed.py
@@ -448,7 +448,6 @@ async def subscribe(


 async def stream_quotes(
-
     send_chan: trio.abc.SendChannel,
     symbols: list[str],
     feed_is_live: trio.Event,
@@ -460,6 +459,7 @@ async def stream_quotes(

 ) -> None:
     async with (
+        tractor.trionics.maybe_raise_from_masking_exc(),
         send_chan as send_chan,
         open_cached_client('binance') as client,
     ):
diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py
index 03cc301e..c0ab1ed4 100644
--- a/piker/brokers/deribit/api.py
+++ b/piker/brokers/deribit/api.py
@@ -31,7 +31,7 @@ from typing import (
     Callable,
 )

-import pendulum
+from pendulum import now
 import trio
 from trio_typing import TaskStatus
 from rapidfuzz import process as fuzzy
@@ -39,6 +39,7 @@ import numpy as np
 from tractor.trionics import (
     broadcast_receiver,
-    maybe_open_context
+    maybe_open_context,
+    collapse_eg,
 )
 from tractor import to_asyncio
 # XXX WOOPS XD
@@ -432,6 +433,7 @@ async def get_client(
 ) -> Client:

     async with (
+        collapse_eg(),
         trio.open_nursery() as n,
         open_jsonrpc_session(
             _testnet_ws_url, dtype=JSONRPCResult) as json_rpc
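Note on the recurring `collapse_eg()` + `trio.open_nursery()` pairing above (and throughout the rest of this patch): newer trio releases use strict exception-group semantics, so a nursery always raises an `ExceptionGroup` even when only a single task failed. Wrapping the nursery with `tractor.trionics.collapse_eg()` un-wraps a lone error so existing `except SomeError:` handling keeps working. A minimal sketch of the behavior this relies on (assuming `collapse_eg()` re-raises a single-exception group as the bare exception):

    import trio
    import tractor

    async def main():
        try:
            async with (
                # must wrap (i.e. be listed before) the nursery so it
                # can intercept the group raised on the nursery's exit
                tractor.trionics.collapse_eg(),
                trio.open_nursery() as tn,
            ):
                tn.start_soon(trio.sleep_forever)
                raise RuntimeError('boom')
        except RuntimeError:
            # without `collapse_eg()` a strict nursery would instead
            # deliver `ExceptionGroup([RuntimeError('boom')])`
            print('caught the bare error')

    trio.run(main)

The `maybe_raise_from_masking_exc()` wrapper added in `binance/feed.py` is the related helper for surfacing errors that would otherwise be masked during teardown.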
diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py
index 23222512..74d03075 100644
--- a/piker/brokers/ib/api.py
+++ b/piker/brokers/ib/api.py
@@ -48,6 +48,7 @@ from bidict import bidict
 import trio
 import tractor
 from tractor import to_asyncio
+from tractor import trionics
 from pendulum import (
     from_timestamp,
     DateTime,
@@ -1369,8 +1370,8 @@ async def load_clients_for_trio(
     '''
     Pure async mngr proxy to ``load_aio_clients()``.

-    This is a bootstrap entrypoing to call from
-    a ``tractor.to_asyncio.open_channel_from()``.
+    This is a bootstrap entrypoint to call from
+    a `tractor.to_asyncio.open_channel_from()`.

     '''
     async with load_aio_clients(
@@ -1391,7 +1392,10 @@ async def open_client_proxies() -> tuple[
     async with (
         tractor.trionics.maybe_open_context(
             acm_func=tractor.to_asyncio.open_channel_from,
-            kwargs={'target': load_clients_for_trio},
+            kwargs={
+                'target': load_clients_for_trio,
+                # ^XXX, kwarg to `open_channel_from()`
+            },

             # lock around current actor task access
             # TODO: maybe this should be the default in tractor?
@@ -1584,7 +1588,8 @@ async def open_client_proxy(
             event_consumers=event_table,
         ) as (first, chan),

-        trio.open_nursery() as relay_n,
+        trionics.collapse_eg(),  # loose-ify
+        trio.open_nursery() as relay_tn,
     ):
         assert isinstance(first, Client)
@@ -1624,7 +1629,7 @@ async def open_client_proxy(

                     continue

-        relay_n.start_soon(relay_events)
+        relay_tn.start_soon(relay_events)

         yield proxy
diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py
index ddda9020..b78f2880 100644
--- a/piker/brokers/ib/broker.py
+++ b/piker/brokers/ib/broker.py
@@ -34,6 +34,7 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from tractor.to_asyncio import LinkedTaskChannel
+from tractor import trionics
 from ib_insync.contract import (
     Contract,
 )
@@ -407,7 +408,7 @@ async def update_and_audit_pos_msg(

     # TODO: make this a "propaganda" log level?
     if ibpos.avgCost != msg.avg_price:
-        log.warning(
+        log.debug(
             f'IB "FIFO" avg price for {msg.symbol} is DIFF:\n'
             f'ib: {ibfmtmsg}\n'
             '---------------------------\n'
@@ -738,7 +739,7 @@ async def open_trade_dialog(
                 f'UNEXPECTED POSITION says IB => {msg.symbol}\n'
                 'Maybe they LIQUIDATED YOU or your ledger is wrong?\n'
             )
-            log.error(logmsg)
+            log.debug(logmsg)

     await ctx.started((
         all_positions,
@@ -747,21 +748,22 @@ async def open_trade_dialog(

     async with (
         ctx.open_stream() as ems_stream,
-        trio.open_nursery() as n,
+        trionics.collapse_eg(),
+        trio.open_nursery() as tn,
     ):
         # relay existing open orders to ems
         for msg in order_msgs:
             await ems_stream.send(msg)

         for client in set(aioclients.values()):
-            trade_event_stream: LinkedTaskChannel = await n.start(
+            trade_event_stream: LinkedTaskChannel = await tn.start(
                 open_trade_event_stream,
                 client,
             )

             # start order request handler **before** local trades
             # event loop
-            n.start_soon(
+            tn.start_soon(
                 handle_order_requests,
                 ems_stream,
                 accounts_def,
@@ -769,7 +771,7 @@ async def open_trade_dialog(
             )

             # allocate event relay tasks for each client connection
-            n.start_soon(
+            tn.start_soon(
                 deliver_trade_events,

                 trade_event_stream,
diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py
index 6d8f645e..21edcbb7 100644
--- a/piker/clearing/_client.py
+++ b/piker/clearing/_client.py
@@ -25,7 +25,10 @@ from typing import TYPE_CHECKING

 import trio
 import tractor
-from tractor.trionics import broadcast_receiver
+from tractor.trionics import (
+    broadcast_receiver,
+    collapse_eg,
+)

 from ._util import (
     log,  # sub-sys logger
@@ -285,8 +288,11 @@ async def open_ems(
         client._ems_stream = trades_stream

         # start sync code order msg delivery task
-        async with trio.open_nursery() as n:
-            n.start_soon(
+        async with (
+            collapse_eg(),
+            trio.open_nursery() as tn,
+        ):
+            tn.start_soon(
                 relay_orders_from_sync_code,
                 client,
                 fqme,
@@ -302,4 +308,4 @@ async def open_ems(
             )

             # stop the sync-msg-relay task on exit.
-            n.cancel_scope.cancel()
+            tn.cancel_scope.cancel()
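The `open_ems()` rewrite above also shows this patch's common shape for a context-scoped background task: spawn the relay inside a nursery whose scope is explicitly cancelled on the way out, so the helper task can never outlive the dialog it serves. A generic sketch of that pattern (names here are illustrative, not `piker` APIs):

    import trio
    from contextlib import asynccontextmanager as acm

    @acm
    async def open_bg_relay(source: trio.MemoryReceiveChannel):
        async def relay():
            # drain msgs for as long as the caller stays inside the acm
            async for msg in source:
                print('relayed:', msg)

        async with trio.open_nursery() as tn:
            tn.start_soon(relay)
            try:
                yield
            finally:
                # stop the relay task on exit, as in `open_ems()` above
                tn.cancel_scope.cancel()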
diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 6547fb1e..3794313f 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -42,6 +42,7 @@ from bidict import bidict
 import trio
 from trio_typing import TaskStatus
 import tractor
+from tractor import trionics

 from ._util import (
     log,  # sub-sys logger
@@ -161,7 +162,7 @@ async def clear_dark_triggers(

     router: Router,
     brokerd_orders_stream: tractor.MsgStream,
-    quote_stream: tractor.ReceiveMsgStream,  # noqa
+    quote_stream: tractor.MsgStream,
     broker: str,
     fqme: str,
@@ -177,6 +178,7 @@ async def clear_dark_triggers(
     '''
     # XXX: optimize this for speed!
     # TODO:
+    # - port to the new ringbuf stuff in `tractor.ipc`!
     # - numba all this!
     # - this stream may eventually contain multiple symbols
     quote_stream._raise_on_lag = False
@@ -499,7 +501,7 @@ class Router(Struct):

     '''
     # setup at actor spawn time
-    nursery: trio.Nursery
+    _tn: trio.Nursery

     # broker to book map
     books: dict[str, DarkBook] = {}
@@ -669,7 +671,7 @@ class Router(Struct):
             # dark book clearing loop, also lives with parent
             # daemon to allow dark order clearing while no
             # client is connected.
-            self.nursery.start_soon(
+            self._tn.start_soon(
                 clear_dark_triggers,
                 self,
                 relay.brokerd_stream,
@@ -692,7 +694,7 @@ class Router(Struct):

                 # spawn a ``brokerd`` order control dialog stream
                 # that syncs lifetime with the parent `emsd` daemon.
-                self.nursery.start_soon(
+                self._tn.start_soon(
                     translate_and_relay_brokerd_events,
                     broker,
                     relay.brokerd_stream,
@@ -766,10 +768,12 @@ async def _setup_persistent_emsd(

     global _router

-    # open a root "service nursery" for the ``emsd`` actor
-    async with trio.open_nursery() as service_nursery:
-
-        _router = Router(nursery=service_nursery)
+    # open a root "service task-nursery" for the `emsd`-actor
+    async with (
+        trionics.collapse_eg(),
+        trio.open_nursery() as tn
+    ):
+        _router = Router(_tn=tn)

         # TODO: send back the full set of persistent
         # orders/execs?
@@ -1518,7 +1522,7 @@ async def maybe_open_trade_relays(
         loglevel: str = 'info',
     ):
-        fqme, relay, feed, client_ready = await _router.nursery.start(
+        fqme, relay, feed, client_ready = await _router._tn.start(
             _router.open_trade_relays,
             fqme,
             exec_mode,
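`Router.open_trade_relays` being driven via `await _router._tn.start(...)` (returning `fqme, relay, feed, client_ready`) relies on trio's two-phase task startup: the child runs until it calls `task_status.started(value)`, and that value becomes the return of `nursery.start()`. A minimal, self-contained example:

    import trio

    async def serve(task_status=trio.TASK_STATUS_IGNORED):
        # ...bind sockets / open streams here...
        task_status.started('ready-handle')  # unblocks the starter
        await trio.sleep_forever()

    async def main():
        async with trio.open_nursery() as tn:
            handle = await tn.start(serve)
            assert handle == 'ready-handle'
            tn.cancel_scope.cancel()

    trio.run(main)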
diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py
index bf26d5ab..fdecb818 100644
--- a/piker/cli/__init__.py
+++ b/piker/cli/__init__.py
@@ -134,67 +134,65 @@ def pikerd(
     Spawn the piker broker-daemon.

     '''
-    from tractor.devx import maybe_open_crash_handler
-    with maybe_open_crash_handler(pdb=pdb):
-        log = get_console_log(loglevel, name='cli')
+    # from tractor.devx import maybe_open_crash_handler
+    # with maybe_open_crash_handler(pdb=False):
+    log = get_console_log(loglevel, name='cli')

-        if pdb:
-            log.warning((
-                "\n"
-                "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
-                "When a `piker` daemon crashes it will block the "
-                "task-thread until resumed from console!\n"
-                "\n"
-            ))
+    if pdb:
+        log.warning((
+            "\n"
+            "!!! YOU HAVE ENABLED DAEMON DEBUG MODE !!!\n"
+            "When a `piker` daemon crashes it will block the "
+            "task-thread until resumed from console!\n"
+            "\n"
+        ))
+
+    # service-actor registry endpoint socket-address set
+    regaddrs: list[tuple[str, int]] = []
+
+    conf, _ = config.load(
+        conf_name='conf',
+    )
+    network: dict = conf.get('network')
+    if (
+        network is None
+        and not maddr
+    ):
+        regaddrs = [(
+            _default_registry_host,
+            _default_registry_port,
+        )]
+
+    else:
+        eps: dict = load_trans_eps(
+            network,
+            maddr,
+        )
+        for layers in eps['pikerd']:
+            regaddrs.append((
+                layers['ipv4']['addr'],
+                layers['tcp']['port'],
+            ))

-        # service-actor registry endpoint socket-address set
-        regaddrs: list[tuple[str, int]] = []
+    from .. import service

-        conf, _ = config.load(
-            conf_name='conf',
-        )
-        network: dict = conf.get('network')
-        if (
-            network is None
-            and not maddr
-        ):
-            regaddrs = [(
-                _default_registry_host,
-                _default_registry_port,
-            )]
+    async def main():
+        service_mngr: service.Services
+        async with (
+            service.open_pikerd(
+                registry_addrs=regaddrs,
+                loglevel=loglevel,
+                debug_mode=pdb,
+                # enable_transports=['uds'],
+                enable_transports=['tcp'],
+            ) as service_mngr,
+        ):
+            assert service_mngr
+            # ?TODO? spawn all other sub-actor daemons according to
+            # multiaddress endpoint spec defined by user config
+            await trio.sleep_forever()

-        else:
-            eps: dict = load_trans_eps(
-                network,
-                maddr,
-            )
-            for layers in eps['pikerd']:
-                regaddrs.append((
-                    layers['ipv4']['addr'],
-                    layers['tcp']['port'],
-                ))
-
-        from .. import service
-
-        async def main():
-            service_mngr: service.Services
-
-            async with (
-                service.open_pikerd(
-                    registry_addrs=regaddrs,
-                    loglevel=loglevel,
-                    debug_mode=pdb,
-
-                ) as service_mngr,  # normally delivers a ``Services`` handle
-
-                # AsyncExitStack() as stack,
-            ):
-                assert service_mngr
-                # ?TODO? spawn all other sub-actor daemons according to
-                # multiaddress endpoint spec defined by user config
-                await trio.sleep_forever()
-
-        trio.run(main)
+    trio.run(main)


 @click.group(context_settings=config._context_defaults)
@@ -309,6 +307,10 @@ def services(config, tl, ports):
     if not ports:
         ports = [_default_registry_port]

+    addr = tractor._addr.wrap_address(
+        addr=(host, ports[0])
+    )
+
     async def list_services():
         nonlocal host
         async with (
@@ -316,16 +318,18 @@ def services(config, tl, ports):
                 name='service_query',
                 loglevel=config['loglevel'] if tl else None,
             ),
-            tractor.get_arbiter(
-                host=host,
-                port=ports[0]
+            tractor.get_registry(
+                addr=addr,
             ) as portal
         ):
-            registry = await portal.run_from_ns('self', 'get_registry')
+            registry = await portal.run_from_ns(
+                'self',
+                'get_registry',
+            )
             json_d = {}
             for key, socket in registry.items():
-                host, port = socket
-                json_d[key] = f'{host}:{port}'
+                json_d[key] = f'{socket}'
+
             click.echo(f"{colorize_json(json_d)}")

     trio.run(list_services)
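The `services` command now resolves the registry through `tractor`'s address-wrapping API instead of the removed `get_arbiter()` endpoint. A sketch of the query flow, lifted from the hunk above (assumes a running `pikerd` registry at the default port; `tractor._addr` is a private module, so treat this as illustrative only):

    import tractor

    async def list_services():
        # NOTE: must run inside an open actor runtime, e.g. the
        # `open_piker_runtime()` block the real cli wraps this in
        addr = tractor._addr.wrap_address(
            addr=('127.0.0.1', 6116),  # default pikerd registry addr
        )
        async with tractor.get_registry(addr=addr) as portal:
            registry = await portal.run_from_ns('self', 'get_registry')
            for name, sockaddr in registry.items():
                print(f'{name} -> {sockaddr}')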
diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py
index 4d886fbc..7e5eb810 100644
--- a/piker/data/_web_bs.py
+++ b/piker/data/_web_bs.py
@@ -27,7 +27,6 @@ from functools import partial
 from types import ModuleType
 from typing import (
     Any,
-    Optional,
     Callable,
     AsyncContextManager,
     AsyncGenerator,
@@ -35,6 +34,7 @@ from typing import (
 )
 import json

+import tractor
 import trio
 from trio_typing import TaskStatus
 from trio_websocket import (
@@ -167,7 +167,7 @@ async def _reconnect_forever(

     async def proxy_msgs(
         ws: WebSocketConnection,
-        pcs: trio.CancelScope,  # parent cancel scope
+        rent_cs: trio.CancelScope,  # parent cancel scope
     ):
         '''
         Receive (under `timeout` deadline) all msgs from from underlying
@@ -192,7 +192,7 @@ async def _reconnect_forever(
                         f'{url} connection bail with:'
                     )
                     await trio.sleep(0.5)
-                    pcs.cancel()
+                    rent_cs.cancel()

                     # go back to reonnect loop in parent task
                     return
@@ -204,7 +204,7 @@ async def _reconnect_forever(
                     f'{src_mod}\n'
                     'WS feed seems down and slow af.. reconnecting\n'
                 )
-                pcs.cancel()
+                rent_cs.cancel()

                 # go back to reonnect loop in parent task
                 return
@@ -228,7 +228,12 @@ async def _reconnect_forever(
     nobsws._connected = trio.Event()
     task_status.started()

-    while not snd._closed:
+    mc_state: trio._channel.MemoryChannelState = snd._state
+    while (
+        mc_state.open_receive_channels > 0
+        and
+        mc_state.open_send_channels > 0
+    ):
         log.info(
             f'{src_mod}\n'
             f'{url} trying (RE)CONNECT'
@@ -237,10 +242,11 @@ async def _reconnect_forever(
         ws: WebSocketConnection
         try:
             async with (
-                trio.open_nursery() as n,
                 open_websocket_url(url) as ws,
+                tractor.trionics.collapse_eg(),
+                trio.open_nursery() as tn,
             ):
-                cs = nobsws._cs = n.cancel_scope
+                cs = nobsws._cs = tn.cancel_scope
                 nobsws._ws = ws
                 log.info(
                     f'{src_mod}\n'
@@ -248,7 +254,7 @@ async def _reconnect_forever(
                 )

                 # begin relay loop to forward msgs
-                n.start_soon(
+                tn.start_soon(
                     proxy_msgs,
                     ws,
                     cs,
@@ -262,7 +268,7 @@ async def _reconnect_forever(

                     # TODO: should we return an explicit sub-cs
                     # from this fixture task?
-                    await n.start(
+                    await tn.start(
                         open_fixture,
                         fixture,
                         nobsws,
@@ -272,11 +278,23 @@ async def _reconnect_forever(
                 # to let tasks run **inside** the ws open block above.
                 nobsws._connected.set()
                 await trio.sleep_forever()
-        except HandshakeError:
+
+        except (
+            HandshakeError,
+            ConnectionRejected,
+        ):
             log.exception('Retrying connection')
+            await trio.sleep(0.5)  # throttle

-        # ws & nursery block ends
+        except BaseException as _berr:
+            berr = _berr
+            log.exception(
+                'Reconnect-attempt failed ??\n'
+            )
+            await trio.sleep(0.2)  # throttle
+            raise berr
+        #|_ws & nursery block ends

         nobsws._connected = trio.Event()
         if cs.cancelled_caught:
             log.cancel(
@@ -324,21 +342,25 @@ async def open_autorecon_ws(
     connetivity errors, or some user defined recv timeout.

     You can provide a ``fixture`` async-context-manager which will be
-    entered/exitted around each connection reset; eg. for (re)requesting
-    subscriptions without requiring streaming setup code to rerun.
+    entered/exited around each connection reset; eg. for
+    (re)requesting subscriptions without requiring streaming setup
+    code to rerun.

     '''
     snd: trio.MemorySendChannel
     rcv: trio.MemoryReceiveChannel
     snd, rcv = trio.open_memory_channel(616)

-    async with trio.open_nursery() as n:
+    async with (
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn
+    ):
         nobsws = NoBsWs(
             url,
             rcv,
             msg_recv_timeout=msg_recv_timeout,
         )
-        await n.start(
+        await tn.start(
             partial(
                 _reconnect_forever,
                 url,
@@ -351,11 +373,10 @@ async def open_autorecon_ws(
         await nobsws._connected.wait()
         assert nobsws._cs
         assert nobsws.connected()
-
         try:
             yield nobsws
         finally:
-            n.cancel_scope.cancel()
+            tn.cancel_scope.cancel()


 '''
@@ -368,8 +389,8 @@ of msgs over a `NoBsWs`.
 class JSONRPCResult(Struct):
     id: int
     jsonrpc: str = '2.0'
-    result: Optional[dict] = None
-    error: Optional[dict] = None
+    result: dict|None = None
+    error: dict|None = None
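The `while not snd._closed:` → channel-state rewrite above still reads trio privates (`snd._state`); note the same counters are exposed on the public `statistics()` accessor, which is the stable way to ask "does anyone still hold an open end?":

    import trio

    async def main():
        snd, rcv = trio.open_memory_channel(8)
        stats = snd.statistics()
        # the same fields the patch reads off the private `._state`
        assert stats.open_send_channels == 1
        assert stats.open_receive_channels == 1
        await rcv.aclose()
        assert snd.statistics().open_receive_channels == 0

    trio.run(main)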
diff --git a/piker/data/feed.py b/piker/data/feed.py
index df4106e0..9cc37cd7 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -39,6 +39,7 @@ from typing import (
     AsyncContextManager,
     Awaitable,
     Sequence,
+    TYPE_CHECKING,
 )

 import trio
@@ -75,6 +76,10 @@ from ._sampling import (
     uniform_rate_send,
 )

+if TYPE_CHECKING:
+    from tractor._addr import Address
+    from tractor.msg.types import Aid
+

 class Sub(Struct, frozen=True):
     '''
@@ -725,7 +730,10 @@ class Feed(Struct):
             async for msg in stream:
                 await tx.send(msg)

-        async with trio.open_nursery() as nurse:
+        async with (
+            tractor.trionics.collapse_eg(),
+            trio.open_nursery() as nurse
+        ):
             # spawn a relay task for each stream so that they all
             # multiplex to a common channel.
             for brokername in mods:
@@ -899,19 +907,19 @@ async def open_feed(
             feed.portals[brokermod] = portal

             # fill out "status info" that the UI can show
-            host, port = portal.channel.raddr
-            if host == '127.0.0.1':
-                host = 'localhost'
-
+            chan: tractor.Channel = portal.chan
+            raddr: Address = chan.raddr
+            aid: Aid = chan.aid
+            # TAG_feed_status_update
             feed.status.update({
-                'actor_name': portal.channel.uid[0],
-                'host': host,
-                'port': port,
+                'actor_id': aid,
+                'actor_short_id': f'{aid.name}@{aid.pid}',
+                'ipc': chan.raddr.proto_key,
+                'ipc_addr': raddr,
                 'hist_shm': 'NA',
                 'rt_shm': 'NA',
-                'throttle_rate': tick_throttle,
+                'throttle_hz': tick_throttle,
             })
-            # feed.status.update(init_msg.pop('status', {}))

             # (allocate and) connect to any feed bus for this broker
             bus_ctxs.append(
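The relay tasks in `Feed` above (one per brokerd stream, all feeding a common memory channel) follow the standard trio multiplexing recipe: clone the send side per producer and close the original so the receive loop terminates once every relay finishes. A self-contained sketch:

    import trio

    async def numbers(n: int):
        for i in range(n):
            yield i

    async def relay(stream, tx: trio.MemorySendChannel):
        # forward every msg from one stream into the common channel
        async with tx:
            async for msg in stream:
                await tx.send(msg)

    async def main():
        tx, rx = trio.open_memory_channel(0)
        async with trio.open_nursery() as tn:
            for n in (2, 3):
                tn.start_soon(relay, numbers(n), tx.clone())
            tx.close()  # only the clones now keep the channel open
            async for msg in rx:
                print('multiplexed:', msg)

    trio.run(main)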
diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py
index acc7309e..5d1fd45a 100644
--- a/piker/fsp/_engine.py
+++ b/piker/fsp/_engine.py
@@ -498,6 +498,7 @@ async def cascade(

     func_name: str = func.__name__
     async with (
+        tractor.trionics.collapse_eg(),  # avoid multi-taskc tb in console
         trio.open_nursery() as tn,
     ):
         # TODO: might be better to just make a "restart" method where
diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py
index a4e3ccf2..33f23453 100644
--- a/piker/service/_actor_runtime.py
+++ b/piker/service/_actor_runtime.py
@@ -107,17 +107,22 @@ async def open_piker_runtime(

         async with (
             tractor.open_root_actor(
-                # passed through to ``open_root_actor``
+                # passed through to `open_root_actor`
                 registry_addrs=registry_addrs,
                 name=name,
+                start_method=start_method,
                 loglevel=loglevel,
                 debug_mode=debug_mode,
-                start_method=start_method,
+
+                # XXX NOTE MEMBER DAT der's a perf hit yo!!
+                # https://greenback.readthedocs.io/en/latest/principle.html#performance
+                maybe_enable_greenback=True,

                 # TODO: eventually we should be able to avoid
                 # having the root have more then permissions to
                 # spawn other specialized daemons I think?
                 enable_modules=enable_modules,
+                hide_tb=False,

                 **tractor_kwargs,
             ) as actor,
@@ -200,7 +205,8 @@ async def open_pikerd(
             reg_addrs,
         ),
         tractor.open_nursery() as actor_nursery,
-        trio.open_nursery() as service_nursery,
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as service_tn,
     ):
         for addr in reg_addrs:
             if addr not in root_actor.accept_addrs:
@@ -211,7 +217,7 @@ async def open_pikerd(

         # assign globally for future daemon/task creation
         Services.actor_n = actor_nursery
-        Services.service_n = service_nursery
+        Services.service_n = service_tn
         Services.debug_mode = debug_mode

         try:
@@ -221,7 +227,7 @@ async def open_pikerd(
             # TODO: is this more clever/efficient?
             # if 'samplerd' in Services.service_tasks:
             #     await Services.cancel_service('samplerd')
-            service_nursery.cancel_scope.cancel()
+            service_tn.cancel_scope.cancel()

 # TODO: do we even need this?
@@ -256,7 +262,10 @@ async def maybe_open_pikerd(
     loglevel: str | None = None,
     **kwargs,

-) -> tractor._portal.Portal | ClassVar[Services]:
+) -> (
+    tractor._portal.Portal
+    |ClassVar[Services]
+):
     '''
     If no ``pikerd`` daemon-root-actor can be found start it and
     yield up (we should probably figure out returning a portal to self
@@ -281,10 +290,11 @@ async def maybe_open_pikerd(

     registry_addrs: list[tuple[str, int]] = (
         registry_addrs
-        or [_default_reg_addr]
+        or
+        [_default_reg_addr]
     )

-    pikerd_portal: tractor.Portal | None
+    pikerd_portal: tractor.Portal|None
     async with (
         open_piker_runtime(
             name=query_name,
diff --git a/piker/service/_daemon.py b/piker/service/_daemon.py
index 1e7ff096..89d7f28d 100644
--- a/piker/service/_daemon.py
+++ b/piker/service/_daemon.py
@@ -28,6 +28,7 @@ from contextlib import (
 )

 import tractor
+from trio.lowlevel import current_task

 from ._util import (
     log,  # sub-sys logger
@@ -70,69 +71,84 @@ async def maybe_spawn_daemon(
     lock = Services.locks[service_name]
     await lock.acquire()

-    async with find_service(
-        service_name,
-        registry_addrs=[('127.0.0.1', 6116)],
-    ) as portal:
-        if portal is not None:
-            lock.release()
-            yield portal
-            return
+    try:
+        async with find_service(
+            service_name,
+            registry_addrs=[('127.0.0.1', 6116)],
+        ) as portal:
+            if portal is not None:
+                lock.release()
+                yield portal
+                return

-    log.warning(
-        f"Couldn't find any existing {service_name}\n"
-        'Attempting to spawn new daemon-service..'
-    )
+        log.warning(
+            f"Couldn't find any existing {service_name}\n"
+            'Attempting to spawn new daemon-service..'
+        )

-    # ask root ``pikerd`` daemon to spawn the daemon we need if
-    # pikerd is not live we now become the root of the
-    # process tree
-    async with maybe_open_pikerd(
-        loglevel=loglevel,
-        **pikerd_kwargs,
+        # ask root ``pikerd`` daemon to spawn the daemon we need if
+        # pikerd is not live we now become the root of the
+        # process tree
+        async with maybe_open_pikerd(
+            loglevel=loglevel,
+            **pikerd_kwargs,

-    ) as pikerd_portal:
+        ) as pikerd_portal:

-        # we are the root and thus are `pikerd`
-        # so spawn the target service directly by calling
-        # the provided target routine.
-        # XXX: this assumes that the target is well formed and will
-        # do the right things to setup both a sub-actor **and** call
-        # the ``_Services`` api from above to start the top level
-        # service task for that actor.
-        started: bool
-        if pikerd_portal is None:
-            started = await service_task_target(
-                loglevel=loglevel,
-                **spawn_args,
+            # we are the root and thus are `pikerd`
+            # so spawn the target service directly by calling
+            # the provided target routine.
+            # XXX: this assumes that the target is well formed and will
+            # do the right things to setup both a sub-actor **and** call
+            # the ``_Services`` api from above to start the top level
+            # service task for that actor.
+            started: bool
+            if pikerd_portal is None:
+                started = await service_task_target(
+                    loglevel=loglevel,
+                    **spawn_args,
+                )
+
+            else:
+                # request a remote `pikerd` (service manager) to start the
+                # target daemon-task, the target can't return
+                # a non-serializable value since it is expected that service
+                # starting is non-blocking and the target task will persist
+                # running "under" or "within" the `pikerd` actor tree after
+                # the questing client disconnects. in other words this
+                # spawns a persistent daemon actor that continues to live
+                # for the lifespan of whatever the service manager inside
+                # `pikerd` says it should.
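The new `try`/`except BaseException` wrapping of `maybe_spawn_daemon()`'s body (completed in the hunk just below) exists to avoid leaking the per-service `trio.Lock` when startup crashes mid-dialog; `trio.Lock.statistics().owner` identifies the holding task so only the actual owner releases. The primitive in isolation:

    import trio
    from trio.lowlevel import current_task

    async def guarded(lock: trio.Lock):
        await lock.acquire()
        try:
            ...  # work that may crash before the normal release point
        except BaseException:
            # release iff *this* task still owns the lock, mirroring
            # `maybe_spawn_daemon()` here
            if (
                lock.locked()
                and lock.statistics().owner is current_task()
            ):
                lock.release()
            raise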
+                started = await pikerd_portal.run(
+                    service_task_target,
+                    loglevel=loglevel,
+                    **spawn_args,
+                )
+
+            if started:
+                log.info(f'Service {service_name} started!')
+
+            # block until we can discover (by IPC connection) to the newly
+            # spawned daemon-actor and then deliver the portal to the
+            # caller.
+            async with tractor.wait_for_actor(service_name) as portal:
+                lock.release()
+                yield portal
+                await portal.cancel_actor()
+
+    except BaseException as _err:
+        err = _err
+        if (
+            lock.locked()
+            and
+            lock.statistics().owner is current_task()
+        ):
+            log.exception(
+                f'Releasing stale lock after crash..?'
+                f'{err!r}\n'
             )
-
-        else:
-            # request a remote `pikerd` (service manager) to start the
-            # target daemon-task, the target can't return
-            # a non-serializable value since it is expected that service
-            # starting is non-blocking and the target task will persist
-            # running "under" or "within" the `pikerd` actor tree after
-            # the questing client disconnects. in other words this
-            # spawns a persistent daemon actor that continues to live
-            # for the lifespan of whatever the service manager inside
-            # `pikerd` says it should.
-            started = await pikerd_portal.run(
-                service_task_target,
-                loglevel=loglevel,
-                **spawn_args,
-            )
-
-        if started:
-            log.info(f'Service {service_name} started!')
-
-        # block until we can discover (by IPC connection) to the newly
-        # spawned daemon-actor and then deliver the portal to the
-        # caller.
-        async with tractor.wait_for_actor(service_name) as portal:
             lock.release()
-            yield portal
-            await portal.cancel_actor()
+        raise err


 async def spawn_emsd(
diff --git a/piker/service/_mngr.py b/piker/service/_mngr.py
index 89e98411..726a34c8 100644
--- a/piker/service/_mngr.py
+++ b/piker/service/_mngr.py
@@ -109,7 +109,7 @@ class Services:
             # wait on any context's return value
             # and any final portal result from the
             # sub-actor.
-            ctx_res: Any = await ctx.result()
+            ctx_res: Any = await ctx.wait_for_result()

             # NOTE: blocks indefinitely until cancelled
             # either by error from the target context
diff --git a/piker/service/_registry.py b/piker/service/_registry.py
index ed4569f7..94ccbc68 100644
--- a/piker/service/_registry.py
+++ b/piker/service/_registry.py
@@ -101,13 +101,15 @@ async def open_registry(

     if (
         not tractor.is_root_process()
-        and not Registry.addrs
+        and
+        not Registry.addrs
     ):
         Registry.addrs.extend(actor.reg_addrs)

     if (
         ensure_exists
-        and not Registry.addrs
+        and
+        not Registry.addrs
     ):
         raise RuntimeError(
             f"`{uid}` registry should already exist but doesn't?"
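`find_service()` (rewritten in the next hunk) is a thin wrapper over `tractor.find_actor()`; the essential call shape, lifted from this patch (must run inside an open `tractor`/`piker` runtime, and the registry addr below is the hard-coded default from `maybe_spawn_daemon()`):

    import tractor

    async def lookup(name: str):
        async with tractor.find_actor(
            name,
            registry_addrs=[('127.0.0.1', 6116)],
            only_first=True,  # single portal (or None) vs. a list
        ) as portal:
            if portal is None:
                print(f'no actor named {name!r} registered')
            else:
                print(f'found {name!r} -> {portal}')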
@@ -146,7 +148,7 @@ async def find_service(
     | list[Portal]
     | None
 ):
-
+    # try:
     reg_addrs: list[tuple[str, int]]
     async with open_registry(
         addrs=(
@@ -157,22 +159,39 @@ async def find_service(
             or Registry.addrs
         ),
     ) as reg_addrs:
-        log.info(f'Scanning for service `{service_name}`')

-        maybe_portals: list[Portal] | Portal | None
+        log.info(
+            f'Scanning for service {service_name!r}'
+        )
+
+        # attach to existing daemon by name if possible
+        maybe_portals: list[Portal]|Portal|None
         async with tractor.find_actor(
             service_name,
             registry_addrs=reg_addrs,
             only_first=first_only,  # if set only returns single ref
         ) as maybe_portals:
             if not maybe_portals:
+                log.info(
+                    f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
+                )
                 yield None
                 return

+            log.info(
+                f'Found service {service_name!r} -> {maybe_portals}'
+            )
             yield maybe_portals

+    # except BaseException as _berr:
+    #     berr = _berr
+    #     log.exception(
+    #         'tractor.find_actor() failed with,\n'
+    #     )
+    #     raise berr
+

 async def check_for_service(
     service_name: str,
diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index f10f0f75..121fcbb7 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -963,7 +963,10 @@ async def tsdb_backfill(
     # concurrently load the provider's most-recent-frame AND any
     # pre-existing tsdb history already saved in `piker` storage.
     dt_eps: list[DateTime, DateTime] = []
-    async with trio.open_nursery() as tn:
+    async with (
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn
+    ):
         tn.start_soon(
             push_latest_frame,
             dt_eps,
@@ -1012,9 +1015,16 @@ async def tsdb_backfill(
             int,
             Duration,
         ]|None = config.get('frame_types', None)
+
         if def_frame_durs:
             def_frame_size: Duration = def_frame_durs[timeframe]
-            assert def_frame_size == calced_frame_size
+
+            if def_frame_size != calced_frame_size:
+                log.warning(
+                    f'Expected frame size {def_frame_size}\n'
+                    f'Rxed frame {calced_frame_size}\n'
+                )
+                # await tractor.pause()

         else:
             # use what we calced from first frame above.
             def_frame_size = calced_frame_size
@@ -1043,7 +1053,9 @@ async def tsdb_backfill(
         # if there is a gap to backfill from the first
         # history frame until the last datum loaded from the tsdb
         # continue that now in the background
-        async with trio.open_nursery() as tn:
+        async with trio.open_nursery(
+            strict_exception_groups=False,
+        ) as tn:

             bf_done = await tn.start(
                 partial(
@@ -1308,6 +1320,7 @@ async def manage_history(
         # sampling period) data set since normally differently
         # sampled timeseries can be loaded / process independently
         # ;)
+        tractor.trionics.collapse_eg(),
         trio.open_nursery() as tn,
     ):
         log.info(
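Note the two different strategies within this same file: `collapse_eg()` wrappers in most spots, but a plain `strict_exception_groups=False` nursery for the backfill task. The kwarg restores trio's legacy "bare exception" delivery wholesale (and is deprecated in recent trio releases), whereas `collapse_eg()` only unwraps groups containing a single error:

    import trio

    async def main():
        try:
            async with trio.open_nursery(
                strict_exception_groups=False,
            ) as tn:
                tn.start_soon(trio.sleep_forever)
                raise ValueError('boom')
        except ValueError:
            # delivered bare; a strict nursery would raise
            # ExceptionGroup([ValueError('boom')]) here instead
            print('legacy delivery')

    trio.run(main)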
diff --git a/piker/ui/_app.py b/piker/ui/_app.py
index 5733e372..68ecb3dd 100644
--- a/piker/ui/_app.py
+++ b/piker/ui/_app.py
@@ -21,6 +21,7 @@ Main app startup and run.
 from functools import partial
 from types import ModuleType

+import tractor
 import trio

 from piker.ui.qt import (
@@ -116,6 +117,7 @@ async def _async_main(
             needed_brokermods[brokername] = brokers[brokername]

     async with (
+        tractor.trionics.collapse_eg(),
         trio.open_nursery() as root_n,
     ):
         # set root nursery and task stack for spawning other charts/feeds
diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py
index afcd7dd0..98b25398 100644
--- a/piker/ui/_chart.py
+++ b/piker/ui/_chart.py
@@ -33,7 +33,6 @@ import trio

 from piker.ui.qt import (
     QtCore,
-    QtWidgets,
     Qt,
     QLineF,
     QFrame,
diff --git a/piker/ui/_display.py b/piker/ui/_display.py
index 46e1b922..690bfb18 100644
--- a/piker/ui/_display.py
+++ b/piker/ui/_display.py
@@ -1445,7 +1445,10 @@ async def display_symbol_data(
     # for pause/resume on mouse interaction
     rt_chart.feed = feed

-    async with trio.open_nursery() as ln:
+    async with (
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as ln,
+    ):
         # if available load volume related built-in display(s)
         vlm_charts: dict[
             str,
diff --git a/piker/ui/_event.py b/piker/ui/_event.py
index 44797fa4..28d35de0 100644
--- a/piker/ui/_event.py
+++ b/piker/ui/_event.py
@@ -22,7 +22,10 @@ from contextlib import asynccontextmanager as acm
 from typing import Callable

 import trio
-from tractor.trionics import gather_contexts
+from tractor.trionics import (
+    gather_contexts,
+    collapse_eg,
+)

 from piker.ui.qt import (
     QtCore,
@@ -207,7 +210,10 @@ async def open_signal_handler(
         async for args in recv:
             await async_handler(*args)

-    async with trio.open_nursery() as tn:
+    async with (
+        collapse_eg(),
+        trio.open_nursery() as tn
+    ):
         tn.start_soon(proxy_to_handler)
         async with send:
             yield
@@ -242,6 +248,7 @@ async def open_handlers(
     widget: QWidget
     streams: list[trio.abc.ReceiveChannel]
     async with (
+        collapse_eg(),
         trio.open_nursery() as tn,
         gather_contexts([
             open_event_stream(
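`open_signal_handler()` above bridges Qt signals into trio by pushing `args` tuples through a memory channel that a nursery task drains; stripped of the Qt parts the flow is:

    import trio

    async def main():
        send, recv = trio.open_memory_channel(16)

        async def handler(x: int) -> None:
            print('handled', x)

        async def proxy_to_handler():
            async for args in recv:
                await handler(*args)

        async with trio.open_nursery() as tn:
            tn.start_soon(proxy_to_handler)
            async with send:
                # in the UI these sends are wired to Qt signal emits
                send.send_nowait((1,))
                send.send_nowait((2,))
        # closing `send` ends the async-for, letting the nursery exit

    trio.run(main)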
diff --git a/piker/ui/_feedstatus.py b/piker/ui/_feedstatus.py
index 1c9eb772..ea262876 100644
--- a/piker/ui/_feedstatus.py
+++ b/piker/ui/_feedstatus.py
@@ -18,10 +18,11 @@ Feed status and controls widget(s) for embedding in a UI-pane.

 """
-
 from __future__ import annotations
-from textwrap import dedent
-from typing import TYPE_CHECKING
+from typing import (
+    Any,
+    TYPE_CHECKING,
+)

 # from PyQt5.QtCore import Qt

@@ -49,35 +50,55 @@ def mk_feed_label(
     a feed control protocol.

     '''
-    status = feed.status
+    status: dict[str, Any] = feed.status
     assert status

-    msg = dedent("""
-        actor: **{actor_name}**\n
-        |_ @**{host}:{port}**\n
-    """)
+    # SO tips on ws/nls,
+    # https://stackoverflow.com/a/15721400
+    ws: str = '&nbsp;'
+    # nl: str = '<br>'  # dun work?
+    actor_info_repr: str = (
+        f')> **{status["actor_short_id"]}**\n'
+        '\n'  # bc md?
+    )

-    for key, val in status.items():
-        if key in ('host', 'port', 'actor_name'):
-            continue
-        msg += f'\n|_ {key}: **{{{key}}}**\n'
+    # fields to select *IN* for display
+    # (see `.data.feed.open_feed()` status
+    #  update -> TAG_feed_status_update)
+    for key in [
+        'ipc',
+        'hist_shm',
+        'rt_shm',
+        'throttle_hz',
+    ]:
+        # NOTE, the 2nd key is filled via `.format()` updates.
+        actor_info_repr += (
+            f'\n'  # bc md?
+            f'{ws}|_{key}: **{{{key}}}**\n'
+        )
+    # ^TODO? formatting and content..
+    # -[ ] showing which fqme is "forward" on the
+    #      chart/fsp/order-mode?
+    #      '|_ flows: **{symbols}**\n'
+    #
+    # -[x] why isn't the indent working?
+    #      => markdown, now solved..

     feed_label = FormatLabel(
-        fmt_str=msg,
-        # |_ streams: **{symbols}**\n
+        fmt_str=actor_info_repr,
         font=_font.font,
         font_size=_font_small.px_size,
         font_color='default_lightest',
     )

+    # ?TODO, remove this?
     # form.vbox.setAlignment(feed_label, Qt.AlignBottom)
     # form.vbox.setAlignment(Qt.AlignBottom)
-    _ = chart.height() - (
-        form.height() +
-        form.fill_bar.height()
-        # feed_label.height()
-    )
+    # _ = chart.height() - (
+    #     form.height() +
+    #     form.fill_bar.height()
+    #     # feed_label.height()
+    # )

     feed_label.format(**feed.status)
-
     return feed_label
diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py
index 2e3e392e..ca43ed77 100644
--- a/piker/ui/_fsp.py
+++ b/piker/ui/_fsp.py
@@ -600,6 +600,7 @@ async def open_fsp_admin(
             kwargs=kwargs,
         ) as (cache_hit, cluster_map),

+        tractor.trionics.collapse_eg(),
         trio.open_nursery() as tn,
     ):
         if cache_hit:
@@ -613,6 +614,8 @@ async def open_fsp_admin(
         )
         try:
             yield admin
+
+        # ??TODO, does this *need* to be inside a finally?
         finally:
             # terminate all tasks via signals
             for key, entry in admin._registry.items():
diff --git a/piker/ui/_label.py b/piker/ui/_label.py
index 0e90b7fe..07956e4a 100644
--- a/piker/ui/_label.py
+++ b/piker/ui/_label.py
@@ -285,18 +285,20 @@ class FormatLabel(QLabel):
         font_size: int,
         font_color: str,

+        use_md: bool = True,
+
         parent=None,

     ) -> None:

         super().__init__(parent)

-        # by default set the format string verbatim and expect user to
-        # call ``.format()`` later (presumably they'll notice the
+        # by default set the format string verbatim and expect user
+        # to call ``.format()`` later (presumably they'll notice the
         # unformatted content if ``fmt_str`` isn't meant to be
         # unformatted).
         self.fmt_str = fmt_str
-        self.setText(fmt_str)
+        # self.setText(fmt_str)  # ?TODO, why here?

         self.setStyleSheet(
             f"""QLabel {{
@@ -306,9 +308,10 @@ class FormatLabel(QLabel):
             """
         )
         self.setFont(_font.font)
-        self.setTextFormat(
-            Qt.TextFormat.MarkdownText
-        )
+        if use_md:
+            self.setTextFormat(
+                Qt.TextFormat.MarkdownText
+            )
         self.setMargin(0)

         self.setSizePolicy(
@@ -316,7 +319,10 @@ class FormatLabel(QLabel):
             size_policy.Expanding,
         )
         self.setAlignment(
-            Qt.AlignVCenter | Qt.AlignLeft
+            Qt.AlignLeft
+            |
+            Qt.AlignBottom
+            # Qt.AlignVCenter
        )
         self.setText(self.fmt_str)
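The `use_md` flag plus the `&nbsp;` escapes in `mk_feed_label()` exist because a `QLabel` in `MarkdownText` mode swallows leading whitespace, so indentation has to be forced with entities. A standalone repro (sketched with PyQt6; the `piker.ui.qt` re-exports differ):

    from PyQt6.QtWidgets import QApplication, QLabel
    from PyQt6.QtCore import Qt

    app = QApplication([])
    label = QLabel()
    label.setTextFormat(Qt.TextFormat.MarkdownText)
    # '&nbsp;' forces the leading indent markdown would otherwise eat
    label.setText(')> **feedd@1234**\n\n&nbsp;|_ipc: **tcp**\n')
    label.show()
    app.exec()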
""" @@ -46,6 +47,7 @@ import time from pprint import pformat from rapidfuzz import process as fuzzy +import tractor import trio from trio_typing import TaskStatus @@ -53,7 +55,7 @@ from piker.ui.qt import ( size_policy, align_flag, Qt, - QtCore, + # QtCore, QtWidgets, QModelIndex, QItemSelectionModel, @@ -920,7 +922,10 @@ async def fill_results( # issue multi-provider fan-out search request and place # "searching.." statuses on outstanding results providers - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): for provider, (search, pause) in ( _searcher_cache.copy().items() @@ -944,7 +949,7 @@ async def fill_results( status_field='-> searchin..', ) - await n.start( + await tn.start( pack_matches, view, has_results, @@ -1004,12 +1009,14 @@ async def handle_keyboard_input( view.set_font_size(searchbar.dpi_font.px_size) send, recv = trio.open_memory_channel(616) - async with trio.open_nursery() as n: - + async with ( + tractor.trionics.collapse_eg(), # needed? + trio.open_nursery() as tn + ): # start a background multi-searcher task which receives # patterns relayed from this keyboard input handler and # async updates the completer view's results. - n.start_soon( + tn.start_soon( partial( fill_results, searchw, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 47a3bb97..ea6d498a 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -792,6 +792,7 @@ async def open_order_mode( brokerd_accounts, ems_dialog_msgs, ), + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, ): diff --git a/tests/conftest.py b/tests/conftest.py index 366d5d95..22d1af3c 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -15,6 +15,12 @@ from piker.service import ( from piker.log import get_console_log +# include `tractor`'s built-in fixtures! +pytest_plugins: tuple[str] = ( + "tractor._testing.pytest", +) + + def pytest_addoption(parser): parser.addoption("--ll", action="store", dest='loglevel', default=None, help="logging level to set when testing") diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 4614b4f9..79e51ef5 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -142,7 +142,12 @@ async def test_concurrent_tokens_refresh(us_symbols, loglevel): # async with tractor.open_nursery() as n: # await n.run_in_actor('other', intermittently_refresh_tokens) - async with trio.open_nursery() as n: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery( + # strict_exception_groups=False, + ) as n + ): quoter = await qt.stock_quoter(client, us_symbols) @@ -383,7 +388,9 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): else: symbols = [tmx_symbols] - async with trio.open_nursery() as n: + async with trio.open_nursery( + strict_exception_groups=False, + ) as n: for syms, func in zip(symbols, stream_what): n.start_soon(func, feed, syms)