diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index b6d469d9..79c454f6 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -24,14 +24,10 @@ from tractor._testing import (
 )
 
 # XXX TODO cases:
-# - [ ] peer cancelled itself - so other peers should
-#       get errors reflecting that the peer was itself the .canceller?
-
 # - [x] WE cancelled the peer and thus should not see any raised
 #       `ContextCancelled` as it should be reaped silently?
 #       => pretty sure `test_context_stream_semantics::test_caller_cancels()`
 #          already covers this case?
-
 # - [x] INTER-PEER: some arbitrary remote peer cancels via
 #       Portal.cancel_actor().
 #       => all other connected peers should get that cancel requesting peer's
@@ -44,16 +40,6 @@ from tractor._testing import (
 #       that also spawned a remote task task in that same peer-parent.
 
 
-# def test_self_cancel():
-#     '''
-#     2 cases:
-#     - calls `Actor.cancel()` locally in some task
-#     - calls LocalPortal.cancel_actor()` ?
-
-#     '''
-#     ...
-
-
 @tractor.context
 async def open_stream_then_sleep_forever(
     ctx: Context,
@@ -806,7 +792,7 @@ async def basic_echo_server(
     ctx: Context,
     peer_name: str = 'wittle_bruv',
 
-    err_after: int|None = None,
+    err_after_imsg: int|None = None,
 
 ) -> None:
     '''
@@ -835,8 +821,9 @@ async def basic_echo_server(
             await ipc.send(resp)
 
             if (
-                err_after
-                and i > err_after
+                err_after_imsg
+                and
+                i > err_after_imsg
             ):
                 raise RuntimeError(
                     f'Simulated error in `{peer_name}`'
@@ -978,7 +965,8 @@ async def tell_little_bro(
     actor_name: str,
 
     caller: str = '',
-    err_after: int|None = None,
+    err_after: float|None = None,
+    rng_seed: int = 50,
 ):
     # contact target actor, do a stream dialog.
     async with (
@@ -989,14 +977,18 @@ async def tell_little_bro(
             basic_echo_server,
 
             # XXX proxy any delayed err condition
-            err_after=err_after,
+            err_after_imsg=(
+                err_after * rng_seed
+                if err_after is not None
+                else None
+            ),
         ) as (sub_ctx, first),
 
         sub_ctx.open_stream() as echo_ipc,
     ):
         actor: Actor = current_actor()
         uid: tuple = actor.uid
-        for i in range(100):
+        for i in range(rng_seed):
             msg: tuple = (
                 uid,
                 i,
@@ -1021,13 +1013,13 @@ async def tell_little_bro(
 )
 @pytest.mark.parametrize(
     'raise_sub_spawn_error_after',
-    [None, 50],
+    [None, 0.5],
 )
 def test_peer_spawns_and_cancels_service_subactor(
     debug_mode: bool,
     raise_client_error: str,
     reg_addr: tuple[str, int],
-    raise_sub_spawn_error_after: int|None,
+    raise_sub_spawn_error_after: float|None,
 ):
     # NOTE: this tests for the modden `mod wks open piker` bug
     # discovered as part of implementing workspace ctx
@@ -1041,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor(
     # and the server's spawned child should cancel and terminate!
     peer_name: str = 'little_bro'
 
+
     def check_inner_rte(rae: RemoteActorError):
         '''
         Validate the little_bro's relayed inception!
@@ -1134,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor(
                 )
 
                 try:
-                    res = await client_ctx.result(hide_tb=False)
-
+                    res = await client_ctx.wait_for_result(hide_tb=False)
                     # in remote (relayed inception) error
                     # case, we should error on the line above!
                     if raise_sub_spawn_error_after:
@@ -1146,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor(
                     assert isinstance(res, ContextCancelled)
                     assert client_ctx.cancel_acked
                     assert res.canceller == root.uid
+                    assert not raise_sub_spawn_error_after
+
+                    # cancelling the spawner sub should
+                    # transitively cancel its sub, the little
+                    # bruv.
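+                    # (verified below: once the spawner has been
+                    #  cancelled, a `find_actor()` lookup on the
+                    #  peer's name should come up empty.)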
+                    print('root cancelling server/client sub-actors')
+                    await spawn_ctx.cancel()
+                    async with tractor.find_actor(
+                        name=peer_name,
+                    ) as sub:
+                        assert not sub
+
+                # XXX, only for tracing
+                # except BaseException as _berr:
+                #     berr = _berr
+                #     await tractor.pause(shield=True)
+                #     raise berr
 
                 except RemoteActorError as rae:
                     _err = rae
@@ -1174,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor(
                     raise
 
                 # await tractor.pause()
-                else:
-                    assert not raise_sub_spawn_error_after
-
-                    # cancelling the spawner sub should
-                    # transitively cancel it's sub, the little
-                    # bruv.
-                    print('root cancelling server/client sub-actors')
-                    await spawn_ctx.cancel()
-                    async with tractor.find_actor(
-                        name=peer_name,
-                    ) as sub:
-                        assert not sub
+
+                # await tractor.pause()
                 # await server.cancel_actor()
 
             except RemoteActorError as rae:
@@ -1199,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor(
 
         # since we called `.cancel_actor()`, `.cancel_ack`
         # will not be set on the ctx bc `ctx.cancel()` was not
-        # called directly fot this confext.
+        # called directly for this context.
         except ContextCancelled as ctxc:
             _ctxc = ctxc
             print(
@@ -1239,12 +1237,19 @@ def test_peer_spawns_and_cancels_service_subactor(
 
     # assert spawn_ctx.cancelled_caught
 
+    async def _main():
+        with trio.fail_after(
+            3 if not debug_mode
+            else 999
+        ):
+            await main()
+
     if raise_sub_spawn_error_after:
         with pytest.raises(RemoteActorError) as excinfo:
-            trio.run(main)
+            trio.run(_main)
 
         rae: RemoteActorError = excinfo.value
         check_inner_rte(rae)
 
     else:
-        trio.run(main)
+        trio.run(_main)
diff --git a/tests/test_oob_cancellation.py b/tests/test_oob_cancellation.py
new file mode 100644
index 00000000..58d12d2b
--- /dev/null
+++ b/tests/test_oob_cancellation.py
@@ -0,0 +1,239 @@
+'''
+Define the details of inter-actor "out-of-band" (OoB) cancel
+semantics, that is, how cancellation works when a cancel request
+comes from a different concurrency (primitive's) "layer" than where
+the eventual `trio.Task` actually raises a signal.
+
+'''
+from functools import partial
+# from contextlib import asynccontextmanager as acm
+# import itertools
+
+import pytest
+import trio
+import tractor
+from tractor import (  # typing
+    ActorNursery,
+    Portal,
+    Context,
+    # ContextCancelled,
+    # RemoteActorError,
+)
+# from tractor._testing import (
+#     tractor_test,
+#     expect_ctxc,
+# )
+
+# XXX TODO cases:
+# - [ ] peer cancelled itself - so other peers should
+#       get errors reflecting that the peer was itself the .canceller?
+
+# def test_self_cancel():
+#     '''
+#     2 cases:
+#     - calls `Actor.cancel()` locally in some task
+#     - calls LocalPortal.cancel_actor()` ?
+#
+#     things to ensure!
+#     -[ ] the ctxc raised in a child should ideally show the tb of the
+#         underlying `Cancelled` checkpoint, i.e.
+#         `raise scope_error from ctxc`?
+#
+#     -[ ] a self-cancelled context, if not allowed to block on
+#         `ctx.result()` at some point will hang since the `ctx._scope`
+#         is never `.cancel_called`; cases for this include,
+#         - an `open_ctx()` which never started before being OoB actor
+#           cancelled.
+#           |_ parent task will be blocked in `.open_context()` for the
+#              `Started` msg, and when the OoB ctxc arrives `ctx._scope`
+#              will never have been signalled..
+#
+#     '''
+#     ...
+
+# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py`
+# but with the `Lock.acquire()` from a `@context` to ensure the
+# implicit ignore-case-non-unmasking.
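+#
+# (a rough commented sketch of such a test follows below; note that
+#  `maybe_unmask_excs` reads as a planned/hypothetical helper name,
+#  not an existing `tractor` API.)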
+#
+# @tractor.context
+# async def acquire_actor_global_lock(
+#     ctx: tractor.Context,
+#     ignore_special_cases: bool,
+# ):
+
+#     async with maybe_unmask_excs(
+#         ignore_special_cases=ignore_special_cases,
+#     ):
+#         await ctx.started('locked')
+
+#     # block til cancelled
+#     await trio.sleep_forever()
+
+
+@tractor.context
+async def sleep_forever(
+    ctx: tractor.Context,
+    # ignore_special_cases: bool,
+    do_started: bool,
+):
+
+    # async with maybe_unmask_excs(
+    #     ignore_special_cases=ignore_special_cases,
+    # ):
+    #     await ctx.started('locked')
+    if do_started:
+        await ctx.started()
+
+    # block til cancelled
+    print('sleepin on child-side..')
+    await trio.sleep_forever()
+
+
+@pytest.mark.parametrize(
+    'cancel_ctx',
+    [True, False],
+)
+def test_cancel_ctx_with_parent_side_entered_in_bg_task(
+    debug_mode: bool,
+    loglevel: str,
+    cancel_ctx: bool,
+):
+    '''
+    The most "basic" out-of-band-task self-cancellation case where
+    `Portal.open_context()` is entered in a bg task and the
+    parent-task (of the containing nursery) calls `Context.cancel()`
+    without the child knowing; the `Context._scope` should be
+    `.cancel_called` when the IPC ctx's child-side relays
+    a `ContextCancelled` with a `.canceller` set to the parent
+    actor('s task).
+
+    '''
+    async def main():
+        with trio.fail_after(
+            2 if not debug_mode else 999,
+        ):
+            an: ActorNursery
+            async with (
+                tractor.open_nursery(
+                    debug_mode=debug_mode,
+                    loglevel='devx',
+                    enable_stack_on_sig=True,
+                ) as an,
+                trio.open_nursery() as tn,
+            ):
+                ptl: Portal = await an.start_actor(
+                    'sub',
+                    enable_modules=[__name__],
+                )
+
+                async def _open_ctx_async(
+                    do_started: bool = True,
+                    task_status=trio.TASK_STATUS_IGNORED,
+                ):
+                    # whether we expect to never enter the
+                    # `.open_context()` below.
+                    if not do_started:
+                        task_status.started()
+
+                    async with ptl.open_context(
+                        sleep_forever,
+                        do_started=do_started,
+                    ) as (ctx, first):
+                        task_status.started(ctx)
+                        await trio.sleep_forever()
+
+                # XXX, this is the key OoB part!
+                #
+                # - start the `.open_context()` in a bg task which
+                #   blocks inside the embedded scope-body,
+                #
+                # - when we call `Context.cancel()` it **is
+                #   not** from the same task which eventually runs
+                #   `.__aexit__()`,
+                #
+                # - since the bg "opener" task will be in
+                #   a `trio.sleep_forever()`, it must be interrupted
+                #   by the `ContextCancelled` delivered from the
+                #   child-side; `Context._scope: CancelScope` MUST
+                #   be `.cancel_called`!
+                #
+                print('ASYNC opening IPC context in subtask..')
+                maybe_ctx: Context|None = await tn.start(partial(
+                    _open_ctx_async,
+                ))
+
+                if (
+                    maybe_ctx
+                    and
+                    cancel_ctx
+                ):
+                    print('cancelling first IPC ctx!')
+                    await maybe_ctx.cancel()
+
+                # XXX, note that despite `maybe_context.cancel()`
+                # being called above, it's the parent (bg) task
+                # which was originally never interrupted in
+                # the `ctx._scope` body due to missing case logic in
+                # `ctx._maybe_cancel_and_set_remote_error()`.
+                #
+                # It didn't matter that the subactor process was
+                # already terminated and reaped, nothing was
+                # cancelling the ctx-parent task's scope!
+                #
+                print('cancelling subactor!')
+                await ptl.cancel_actor()
+
+                if maybe_ctx:
+                    try:
+                        await maybe_ctx.wait_for_result()
+                    except tractor.ContextCancelled as ctxc:
+                        assert not cancel_ctx
+                        assert (
+                            ctxc.canceller
+                            ==
+                            tractor.current_actor().aid.uid
+                        )
+                        # don't re-raise since it'll trigger
+                        # an EG from the above tn.
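+                        #
+                        # (i.e. the "self-cancel" case: this actor
+                        #  requested `ptl.cancel_actor()` above and
+                        #  so shows up as its own `.canceller`.)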
+
+    if cancel_ctx:
+        # graceful self-cancel
+        trio.run(main)
+
+    else:
+        # ctx parent task should see OoB ctxc due to
+        # `ptl.cancel_actor()`.
+        with pytest.raises(tractor.ContextCancelled) as excinfo:
+            trio.run(main)
+
+        assert 'root' in excinfo.value.canceller[0]
+
+
+# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(
+#     debug_mode: bool,
+#     loglevel: str,
+# ):
+#     '''
+#     Demos OoB cancellation from the perspective of a ctx opened with
+#     a child subactor where the parent cancels the child at the "actor
+#     layer" using `Portal.cancel_actor()` and thus the
+#     `ContextCancelled.canceller` received by the ctx's parent-side
+#     task will appear to be a "self cancellation" even though that
+#     specific task itself was not cancelled and thus
+#     `Context.cancel_called == False`.
+#     '''
+    # TODO, do we have an existing implied ctx
+    # cancel test like this?
+    # with trio.move_on_after(0.5):  # as cs:
+    #     await _open_ctx_async(
+    #         do_started=False,
+    #     )
+
+
+    # in-line ctx scope should definitely raise
+    # a ctxc with `.canceller = 'root'`
+    # async with ptl.open_context(
+    #     sleep_forever,
+    #     do_started=True,
+    # ) as pair:
+
diff --git a/tractor/_context.py b/tractor/_context.py
index 9e277a88..da31e423 100644
--- a/tractor/_context.py
+++ b/tractor/_context.py
@@ -442,25 +442,25 @@ class Context:
         '''
         Records whether cancellation has been requested for this
         context by a call to `.cancel()` either due to,
-        - either an explicit call by some local task,
+        - an explicit call by some local task,
         - or an implicit call due to an error caught inside
-          the ``Portal.open_context()`` block.
+          the `Portal.open_context()` block.
 
         '''
         return self._cancel_called
 
-    @cancel_called.setter
-    def cancel_called(self, val: bool) -> None:
-        '''
-        Set the self-cancelled request `bool` value.
+    # XXX, to debug who frickin sets it..
+    # @cancel_called.setter
+    # def cancel_called(self, val: bool) -> None:
+    #     '''
+    #     Set the self-cancelled request `bool` value.
 
-        '''
-        # to debug who frickin sets it..
-        # if val:
-        #     from .devx import pause_from_sync
-        #     pause_from_sync()
+    #     '''
+    #     if val:
+    #         from .devx import pause_from_sync
+    #         pause_from_sync()
 
-        self._cancel_called = val
+    #     self._cancel_called = val
 
     @property
     def canceller(self) -> tuple[str, str]|None:
@@ -635,6 +635,71 @@ class Context:
         '''
         await self.chan.send(Stop(cid=self.cid))
 
+    @property
+    def parent_task(self) -> trio.Task:
+        '''
+        This IPC context's "owning task" which is a `trio.Task`
+        on one of the "sides" of the IPC.
+
+        Note that the "parent_" prefix here refers to the local
+        `trio` task tree using the same interface as
+        `trio.Nursery.parent_task` whereas for IPC contexts,
+        a different cross-actor task hierarchy exists:
+
+        - a "parent"-side which originally entered
+          `Portal.open_context()`,
+
+        - the "child"-side which was spawned and scheduled to invoke
+          a function decorated with `@tractor.context`.
+
+        This task is thus a handle to mem-domain-distinct/per-process
+        `Nursery.parent_task` depending on in which of the above
+        "sides" this context exists.
+
+        '''
+        return self._task
+
+    def _is_blocked_on_rx_chan(self) -> bool:
+        '''
+        Predicate to indicate whether the owner `._task: trio.Task` is
+        currently blocked (by `.receive()`-ing) on its underlying RPC
+        feeder `._rx_chan`.
+
+        This knowledge is highly useful when handling so called
+        "out-of-band" (OoB) cancellation conditions where a peer
+        actor's task transmitted some remote error/cancel-msg and we
+        must know whether to signal-via-cancel currently executing
+        "user-code" (user defined code embedded in `ctx._scope`) or
+        simply to forward the IPC-msg-as-error **without calling**
+        `._scope.cancel()`.
+
+        In the latter case it is presumed that if the owner task is
+        blocking for the next IPC msg, it will eventually receive,
+        process and raise the equivalent local error **without**
+        requiring `._scope.cancel()` to be explicitly called by the
+        *delivering OoB RPC-task* (via `_deliver_msg()`).
+
+        '''
+        # NOTE, see the mem-chan meth-impls for *why* this
+        # logic works,
+        # `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
+        #
+        # XXX realize that this is NOT an
+        # official/will-be-loudly-deprecated API:
+        # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
+        #  |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
+        #
+        # orig repo intro in the mem-chan change over patch:
+        # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
+        #  |_https://github.com/python-trio/trio/pull/616
+        #  |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
+        #
+        return (
+            self._task.custom_sleep_data
+            is
+            self._rx_chan
+        )
+
     def _maybe_cancel_and_set_remote_error(
         self,
         error: BaseException,
@@ -787,13 +852,27 @@ class Context:
         if self._canceller is None:
             log.error('Ctx has no canceller set!?')
 
+        cs: trio.CancelScope = self._scope
+
+        # ?TODO? see comment @ `.start_remote_task()`
+        #
+        # if not cs:
+        #     from .devx import mk_pdb
+        #     mk_pdb().set_trace()
+        #     raise RuntimeError(
+        #         f'IPC ctx was not opened prior to remote error delivery !?\n'
+        #         f'{self}\n'
+        #         f'\n'
+        #         f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
+        #     )
+
         # Cancel the local `._scope`, catch that
         # `._scope.cancelled_caught` and re-raise any remote error
         # once exiting (or manually calling `.wait_for_result()`) the
         # `.open_context()` block.
-        cs: trio.CancelScope = self._scope
         if (
             cs
+            and not cs.cancel_called
 
             # XXX this is an expected cancel request response
             # message and we **don't need to raise it** in the
@@ -802,8 +881,7 @@ class Context:
             # if `._cancel_called` then `.cancel_acked and .cancel_called`
             # always should be set.
             and not self._is_self_cancelled()
-            and not cs.cancel_called
-            and not cs.cancelled_caught
+            # and not cs.cancelled_caught
         ):
             if (
                 msgerr
@@ -814,7 +892,7 @@ class Context:
                 not self._cancel_on_msgerr
             ):
                 message: str = (
-                    'NOT Cancelling `Context._scope` since,\n'
+                    f'NOT Cancelling `Context._scope` since,\n'
                     f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
                     f'AND we got a msg-type-error!\n'
                     f'{error}\n'
                 )
@@ -824,13 +902,43 @@ class Context:
                 # `trio.Cancelled` subtype here ;)
                 # https://github.com/goodboy/tractor/issues/368
                 message: str = 'Cancelling `Context._scope` !\n\n'
-                # from .devx import pause_from_sync
-                # pause_from_sync()
-                self._scope.cancel()
-        else:
-            message: str = 'NOT cancelling `Context._scope` !\n\n'
+                cs.cancel()
+
+        # TODO, explicit condition for OoB (self-)cancellation?
+        # - we called `Portal.cancel_actor()` from this actor
+        #   and the peer ctx task delivered ctxc due to it.
+        # - currently `self._is_self_cancelled()` will be true
+        #   since the ctxc.canceller check will match us even though it
+        #   wasn't from this ctx specifically!
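+        #
+        # (hence the `elif` below which also requires that the scope
+        #  is not already `.cancel_called` and, inside, only cancels
+        #  when the owner task is *not* parked on the rx-chan.)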
+        elif (
+            cs
+            and self._is_self_cancelled()
+            and not cs.cancel_called
+        ):
+            message: str = (
+                'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
+                '\n'
+            )
             # from .devx import mk_pdb
             # mk_pdb().set_trace()
+            # TODO XXX, required to fix timeout failure in
+            # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
+            #
+
+            # XXX NOTE XXX, this is SUPER SUBTLE!
+            # we only want to cancel our embedded `._scope`
+            # if the ctx's current/using task is NOT blocked
+            # on `._rx_chan.receive()` and on some other
+            # `trio`-checkpoint since in the former case
+            # any `._remote_error` will be relayed through
+            # the rx-chan and appropriately raised by the owning
+            # `._task` directly. IF the owner task is however
+            # blocking elsewhere we need to interrupt it **now**.
+            if not self._is_blocked_on_rx_chan():
+                cs.cancel()
+            else:
+                # rx_stats = self._rx_chan.statistics()
+                message: str = 'NOT cancelling `Context._scope` !\n\n'
 
         fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
         if (
@@ -854,6 +962,7 @@ class Context:
             +
             cs_fmt
         )
+
         log.cancel(
             message
             +
@@ -946,8 +1055,9 @@ class Context:
 
         '''
         side: str = self.side
-        # XXX for debug via the `@.setter`
-        self.cancel_called = True
+        self._cancel_called = True
+        # ^ XXX for debug via the `@.setter`
+        # self.cancel_called = True
 
         header: str = (
             f'Cancelling ctx from {side!r}-side\n'
@@ -2011,6 +2121,9 @@ async def open_context_from_portal(
         f'|_{portal.actor}\n'
     )
 
+    # ?TODO? could we move this to inside the `tn` block?
+    # -> would allow doing `ctx.parent_task = tn.parent_task` ?
+    # -> would allow a `if not ._scope: => raise RTE` ?
     ctx: Context = await portal.actor.start_remote_task(
         portal.channel,
         nsf=nsf,
@@ -2037,6 +2150,7 @@ async def open_context_from_portal(
     scope_err: BaseException|None = None
    ctxc_from_child: ContextCancelled|None = None
     try:
+        # from .devx import pause
         async with (
             collapse_eg(),
             trio.open_nursery() as tn,
@@ -2059,6 +2173,10 @@ async def open_context_from_portal(
             # the dialog, the `Error` msg should be raised from the `msg`
             # handling block below.
             try:
+                log.runtime(
+                    f'IPC ctx parent waiting on Started msg..\n'
+                    f'ctx.cid: {ctx.cid!r}\n'
+                )
                 started_msg, first = await ctx._pld_rx.recv_msg(
                     ipc=ctx,
                     expect_msg=Started,
@@ -2067,16 +2185,16 @@ async def open_context_from_portal(
             except trio.Cancelled as taskc:
                 ctx_cs: trio.CancelScope = ctx._scope
+                log.cancel(
+                    f'IPC ctx was cancelled during "child" task sync due to\n\n'
+                    f'.cid: {ctx.cid!r}\n'
+                    f'.maybe_error: {ctx.maybe_error!r}\n'
+                )
+                # await pause(shield=True)
+
                 if not ctx_cs.cancel_called:
                     raise
 
-                # from .devx import pause
-                # await pause(shield=True)
-
-                log.cancel(
-                    'IPC ctx was cancelled during "child" task sync due to\n\n'
-                    f'{ctx.maybe_error}\n'
-                )
                 # OW if the ctx's scope was cancelled manually,
                 # likely the `Context` was cancelled via a call to
                 # `._maybe_cancel_and_set_remote_error()` so ensure
@@ -2199,7 +2317,7 @@ async def open_context_from_portal(
                 # documenting it as a definittive example of
                 # debugging the tractor-runtime itself using it's
                 # own `.devx.` tooling!
-                #
+                #
                 # await debug.pause()
 
             # CASE 2: context was cancelled by local task calling
@@ -2272,13 +2390,16 @@ async def open_context_from_portal(
             match scope_err:
                 case trio.Cancelled():
                     logmeth = log.cancel
+                    cause: str = 'cancelled'
 
                 # XXX explicitly report on any non-graceful-taskc cases
                 case _:
+                    cause: str = 'errored'
                     logmeth = log.exception
 
             logmeth(
-                f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n'
+                f'ctx {ctx.side!r}-side {cause!r} with,\n'
+                f'{ctx.repr_outcome()!r}\n'
             )
 
             if debug_mode():
@@ -2303,6 +2424,7 @@ async def open_context_from_portal(
             # told us it's cancelled ;p
             if ctxc_from_child is None:
                 try:
+                    # await pause(shield=True)
                     await ctx.cancel()
                 except (
                     trio.BrokenResourceError,
@@ -2459,8 +2581,10 @@ async def open_context_from_portal(
             log.cancel(
                 f'Context cancelled by local {ctx.side!r}-side task\n'
                 f'c)>\n'
-                f' |_{ctx._task}\n\n'
-                f'{repr(scope_err)}\n'
+                f' |_{ctx.parent_task}\n'
+                f'   .cid={ctx.cid!r}\n'
+                f'\n'
+                f'{scope_err!r}\n'
             )
 
     # TODO: should we add a `._cancel_req_received`
diff --git a/tractor/_supervise.py b/tractor/_supervise.py
index be89c4cb..22ef62b4 100644
--- a/tractor/_supervise.py
+++ b/tractor/_supervise.py
@@ -446,12 +446,12 @@ class ActorNursery:
 
 @acm
 async def _open_and_supervise_one_cancels_all_nursery(
     actor: Actor,
-    tb_hide: bool = False,
+    hide_tb: bool = True,
 
 ) -> typing.AsyncGenerator[ActorNursery, None]:
 
     # normally don't need to show user by default
-    __tracebackhide__: bool = tb_hide
+    __tracebackhide__: bool = hide_tb
     outer_err: BaseException|None = None
     inner_err: BaseException|None = None
diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py
index 1dad63c8..aa215a81 100644
--- a/tractor/msg/_ops.py
+++ b/tractor/msg/_ops.py
@@ -613,10 +613,9 @@ async def drain_to_final_msg(
         #     msg: dict = await ctx._rx_chan.receive()
         #     if res_cs.cancelled_caught:
         #
-        # -[ ] make sure pause points work here for REPLing
+        # -[x] make sure pause points work here for REPLing
         #   the runtime itself; i.e. ensure there's no hangs!
-        # |_from tractor.devx.debug import pause
-        #   await pause()
+        # |_see masked code below in .cancel_called path
 
         # NOTE: we get here if the far end was
         # `ContextCancelled` in 2 cases:
@@ -652,6 +651,10 @@ async def drain_to_final_msg(
                 f'IPC ctx cancelled externally during result drain ?\n'
                 f'{ctx}'
             )
+            # XXX, for tracing `Cancelled`..
+            # from tractor.devx.debug import pause
+            # await pause(shield=True)
+
             # CASE 2: mask the local cancelled-error(s)
             # only when we are sure the remote error is
             # the source cause of this local task's
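
# ---- reviewer sketch, not part of the patch ----
# A minimal standalone check of the `trio` detail which the new
# `Context._is_blocked_on_rx_chan()` predicate relies on: while a task
# is parked in `MemoryReceiveChannel.receive()`, `trio` stamps that
# channel on `task.custom_sleep_data` and the runner resets it back to
# `None` once the task is rescheduled.
import trio
import trio.testing


async def main():
    tx, rx = trio.open_memory_channel(0)

    async def receiver(
        task_status=trio.TASK_STATUS_IGNORED,
    ):
        # hand our `trio.Task` handle back to the opener
        task_status.started(trio.lowlevel.current_task())
        # parks here; `.custom_sleep_data` is set to `rx`
        await rx.receive()

    async with trio.open_nursery() as tn:
        task: trio.lowlevel.Task = await tn.start(receiver)
        await trio.testing.wait_all_tasks_blocked()

        # blocked on the rx-chan: the same `is`-check used by
        # `Context._is_blocked_on_rx_chan()`
        assert task.custom_sleep_data is rx

        await tx.send('wake up')
        await trio.testing.wait_all_tasks_blocked()

        # rescheduled => cleared back to `None` by the runner
        assert task.custom_sleep_data is None


trio.run(main)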