From 217d54b9d135b980c9ae25262d0e7136077a0fd8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 7 Sep 2025 20:19:50 -0400
Subject: [PATCH 1/8] Add the minimal OoB cancel edge case from #391

Discovered while writing a `@context` sanity test to verify unmasker
ignore-cases support. The masked code is left over from whittling the
original examples script (where the hang was first discovered) down to
this minimal reproduction. Details are in the test-fn doc strings and
surrounding comments; more refinement and cleanup coming obviously.

Also moved over the self-cancel todos from the inter-peer tests module.
---
 tests/test_inter_peer_cancellation.py |  14 --
 tests/test_oob_cancellation.py        | 212 ++++++++++++++++++++++++++
 2 files changed, 212 insertions(+), 14 deletions(-)
 create mode 100644 tests/test_oob_cancellation.py

diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py
index b6d469d9..84865824 100644
--- a/tests/test_inter_peer_cancellation.py
+++ b/tests/test_inter_peer_cancellation.py
@@ -24,14 +24,10 @@ from tractor._testing import (
 )

 # XXX TODO cases:
-# - [ ] peer cancelled itself - so other peers should
-#   get errors reflecting that the peer was itself the .canceller?
-
 # - [x] WE cancelled the peer and thus should not see any raised
 #   `ContextCancelled` as it should be reaped silently?
 #   => pretty sure `test_context_stream_semantics::test_caller_cancels()`
 #   already covers this case?
-
 # - [x] INTER-PEER: some arbitrary remote peer cancels via
 #   Portal.cancel_actor().
 #   => all other connected peers should get that cancel requesting peer's
 #   uid in the ctx-cancelled error msg raised in all open ctxs with that
 #   peer.
 # - [ ] INTER-PEER: via a `tractor.Context`.
 #   that also spawned a remote task in that same peer-parent.


-# def test_self_cancel():
-#     '''
-#     2 cases:
-#     - calls `Actor.cancel()` locally in some task
-#     - calls LocalPortal.cancel_actor()` ?
-
-#     '''
-#     ...
-
-
 @tractor.context
 async def open_stream_then_sleep_forever(
     ctx: Context,
diff --git a/tests/test_oob_cancellation.py b/tests/test_oob_cancellation.py
new file mode 100644
index 00000000..c65955d5
--- /dev/null
+++ b/tests/test_oob_cancellation.py
@@ -0,0 +1,212 @@
+'''
+Define the details of inter-actor "out-of-band" (OoB) cancel
+semantics, that is, how cancellation works when a cancel request
+comes from a different concurrency (primitive's) "layer" than where
+the eventual `trio.Task` actually raises a signal.
+
+'''
+from functools import partial
+# from contextlib import asynccontextmanager as acm
+# import itertools
+
+# import pytest
+import trio
+import tractor
+from tractor import (  # typing
+    ActorNursery,
+    Portal,
+    Context,
+    # ContextCancelled,
+    # RemoteActorError,
+)
+# from tractor._testing import (
+#     tractor_test,
+#     expect_ctxc,
+# )
+
+# XXX TODO cases:
+# - [ ] peer cancelled itself - so other peers should
+#   get errors reflecting that the peer was itself the .canceller?
+
+# def test_self_cancel():
+#     '''
+#     2 cases:
+#     - calls `Actor.cancel()` locally in some task
+#     - calls `LocalPortal.cancel_actor()` ?
+#
+#     things to ensure!
+#     -[ ] the ctxc raised in a child should ideally show the tb of the
+#       underlying `Cancelled` checkpoint, i.e.
+#       `raise scope_error from ctxc`?
+#
+#     -[ ] a self-cancelled context, if not allowed to block on
+#       `ctx.result()` at some point will hang since the `ctx._scope`
+#       is never `.cancel_called`; cases for this include,
+#       - an `open_ctx()` which never starts before being OoB actor
+#         cancelled.
+# |_ parent task will be blocked in `.open_context()` for the +# `Started` msg, and when the OoB ctxc arrives `ctx._scope` +# will never have been signalled.. + +# ''' +# ... + +# TODO, sanity test against the case in `/examples/trio/lockacquire_not_unmasked.py` +# but with the `Lock.acquire()` from a `@context` to ensure the +# implicit ignore-case-non-unmasking. +# +# @tractor.context +# async def acquire_actor_global_lock( +# ctx: tractor.Context, +# ignore_special_cases: bool, +# ): + +# async with maybe_unmask_excs( +# ignore_special_cases=ignore_special_cases, +# ): +# await ctx.started('locked') + +# # block til cancelled +# await trio.sleep_forever() + + +@tractor.context +async def sleep_forever( + ctx: tractor.Context, + # ignore_special_cases: bool, + do_started: bool, +): + + # async with maybe_unmask_excs( + # ignore_special_cases=ignore_special_cases, + # ): + # await ctx.started('locked') + if do_started: + await ctx.started() + + # block til cancelled + print('sleepin on child-side..') + await trio.sleep_forever() + + +def test_cancel_ctx_with_parent_side_entered_in_bg_task( + debug_mode: bool, + loglevel: str, + cancel_ctx: bool = False, +): + ''' + The most "basic" out-of-band-task self-cancellation case where + `Portal.open_context()` is entered in a bg task and the + parent-task (of the containing nursery) calls `Context.cancel()` + without the child knowing; the `Context._scope` should be + `.cancel_called` when the IPC ctx's child-side relays + a `ContextCancelled` with a `.canceller` set to the parent + actor('s task). + + ''' + async def main(): + with trio.fail_after( + 2 if not debug_mode else 999, + ): + an: ActorNursery + async with ( + tractor.open_nursery( + debug_mode=debug_mode, + loglevel='devx', + enable_stack_on_sig=True, + ) as an, + trio.open_nursery() as tn, + ): + ptl: Portal = await an.start_actor( + 'sub', + enable_modules=[__name__], + ) + + async def _open_ctx_async( + do_started: bool = True, + task_status=trio.TASK_STATUS_IGNORED, + ): + # do we expect to never enter the + # `.open_context()` below. + if not do_started: + task_status.started() + + async with ptl.open_context( + sleep_forever, + do_started=do_started, + ) as (ctx, first): + task_status.started(ctx) + await trio.sleep_forever() + + # XXX, this is the key OoB part! + # + # - start the `.open_context()` in a bg task which + # blocks inside the embedded scope-body, + # + # - when we call `Context.cancel()` it **is + # not** from the same task which eventually runs + # `.__aexit__()`, + # + # - since the bg "opener" task will be in + # a `trio.sleep_forever()`, it must be interrupted + # by the `ContextCancelled` delivered from the + # child-side; `Context._scope: CancelScope` MUST + # be `.cancel_called`! + # + print('ASYNC opening IPC context in subtask..') + maybe_ctx: Context|None = await tn.start(partial( + _open_ctx_async, + )) + + if ( + maybe_ctx + and + cancel_ctx + ): + print('cancelling first IPC ctx!') + await maybe_ctx.cancel() + + # XXX, note that despite `maybe_context.cancel()` + # being called above, it's the parent (bg) task + # which was originally never interrupted in + # the `ctx._scope` body due to missing case logic in + # `ctx._maybe_cancel_and_set_remote_error()`. + # + # It didn't matter that the subactor process was + # already terminated and reaped, nothing was + # cancelling the ctx-parent task's scope! 
+ # + print('cancelling subactor!') + await ptl.cancel_actor() + + trio.run(main) + + +# def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it( +# debug_mode: bool, +# loglevel: str, +# ): +# ''' +# Demos OoB cancellation from the perspective of a ctx opened with +# a child subactor where the parent cancels the child at the "actor +# layer" using `Portal.cancel_actor()` and thus the +# `ContextCancelled.canceller` received by the ctx's parent-side +# task will appear to be a "self cancellation" even though that +# specific task itself was not cancelled and thus +# `Context.cancel_called ==False`. +# ''' + # TODO, do we have an existing implied ctx + # cancel test like this? + # with trio.move_on_after(0.5):# as cs: + # await _open_ctx_async( + # do_started=False, + # ) + + + # in-line ctx scope should definitely raise + # a ctxc with `.canceller = 'root'` + # async with ptl.open_context( + # sleep_forever, + # do_started=True, + # ) as pair: + From 92eaed6fecffcb527d0fa262c8620472e1214c82 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 8 Sep 2025 17:39:54 -0400 Subject: [PATCH 2/8] Parametrize with `Portal.cancel_actor()` only case Such that when `maybe_context.cancel()` is not called (explicitly) and only the subactor is cancelled by its parent we expect to see a ctxc raised both from any call to `Context.wait_for_result()` and out of the `Portal.open_context()` scope, up to the `trio.run()`. Deats, - obvi call-n-catch the ctxc (in scope) for the oob-only subactor-cancelled case. - add branches around `trio.run()` entry to match. --- tests/test_oob_cancellation.py | 33 ++++++++++++++++++++++++++++++--- 1 file changed, 30 insertions(+), 3 deletions(-) diff --git a/tests/test_oob_cancellation.py b/tests/test_oob_cancellation.py index c65955d5..ba4c801d 100644 --- a/tests/test_oob_cancellation.py +++ b/tests/test_oob_cancellation.py @@ -9,7 +9,7 @@ from functools import partial # from contextlib import asynccontextmanager as acm # import itertools -# import pytest +import pytest import trio import tractor from tractor import ( # typing @@ -89,10 +89,14 @@ async def sleep_forever( await trio.sleep_forever() +@pytest.mark.parametrize( + 'cancel_ctx', + [True, False], +) def test_cancel_ctx_with_parent_side_entered_in_bg_task( debug_mode: bool, loglevel: str, - cancel_ctx: bool = False, + cancel_ctx: bool, ): ''' The most "basic" out-of-band-task self-cancellation case where @@ -179,7 +183,30 @@ def test_cancel_ctx_with_parent_side_entered_in_bg_task( print('cancelling subactor!') await ptl.cancel_actor() - trio.run(main) + if maybe_ctx: + try: + await maybe_ctx.wait_for_result() + except tractor.ContextCancelled as ctxc: + assert not cancel_ctx + assert ( + ctxc.canceller + == + tractor.current_actor().aid.uid + ) + # don't re-raise since it'll trigger + # an EG from the above tn. + + if cancel_ctx: + # graceful self-cancel + trio.run(main) + + else: + # ctx parent task should see OoB ctxc due to + # `ptl.cancel_actor()`. 
+ with pytest.raises(tractor.ContextCancelled) as excinfo: + trio.run(main) + + 'root' in excinfo.value.canceller[0] # def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it( From 9489a2f84d067a9dfb7af6e74df2d824430ef836 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 8 Sep 2025 17:58:02 -0400 Subject: [PATCH 3/8] Add timeout around `test_peer_spawns_and_cancels_service_subactor` suite --- tests/test_inter_peer_cancellation.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 84865824..86863fb1 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -1225,12 +1225,16 @@ def test_peer_spawns_and_cancels_service_subactor( # assert spawn_ctx.cancelled_caught + async def _main(): + with trio.fail_after(2): + await main() + if raise_sub_spawn_error_after: with pytest.raises(RemoteActorError) as excinfo: - trio.run(main) + trio.run(_main) rae: RemoteActorError = excinfo.value check_inner_rte(rae) else: - trio.run(main) + trio.run(_main) From b1f2a6b3949fd3b69481323b3a1a486ac99a20f6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 8 Sep 2025 18:15:00 -0400 Subject: [PATCH 4/8] Rename var for and hide the `_open_and_supervise_one_cancels_all_nursery` frame --- tractor/_supervise.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tractor/_supervise.py b/tractor/_supervise.py index be89c4cb..22ef62b4 100644 --- a/tractor/_supervise.py +++ b/tractor/_supervise.py @@ -446,12 +446,12 @@ class ActorNursery: @acm async def _open_and_supervise_one_cancels_all_nursery( actor: Actor, - tb_hide: bool = False, + hide_tb: bool = True, ) -> typing.AsyncGenerator[ActorNursery, None]: # normally don't need to show user by default - __tracebackhide__: bool = tb_hide + __tracebackhide__: bool = hide_tb outer_err: BaseException|None = None inner_err: BaseException|None = None From 73423ef2b7cdc36c2d2ab71ecc71c4ecc33d8168 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 9 Sep 2025 17:33:20 -0400 Subject: [PATCH 5/8] Timeout on `test_peer_spawns_and_cancels_service_subactor` While working on a fix to the hang case found from `test_cancel_ctx_with_parent_side_entered_in_bg_task` an initial solution caused this test to hang indefinitely; solve it with a small wrapping `_main()` + `trio.fail_after()` entrypoint. Further suite refinements, - move the top-most `try:`->`else:` block - toss in a masked base-exc block for tracing unexpected `ctx.wait_for_result()` outcomes. - tweak the `raise_sub_spawn_error_after` to be an optional `float` which scales the `rng_seed: int = 50` msg counter to `tell_little_bro()` so that the abs value to the `range()` can be changed. 
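
I.e. the suite's `trio.run()` target is now wrapped roughly like
(sketch only; see the hunk below for the exact landed version),

    async def _main():
        with trio.fail_after(
            3 if not debug_mode
            else 999  # ~no timeout while REPL-ing in debug-mode
        ):
            await main()

    trio.run(_main)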
--- tests/test_inter_peer_cancellation.py | 63 +++++++++++++++++---------- 1 file changed, 39 insertions(+), 24 deletions(-) diff --git a/tests/test_inter_peer_cancellation.py b/tests/test_inter_peer_cancellation.py index 86863fb1..79c454f6 100644 --- a/tests/test_inter_peer_cancellation.py +++ b/tests/test_inter_peer_cancellation.py @@ -792,7 +792,7 @@ async def basic_echo_server( ctx: Context, peer_name: str = 'wittle_bruv', - err_after: int|None = None, + err_after_imsg: int|None = None, ) -> None: ''' @@ -821,8 +821,9 @@ async def basic_echo_server( await ipc.send(resp) if ( - err_after - and i > err_after + err_after_imsg + and + i > err_after_imsg ): raise RuntimeError( f'Simulated error in `{peer_name}`' @@ -964,7 +965,8 @@ async def tell_little_bro( actor_name: str, caller: str = '', - err_after: int|None = None, + err_after: float|None = None, + rng_seed: int = 50, ): # contact target actor, do a stream dialog. async with ( @@ -975,14 +977,18 @@ async def tell_little_bro( basic_echo_server, # XXX proxy any delayed err condition - err_after=err_after, + err_after_imsg=( + err_after * rng_seed + if err_after is not None + else None + ), ) as (sub_ctx, first), sub_ctx.open_stream() as echo_ipc, ): actor: Actor = current_actor() uid: tuple = actor.uid - for i in range(100): + for i in range(rng_seed): msg: tuple = ( uid, i, @@ -1007,13 +1013,13 @@ async def tell_little_bro( ) @pytest.mark.parametrize( 'raise_sub_spawn_error_after', - [None, 50], + [None, 0.5], ) def test_peer_spawns_and_cancels_service_subactor( debug_mode: bool, raise_client_error: str, reg_addr: tuple[str, int], - raise_sub_spawn_error_after: int|None, + raise_sub_spawn_error_after: float|None, ): # NOTE: this tests for the modden `mod wks open piker` bug # discovered as part of implementing workspace ctx @@ -1027,6 +1033,7 @@ def test_peer_spawns_and_cancels_service_subactor( # and the server's spawned child should cancel and terminate! peer_name: str = 'little_bro' + def check_inner_rte(rae: RemoteActorError): ''' Validate the little_bro's relayed inception! @@ -1120,8 +1127,7 @@ def test_peer_spawns_and_cancels_service_subactor( ) try: - res = await client_ctx.result(hide_tb=False) - + res = await client_ctx.wait_for_result(hide_tb=False) # in remote (relayed inception) error # case, we should error on the line above! if raise_sub_spawn_error_after: @@ -1132,6 +1138,23 @@ def test_peer_spawns_and_cancels_service_subactor( assert isinstance(res, ContextCancelled) assert client_ctx.cancel_acked assert res.canceller == root.uid + assert not raise_sub_spawn_error_after + + # cancelling the spawner sub should + # transitively cancel it's sub, the little + # bruv. + print('root cancelling server/client sub-actors') + await spawn_ctx.cancel() + async with tractor.find_actor( + name=peer_name, + ) as sub: + assert not sub + + # XXX, only for tracing + # except BaseException as _berr: + # berr = _berr + # await tractor.pause(shield=True) + # raise berr except RemoteActorError as rae: _err = rae @@ -1160,19 +1183,8 @@ def test_peer_spawns_and_cancels_service_subactor( raise # await tractor.pause() - else: - assert not raise_sub_spawn_error_after - - # cancelling the spawner sub should - # transitively cancel it's sub, the little - # bruv. 
- print('root cancelling server/client sub-actors') - await spawn_ctx.cancel() - async with tractor.find_actor( - name=peer_name, - ) as sub: - assert not sub + # await tractor.pause() # await server.cancel_actor() except RemoteActorError as rae: @@ -1185,7 +1197,7 @@ def test_peer_spawns_and_cancels_service_subactor( # since we called `.cancel_actor()`, `.cancel_ack` # will not be set on the ctx bc `ctx.cancel()` was not - # called directly fot this confext. + # called directly for this confext. except ContextCancelled as ctxc: _ctxc = ctxc print( @@ -1226,7 +1238,10 @@ def test_peer_spawns_and_cancels_service_subactor( # assert spawn_ctx.cancelled_caught async def _main(): - with trio.fail_after(2): + with trio.fail_after( + 3 if not debug_mode + else 999 + ): await main() if raise_sub_spawn_error_after: From fc130d06b8d8535362adfcee61f485df387afbe7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 9 Sep 2025 18:13:28 -0400 Subject: [PATCH 6/8] Check off REPL-ing todo add masked usage in `drain_to_final_msg()` --- tractor/msg/_ops.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/tractor/msg/_ops.py b/tractor/msg/_ops.py index 1dad63c8..aa215a81 100644 --- a/tractor/msg/_ops.py +++ b/tractor/msg/_ops.py @@ -613,10 +613,9 @@ async def drain_to_final_msg( # msg: dict = await ctx._rx_chan.receive() # if res_cs.cancelled_caught: # - # -[ ] make sure pause points work here for REPLing + # -[x] make sure pause points work here for REPLing # the runtime itself; i.e. ensure there's no hangs! - # |_from tractor.devx.debug import pause - # await pause() + # |_see masked code below in .cancel_called path # NOTE: we get here if the far end was # `ContextCancelled` in 2 cases: @@ -652,6 +651,10 @@ async def drain_to_final_msg( f'IPC ctx cancelled externally during result drain ?\n' f'{ctx}' ) + # XXX, for tracing `Cancelled`.. + # from tractor.devx.debug import pause + # await pause(shield=True) + # CASE 2: mask the local cancelled-error(s) # only when we are sure the remote error is # the source cause of this local task's From 0c6d512ba47e13d57b09443fd6fc3d056aaa5866 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 10 Sep 2025 21:09:40 -0400 Subject: [PATCH 7/8] Solve another OoB cancellation case, the bg task one Such that we are able to (finally) detect when we should `Context._scope.cancel()` specifically when the `.parent_task` is **not** blocking on receiving from the underlying `._rx_chan`, since if the task is blocking on `.receive()` it will call `.cancel()` implicitly. This is a lot to explain with very little code actually needed for the implementation (are we like `trio` yet anyone?? XD) but the main jist is that `Context._maybe_cancel_and_set_remote_error()` needed the additional case of calling `._scope.cancel()` whenever we know that a remote-error/ctxc won't be immediately handled, bc user code is doing non `Context`-API things, and result in a similar outcome as if that task was waiting on `Context.wait_for_result()` or `.__aexite__()`. Impl details, - add a new `._is_blocked_on_rx_chan()` method which predicates whether the (new) `.parent_task` is blocking on `._rx_chan.receive()`. * see various stipulations about the current impl and how we might need to adjust for the future given `trio`'s commitment to the `Task.custom_sleep_data` attr.. - add `.parent_task`, a pub wrapper for `._task`. - check for `not ._is_blocked_on_rx_chan()` before manually cancelling the local `.parent_task` - minimize the surrounding branch case expressions. 
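
The predicate's core check (see the `_context.py` hunk below for the
landed version) is just an identity comparison against `trio`'s
unofficial `Task.custom_sleep_data` slot, which
`MemoryReceiveChannel.receive()` stashes while the task sleeps,

    def _is_blocked_on_rx_chan(self) -> bool:
        # `.custom_sleep_data` is set to the mem-chan right before
        # `.receive()` parks the task via `wait_task_rescheduled()`
        # and is reset to `None` on reschedule, so an `is` check
        # tells us the owner task is currently blocked waiting on
        # the next IPC msg.
        return (
            self._task.custom_sleep_data
            is
            self._rx_chan
        )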
Other, - tweak a couple logs. - add a new `.cancel()` pre-started msg. - mask the `.cancel_called` setter, it's only (been) used for tracing. - todos around maybe moving the `._nursery` allocation "around" the `.start_remote_task()` call and various subsequent tweaks therein. --- tractor/_context.py | 192 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 158 insertions(+), 34 deletions(-) diff --git a/tractor/_context.py b/tractor/_context.py index 9e277a88..da31e423 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -442,25 +442,25 @@ class Context: ''' Records whether cancellation has been requested for this context by a call to `.cancel()` either due to, - - either an explicit call by some local task, + - an explicit call by some local task, - or an implicit call due to an error caught inside - the ``Portal.open_context()`` block. + the `Portal.open_context()` block. ''' return self._cancel_called - @cancel_called.setter - def cancel_called(self, val: bool) -> None: - ''' - Set the self-cancelled request `bool` value. + # XXX, to debug who frickin sets it.. + # @cancel_called.setter + # def cancel_called(self, val: bool) -> None: + # ''' + # Set the self-cancelled request `bool` value. - ''' - # to debug who frickin sets it.. - # if val: - # from .devx import pause_from_sync - # pause_from_sync() + # ''' + # if val: + # from .devx import pause_from_sync + # pause_from_sync() - self._cancel_called = val + # self._cancel_called = val @property def canceller(self) -> tuple[str, str]|None: @@ -635,6 +635,71 @@ class Context: ''' await self.chan.send(Stop(cid=self.cid)) + @property + def parent_task(self) -> trio.Task: + ''' + This IPC context's "owning task" which is a `trio.Task` + on one of the "sides" of the IPC. + + Note that the "parent_" prefix here refers to the local + `trio` task tree using the same interface as + `trio.Nursery.parent_task` whereas for IPC contexts, + a different cross-actor task hierarchy exists: + + - a "parent"-side which originally entered + `Portal.open_context()`, + + - the "child"-side which was spawned and scheduled to invoke + a function decorated with `@tractor.context`. + + This task is thus a handle to mem-domain-distinct/per-process + `Nursery.parent_task` depending on in which of the above + "sides" this context exists. + + ''' + return self._task + + def _is_blocked_on_rx_chan(self) -> bool: + ''' + Predicate to indicate whether the owner `._task: trio.Task` is + currently blocked (by `.receive()`-ing) on its underlying RPC + feeder `._rx_chan`. + + This knowledge is highly useful when handling so called + "out-of-band" (OoB) cancellation conditions where a peer + actor's task transmitted some remote error/cancel-msg and we + must know whether to signal-via-cancel currently executing + "user-code" (user defined code embedded in `ctx._scope`) or + simply to forward the IPC-msg-as-error **without calling** + `._scope.cancel()`. + + In the latter case it is presumed that if the owner task is + blocking for the next IPC msg, it will eventually receive, + process and raise the equivalent local error **without** + requiring `._scope.cancel()` to be explicitly called by the + *delivering OoB RPC-task* (via `_deliver_msg()`). 
+ + ''' + # NOTE, see the mem-chan meth-impls for *why* this + # logic works, + # `trio._channel.MemoryReceiveChannel.receive[_nowait]()` + # + # XXX realize that this is NOT an + # official/will-be-loudly-deprecated API: + # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data + # |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled + # + # orig repo intro in the mem-chan change over patch: + # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117 + # |_https://github.com/python-trio/trio/pull/616 + # |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0 + # + return ( + self._task.custom_sleep_data + is + self._rx_chan + ) + def _maybe_cancel_and_set_remote_error( self, error: BaseException, @@ -787,13 +852,27 @@ class Context: if self._canceller is None: log.error('Ctx has no canceller set!?') + cs: trio.CancelScope = self._scope + + # ?TODO? see comment @ .start_remote_task()` + # + # if not cs: + # from .devx import mk_pdb + # mk_pdb().set_trace() + # raise RuntimeError( + # f'IPC ctx was not be opened prior to remote error delivery !?\n' + # f'{self}\n' + # f'\n' + # f'`Portal.open_context()` must be entered (somewhere) beforehand!\n' + # ) + # Cancel the local `._scope`, catch that # `._scope.cancelled_caught` and re-raise any remote error # once exiting (or manually calling `.wait_for_result()`) the # `.open_context()` block. - cs: trio.CancelScope = self._scope if ( cs + and not cs.cancel_called # XXX this is an expected cancel request response # message and we **don't need to raise it** in the @@ -802,8 +881,7 @@ class Context: # if `._cancel_called` then `.cancel_acked and .cancel_called` # always should be set. and not self._is_self_cancelled() - and not cs.cancel_called - and not cs.cancelled_caught + # and not cs.cancelled_caught ): if ( msgerr @@ -814,7 +892,7 @@ class Context: not self._cancel_on_msgerr ): message: str = ( - 'NOT Cancelling `Context._scope` since,\n' + f'NOT Cancelling `Context._scope` since,\n' f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' f'AND we got a msg-type-error!\n' f'{error}\n' @@ -824,13 +902,43 @@ class Context: # `trio.Cancelled` subtype here ;) # https://github.com/goodboy/tractor/issues/368 message: str = 'Cancelling `Context._scope` !\n\n' - # from .devx import pause_from_sync - # pause_from_sync() - self._scope.cancel() - else: - message: str = 'NOT cancelling `Context._scope` !\n\n' + cs.cancel() + + # TODO, explicit condition for OoB (self-)cancellation? + # - we called `Portal.cancel_actor()` from this actor + # and the peer ctx task delivered ctxc due to it. + # - currently `self._is_self_cancelled()` will be true + # since the ctxc.canceller check will match us even though it + # wasn't from this ctx specifically! + elif ( + cs + and self._is_self_cancelled() + and not cs.cancel_called + ): + message: str = ( + 'Cancelling `ctx._scope` due to OoB self-cancel ?!\n' + '\n' + ) # from .devx import mk_pdb # mk_pdb().set_trace() + # TODO XXX, required to fix timeout failure in + # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed` + # + + # XXX NOTE XXX, this is SUPER SUBTLE! 
+ # we only want to cancel our embedded `._scope` + # if the ctx's current/using task is NOT blocked + # on `._rx_chan.receive()` and on some other + # `trio`-checkpoint since in the former case + # any `._remote_error` will be relayed through + # the rx-chan and appropriately raised by the owning + # `._task` directly. IF the owner task is however + # blocking elsewhere we need to interrupt it **now**. + if not self._is_blocked_on_rx_chan(): + cs.cancel() + else: + # rx_stats = self._rx_chan.statistics() + message: str = 'NOT cancelling `Context._scope` !\n\n' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' if ( @@ -854,6 +962,7 @@ class Context: + cs_fmt ) + log.cancel( message + @@ -946,8 +1055,9 @@ class Context: ''' side: str = self.side - # XXX for debug via the `@.setter` - self.cancel_called = True + self._cancel_called = True + # ^ XXX for debug via the `@.setter` + # self.cancel_called = True header: str = ( f'Cancelling ctx from {side!r}-side\n' @@ -2011,6 +2121,9 @@ async def open_context_from_portal( f'|_{portal.actor}\n' ) + # ?TODO? could we move this to inside the `tn` block? + # -> would allow doing `ctx.parent_task = tn.parent_task` ? + # -> would allow a `if not ._scope: => raise RTE` ? ctx: Context = await portal.actor.start_remote_task( portal.channel, nsf=nsf, @@ -2037,6 +2150,7 @@ async def open_context_from_portal( scope_err: BaseException|None = None ctxc_from_child: ContextCancelled|None = None try: + # from .devx import pause async with ( collapse_eg(), trio.open_nursery() as tn, @@ -2059,6 +2173,10 @@ async def open_context_from_portal( # the dialog, the `Error` msg should be raised from the `msg` # handling block below. try: + log.runtime( + f'IPC ctx parent waiting on Started msg..\n' + f'ctx.cid: {ctx.cid!r}\n' + ) started_msg, first = await ctx._pld_rx.recv_msg( ipc=ctx, expect_msg=Started, @@ -2067,16 +2185,16 @@ async def open_context_from_portal( ) except trio.Cancelled as taskc: ctx_cs: trio.CancelScope = ctx._scope + log.cancel( + f'IPC ctx was cancelled during "child" task sync due to\n\n' + f'.cid: {ctx.cid!r}\n' + f'.maybe_error: {ctx.maybe_error!r}\n' + ) + # await pause(shield=True) + if not ctx_cs.cancel_called: raise - # from .devx import pause - # await pause(shield=True) - - log.cancel( - 'IPC ctx was cancelled during "child" task sync due to\n\n' - f'{ctx.maybe_error}\n' - ) # OW if the ctx's scope was cancelled manually, # likely the `Context` was cancelled via a call to # `._maybe_cancel_and_set_remote_error()` so ensure @@ -2199,7 +2317,7 @@ async def open_context_from_portal( # documenting it as a definittive example of # debugging the tractor-runtime itself using it's # own `.devx.` tooling! 
- # + # # await debug.pause() # CASE 2: context was cancelled by local task calling @@ -2272,13 +2390,16 @@ async def open_context_from_portal( match scope_err: case trio.Cancelled(): logmeth = log.cancel + cause: str = 'cancelled' # XXX explicitly report on any non-graceful-taskc cases case _: + cause: str = 'errored' logmeth = log.exception logmeth( - f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' + f'ctx {ctx.side!r}-side {cause!r} with,\n' + f'{ctx.repr_outcome()!r}\n' ) if debug_mode(): @@ -2303,6 +2424,7 @@ async def open_context_from_portal( # told us it's cancelled ;p if ctxc_from_child is None: try: + # await pause(shield=True) await ctx.cancel() except ( trio.BrokenResourceError, @@ -2459,8 +2581,10 @@ async def open_context_from_portal( log.cancel( f'Context cancelled by local {ctx.side!r}-side task\n' f'c)>\n' - f' |_{ctx._task}\n\n' - f'{repr(scope_err)}\n' + f' |_{ctx.parent_task}\n' + f' .cid={ctx.cid!r}\n' + f'\n' + f'{scope_err!r}\n' ) # TODO: should we add a `._cancel_req_received` From 9f757ffa63ddb98c2e93e34be10936f37c8f2ec2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 11 Sep 2025 13:13:18 -0400 Subject: [PATCH 8/8] Woops, fix missing `assert` thanks to copilot --- tests/test_oob_cancellation.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_oob_cancellation.py b/tests/test_oob_cancellation.py index ba4c801d..58d12d2b 100644 --- a/tests/test_oob_cancellation.py +++ b/tests/test_oob_cancellation.py @@ -206,7 +206,7 @@ def test_cancel_ctx_with_parent_side_entered_in_bg_task( with pytest.raises(tractor.ContextCancelled) as excinfo: trio.run(main) - 'root' in excinfo.value.canceller[0] + assert 'root' in excinfo.value.canceller[0] # def test_parent_actor_cancels_subactor_with_gt1_ctxs_open_to_it(