diff --git a/tractor/_context.py b/tractor/_context.py index 9e277a88..da31e423 100644 --- a/tractor/_context.py +++ b/tractor/_context.py @@ -442,25 +442,25 @@ class Context: ''' Records whether cancellation has been requested for this context by a call to `.cancel()` either due to, - - either an explicit call by some local task, + - an explicit call by some local task, - or an implicit call due to an error caught inside - the ``Portal.open_context()`` block. + the `Portal.open_context()` block. ''' return self._cancel_called - @cancel_called.setter - def cancel_called(self, val: bool) -> None: - ''' - Set the self-cancelled request `bool` value. + # XXX, to debug who frickin sets it.. + # @cancel_called.setter + # def cancel_called(self, val: bool) -> None: + # ''' + # Set the self-cancelled request `bool` value. - ''' - # to debug who frickin sets it.. - # if val: - # from .devx import pause_from_sync - # pause_from_sync() + # ''' + # if val: + # from .devx import pause_from_sync + # pause_from_sync() - self._cancel_called = val + # self._cancel_called = val @property def canceller(self) -> tuple[str, str]|None: @@ -635,6 +635,71 @@ class Context: ''' await self.chan.send(Stop(cid=self.cid)) + @property + def parent_task(self) -> trio.Task: + ''' + This IPC context's "owning task" which is a `trio.Task` + on one of the "sides" of the IPC. + + Note that the "parent_" prefix here refers to the local + `trio` task tree using the same interface as + `trio.Nursery.parent_task` whereas for IPC contexts, + a different cross-actor task hierarchy exists: + + - a "parent"-side which originally entered + `Portal.open_context()`, + + - the "child"-side which was spawned and scheduled to invoke + a function decorated with `@tractor.context`. + + This task is thus a handle to mem-domain-distinct/per-process + `Nursery.parent_task` depending on in which of the above + "sides" this context exists. 
+ + ''' + return self._task + + def _is_blocked_on_rx_chan(self) -> bool: + ''' + Predicate to indicate whether the owner `._task: trio.Task` is + currently blocked (by `.receive()`-ing) on its underlying RPC + feeder `._rx_chan`. + + This knowledge is highly useful when handling so called + "out-of-band" (OoB) cancellation conditions where a peer + actor's task transmitted some remote error/cancel-msg and we + must know whether to signal-via-cancel currently executing + "user-code" (user defined code embedded in `ctx._scope`) or + simply to forward the IPC-msg-as-error **without calling** + `._scope.cancel()`. + + In the latter case it is presumed that if the owner task is + blocking for the next IPC msg, it will eventually receive, + process and raise the equivalent local error **without** + requiring `._scope.cancel()` to be explicitly called by the + *delivering OoB RPC-task* (via `_deliver_msg()`). + + ''' + # NOTE, see the mem-chan meth-impls for *why* this + # logic works, + # `trio._channel.MemoryReceiveChannel.receive[_nowait]()` + # + # XXX realize that this is NOT an + # official/will-be-loudly-deprecated API: + # - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data + # |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled + # + # orig repo intro in the mem-chan change over patch: + # - https://github.com/python-trio/trio/pull/586#issuecomment-414039117 + # |_https://github.com/python-trio/trio/pull/616 + # |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0 + # + return ( + self._task.custom_sleep_data + is + self._rx_chan + ) + def _maybe_cancel_and_set_remote_error( self, error: BaseException, @@ -787,13 +852,27 @@ class Context: if self._canceller is None: log.error('Ctx has no canceller set!?') + cs: trio.CancelScope = self._scope + + # ?TODO? 
see comment @ `.start_remote_task()` + # + # if not cs: + # from .devx import mk_pdb + # mk_pdb().set_trace() + # raise RuntimeError( + # f'IPC ctx was not opened prior to remote error delivery !?\n' + # f'{self}\n' + # f'\n' + # f'`Portal.open_context()` must be entered (somewhere) beforehand!\n' + # ) + # Cancel the local `._scope`, catch that # `._scope.cancelled_caught` and re-raise any remote error # once exiting (or manually calling `.wait_for_result()`) the # `.open_context()` block. - cs: trio.CancelScope = self._scope if ( cs + and not cs.cancel_called # XXX this is an expected cancel request response # message and we **don't need to raise it** in the @@ -802,8 +881,7 @@ class Context: # if `._cancel_called` then `.cancel_acked and .cancel_called` # always should be set. and not self._is_self_cancelled() - and not cs.cancel_called - and not cs.cancelled_caught + # and not cs.cancelled_caught ): if ( msgerr @@ -814,7 +892,7 @@ class Context: not self._cancel_on_msgerr ): message: str = ( - 'NOT Cancelling `Context._scope` since,\n' + f'NOT Cancelling `Context._scope` since,\n' f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' f'AND we got a msg-type-error!\n' f'{error}\n' @@ -824,13 +902,43 @@ class Context: # `trio.Cancelled` subtype here ;) # https://github.com/goodboy/tractor/issues/368 message: str = 'Cancelling `Context._scope` !\n\n' - # from .devx import pause_from_sync - # pause_from_sync() - self._scope.cancel() - else: - message: str = 'NOT cancelling `Context._scope` !\n\n' + cs.cancel() + + # TODO, explicit condition for OoB (self-)cancellation? + # - we called `Portal.cancel_actor()` from this actor + # and the peer ctx task delivered ctxc due to it. + # - currently `self._is_self_cancelled()` will be true + # since the ctxc.canceller check will match us even though it + # wasn't from this ctx specifically! 
+ elif ( + cs + and self._is_self_cancelled() + and not cs.cancel_called + ): + message: str = ( + 'Cancelling `ctx._scope` due to OoB self-cancel ?!\n' + '\n' + ) # from .devx import mk_pdb # mk_pdb().set_trace() + # TODO XXX, required to fix timeout failure in + # `test_cancelled_lockacquire_in_ipctx_not_unmaskeed` + # + + # XXX NOTE XXX, this is SUPER SUBTLE! + # we only want to cancel our embedded `._scope` + # if the ctx's current/using task is NOT blocked + # on `._rx_chan.receive()` and on some other + # `trio`-checkpoint since in the former case + # any `._remote_error` will be relayed through + # the rx-chan and appropriately raised by the owning + # `._task` directly. IF the owner task is however + # blocking elsewhere we need to interrupt it **now**. + if not self._is_blocked_on_rx_chan(): + cs.cancel() + else: + # rx_stats = self._rx_chan.statistics() + message: str = 'NOT cancelling `Context._scope` !\n\n' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' if ( @@ -854,6 +962,7 @@ class Context: + cs_fmt ) + log.cancel( message + @@ -946,8 +1055,9 @@ class Context: ''' side: str = self.side - # XXX for debug via the `@.setter` - self.cancel_called = True + self._cancel_called = True + # ^ XXX for debug via the `@.setter` + # self.cancel_called = True header: str = ( f'Cancelling ctx from {side!r}-side\n' @@ -2011,6 +2121,9 @@ async def open_context_from_portal( f'|_{portal.actor}\n' ) + # ?TODO? could we move this to inside the `tn` block? + # -> would allow doing `ctx.parent_task = tn.parent_task` ? + # -> would allow a `if not ._scope: => raise RTE` ? 
ctx: Context = await portal.actor.start_remote_task( portal.channel, nsf=nsf, @@ -2037,6 +2150,7 @@ async def open_context_from_portal( scope_err: BaseException|None = None ctxc_from_child: ContextCancelled|None = None try: + # from .devx import pause async with ( collapse_eg(), trio.open_nursery() as tn, @@ -2059,6 +2173,10 @@ async def open_context_from_portal( # the dialog, the `Error` msg should be raised from the `msg` # handling block below. try: + log.runtime( + f'IPC ctx parent waiting on Started msg..\n' + f'ctx.cid: {ctx.cid!r}\n' + ) started_msg, first = await ctx._pld_rx.recv_msg( ipc=ctx, expect_msg=Started, @@ -2067,16 +2185,16 @@ async def open_context_from_portal( ) except trio.Cancelled as taskc: ctx_cs: trio.CancelScope = ctx._scope + log.cancel( + f'IPC ctx was cancelled during "child" task sync due to\n\n' + f'.cid: {ctx.cid!r}\n' + f'.maybe_error: {ctx.maybe_error!r}\n' + ) + # await pause(shield=True) + if not ctx_cs.cancel_called: raise - # from .devx import pause - # await pause(shield=True) - - log.cancel( - 'IPC ctx was cancelled during "child" task sync due to\n\n' - f'{ctx.maybe_error}\n' - ) # OW if the ctx's scope was cancelled manually, # likely the `Context` was cancelled via a call to # `._maybe_cancel_and_set_remote_error()` so ensure @@ -2199,7 +2317,7 @@ async def open_context_from_portal( # documenting it as a definittive example of # debugging the tractor-runtime itself using it's # own `.devx.` tooling! 
- # + # # await debug.pause() # CASE 2: context was cancelled by local task calling @@ -2272,13 +2390,16 @@ async def open_context_from_portal( match scope_err: case trio.Cancelled(): logmeth = log.cancel + cause: str = 'cancelled' # XXX explicitly report on any non-graceful-taskc cases case _: + cause: str = 'errored' logmeth = log.exception logmeth( - f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' + f'ctx {ctx.side!r}-side {cause!r} with,\n' + f'{ctx.repr_outcome()!r}\n' ) if debug_mode(): @@ -2303,6 +2424,7 @@ async def open_context_from_portal( # told us it's cancelled ;p if ctxc_from_child is None: try: + # await pause(shield=True) await ctx.cancel() except ( trio.BrokenResourceError, @@ -2459,8 +2581,10 @@ async def open_context_from_portal( log.cancel( f'Context cancelled by local {ctx.side!r}-side task\n' f'c)>\n' - f' |_{ctx._task}\n\n' - f'{repr(scope_err)}\n' + f' |_{ctx.parent_task}\n' + f' .cid={ctx.cid!r}\n' + f'\n' + f'{scope_err!r}\n' ) # TODO: should we add a `._cancel_req_received`