''' Actor tree daemon sub-service verifications ''' from typing import ( AsyncContextManager, Callable, ) from contextlib import asynccontextmanager as acm from exceptiongroup import BaseExceptionGroup import pytest import trio import tractor from piker.service import ( find_service, Services, ) from piker.data import ( open_feed, ) from piker.clearing import ( open_ems, ) from piker.clearing._messages import ( BrokerdPosition, Status, ) from piker.clearing._client import ( OrderClient, ) def test_runtime_boot( open_test_pikerd: AsyncContextManager ): ''' Verify we can boot the `pikerd` service stack using the `open_test_pikerd()` fixture helper and that contact-registry address details match up. ''' async def main(): port = 6666 daemon_addr = ('127.0.0.1', port) services: Services async with ( open_test_pikerd( reg_addr=daemon_addr, ) as (_, _, pikerd_portal, services), tractor.wait_for_actor( 'pikerd', registry_addr=daemon_addr, ) as portal, ): uw_raddr: tuple = pikerd_portal.chan.raddr.unwrap() assert uw_raddr == daemon_addr assert uw_raddr == portal.chan.raddr.unwrap() # no service tasks should be started assert not services.service_tasks trio.run(main) def test_ensure_datafeed_actors( open_test_pikerd: AsyncContextManager, loglevel: str, # cancel_method: str, ) -> None: ''' Verify that booting a data feed starts a `brokerd` actor and a singleton global `samplerd` and opening an order mode in paper opens the `paperboi` service. ''' actor_name: str = 'brokerd' backend: str = 'kraken' brokerd_name: str = f'{actor_name}.{backend}' async def main(): async with ( open_test_pikerd(), open_feed( ['xbtusdt.kraken'], loglevel=loglevel, ) as feed ): # halt rt quote streams since we aren't testing them await feed.pause() async with ( ensure_service(brokerd_name), ensure_service('samplerd'), ): await trio.sleep(0.1) trio.run(main) @acm async def ensure_service( name: str, sockaddr: tuple[str, int] | None = None, ) -> None: async with find_service(name) as portal: remote_sockaddr: tuple = portal.chan.raddr.unwrap() print(f'FOUND `{name}` @ {remote_sockaddr}') if sockaddr: assert remote_sockaddr == sockaddr yield portal def run_test_w_cancel_method( cancel_method: str, main: Callable, ) -> None: ''' Run our runtime under trio and expect a certain type of cancel condition depending on input. ''' cancelled_msg: str = ( "was remotely cancelled by remote actor (\'pikerd\'") if cancel_method == 'sigint': # XXX: with modern `tractor` the (single-exc) # group is collapsed so a bare KBI normally # propagates; tolerate either form. with pytest.raises(( KeyboardInterrupt, BaseExceptionGroup, )) as exc_info: trio.run(main) err = exc_info.value match err: case BaseExceptionGroup(): for suberr in err.exceptions: match suberr: # ensure we receive a remote # cancellation error caused by the # pikerd root actor. case tractor.ContextCancelled(): assert ( cancelled_msg in suberr.args[0] ) case KeyboardInterrupt(): pass case _: pytest.fail( f'Unexpected error {suberr}' ) case KeyboardInterrupt(): pass elif cancel_method == 'services': # XXX: cancelling our own sub-service via # `Services.cancel_service()` is a *self* # requested cancel: modern `tractor` absorbs the # resulting `ContextCancelled` (canceller is our # own actor) so the runtime tears down gracefully # with NO error raised to the opener. trio.run(main) else: pytest.fail(f'Test is broken due to {cancel_method}') @pytest.mark.parametrize( 'cancel_method', ['services', 'sigint'], ) def test_ensure_ems_in_paper_actors( open_test_pikerd: AsyncContextManager, loglevel: str, cancel_method: str, ) -> None: actor_name: str = 'brokerd' backend: str = 'kraken' brokerd_name: str = f'{actor_name}.{backend}' async def main(): # type declares client: OrderClient pps: dict[str, list[BrokerdPosition]] accounts: list[str] dialogs: dict[str, Status] # ensure we timeout after is startup is too slow. # TODO: something like this should be our start point for # benchmarking end-to-end startup B) # NOTE: includes a live (kraken) symbology fetch so # the budget needs some headroom for net latency.. with trio.fail_after(19): async with ( open_test_pikerd() as (_, _, _, services), open_ems( 'xbtusdt.kraken', mode='paper', loglevel=loglevel, ) as ( client, _, # trades_stream: tractor.MsgStream pps, accounts, dialogs, ), ): # there should be no on-going positions, # TODO: though eventually we'll want to validate against # local ledger and `pps.toml` state ;) assert not pps assert not dialogs # XXX: should be new client with no state from other tests assert not client._sent_orders assert accounts pikerd_subservices = ['emsd', 'samplerd'] async with ( ensure_service('emsd'), ensure_service(brokerd_name), ensure_service(f'paperboi.{backend}'), ): for name in pikerd_subservices: assert name in services.service_tasks # brokerd.kraken actor should have been started # implicitly by the ems. assert brokerd_name in services.service_tasks print('ALL SERVICES STARTED, cancelling runtime with:\n' f'-> {cancel_method}') if cancel_method == 'services': await services.cancel_service('emsd') elif cancel_method == 'sigint': raise KeyboardInterrupt run_test_w_cancel_method(cancel_method, main)