forked from goodboy/tractor
1
0
Fork 0

Solve another OoB cancellation case, the bg task one

Such that we are able to (finally) detect when we should
`Context._scope.cancel()` specifically when the `.parent_task` is
**not** blocking on receiving from the underlying `._rx_chan`, since if
the task is blocking on `.receive()` it will call `.cancel()`
implicitly.

This is a lot to explain with very little code actually needed for the
implementation (are we like `trio` yet anyone?? XD) but the main jist is
that `Context._maybe_cancel_and_set_remote_error()` needed the
additional case of calling `._scope.cancel()` whenever we know that
a remote-error/ctxc won't be immediately handled, bc user code is doing
non `Context`-API things, and result in a similar outcome as if that
task was waiting on `Context.wait_for_result()` or `.__aexite__()`.

Impl details,
- add a new `._is_blocked_on_rx_chan()` method which predicates whether
  the (new) `.parent_task` is blocking on `._rx_chan.receive()`.
  * see various stipulations about the current impl and how we might
    need to adjust for the future given `trio`'s commitment to the
    `Task.custom_sleep_data` attr..
- add `.parent_task`, a pub wrapper for `._task`.
- check for `not ._is_blocked_on_rx_chan()` before manually cancelling
  the local `.parent_task`
- minimize the surrounding branch case expressions.

Other,
- tweak a couple logs.
- add a new `.cancel()` pre-started msg.
- mask the `.cancel_called` setter, it's only (been) used for tracing.
- todos around maybe moving the `._nursery` allocation "around" the
  `.start_remote_task()` call and various subsequent tweaks therein.
Tyler Goodlet 2025-09-10 21:09:40 -04:00
parent fc130d06b8
commit 0c6d512ba4
1 changed files with 158 additions and 34 deletions

View File

@ -442,25 +442,25 @@ class Context:
''' '''
Records whether cancellation has been requested for this context Records whether cancellation has been requested for this context
by a call to `.cancel()` either due to, by a call to `.cancel()` either due to,
- either an explicit call by some local task, - an explicit call by some local task,
- or an implicit call due to an error caught inside - or an implicit call due to an error caught inside
the ``Portal.open_context()`` block. the `Portal.open_context()` block.
''' '''
return self._cancel_called return self._cancel_called
@cancel_called.setter # XXX, to debug who frickin sets it..
def cancel_called(self, val: bool) -> None: # @cancel_called.setter
''' # def cancel_called(self, val: bool) -> None:
Set the self-cancelled request `bool` value. # '''
# Set the self-cancelled request `bool` value.
''' # '''
# to debug who frickin sets it..
# if val: # if val:
# from .devx import pause_from_sync # from .devx import pause_from_sync
# pause_from_sync() # pause_from_sync()
self._cancel_called = val # self._cancel_called = val
@property @property
def canceller(self) -> tuple[str, str]|None: def canceller(self) -> tuple[str, str]|None:
@ -635,6 +635,71 @@ class Context:
''' '''
await self.chan.send(Stop(cid=self.cid)) await self.chan.send(Stop(cid=self.cid))
@property
def parent_task(self) -> trio.Task:
'''
This IPC context's "owning task" which is a `trio.Task`
on one of the "sides" of the IPC.
Note that the "parent_" prefix here refers to the local
`trio` task tree using the same interface as
`trio.Nursery.parent_task` whereas for IPC contexts,
a different cross-actor task hierarchy exists:
- a "parent"-side which originally entered
`Portal.open_context()`,
- the "child"-side which was spawned and scheduled to invoke
a function decorated with `@tractor.context`.
This task is thus a handle to mem-domain-distinct/per-process
`Nursery.parent_task` depending on in which of the above
"sides" this context exists.
'''
return self._task
def _is_blocked_on_rx_chan(self) -> bool:
'''
Predicate to indicate whether the owner `._task: trio.Task` is
currently blocked (by `.receive()`-ing) on its underlying RPC
feeder `._rx_chan`.
This knowledge is highly useful when handling so called
"out-of-band" (OoB) cancellation conditions where a peer
actor's task transmitted some remote error/cancel-msg and we
must know whether to signal-via-cancel currently executing
"user-code" (user defined code embedded in `ctx._scope`) or
simply to forward the IPC-msg-as-error **without calling**
`._scope.cancel()`.
In the latter case it is presumed that if the owner task is
blocking for the next IPC msg, it will eventually receive,
process and raise the equivalent local error **without**
requiring `._scope.cancel()` to be explicitly called by the
*delivering OoB RPC-task* (via `_deliver_msg()`).
'''
# NOTE, see the mem-chan meth-impls for *why* this
# logic works,
# `trio._channel.MemoryReceiveChannel.receive[_nowait]()`
#
# XXX realize that this is NOT an
# official/will-be-loudly-deprecated API:
# - https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.Task.custom_sleep_data
# |_https://trio.readthedocs.io/en/stable/reference-lowlevel.html#trio.lowlevel.wait_task_rescheduled
#
# orig repo intro in the mem-chan change over patch:
# - https://github.com/python-trio/trio/pull/586#issuecomment-414039117
# |_https://github.com/python-trio/trio/pull/616
# |_https://github.com/njsmith/trio/commit/98c38cef6f62e731bf8c7190e8756976bface8f0
#
return (
self._task.custom_sleep_data
is
self._rx_chan
)
def _maybe_cancel_and_set_remote_error( def _maybe_cancel_and_set_remote_error(
self, self,
error: BaseException, error: BaseException,
@ -787,13 +852,27 @@ class Context:
if self._canceller is None: if self._canceller is None:
log.error('Ctx has no canceller set!?') log.error('Ctx has no canceller set!?')
cs: trio.CancelScope = self._scope
# ?TODO? see comment @ .start_remote_task()`
#
# if not cs:
# from .devx import mk_pdb
# mk_pdb().set_trace()
# raise RuntimeError(
# f'IPC ctx was not be opened prior to remote error delivery !?\n'
# f'{self}\n'
# f'\n'
# f'`Portal.open_context()` must be entered (somewhere) beforehand!\n'
# )
# Cancel the local `._scope`, catch that # Cancel the local `._scope`, catch that
# `._scope.cancelled_caught` and re-raise any remote error # `._scope.cancelled_caught` and re-raise any remote error
# once exiting (or manually calling `.wait_for_result()`) the # once exiting (or manually calling `.wait_for_result()`) the
# `.open_context()` block. # `.open_context()` block.
cs: trio.CancelScope = self._scope
if ( if (
cs cs
and not cs.cancel_called
# XXX this is an expected cancel request response # XXX this is an expected cancel request response
# message and we **don't need to raise it** in the # message and we **don't need to raise it** in the
@ -802,8 +881,7 @@ class Context:
# if `._cancel_called` then `.cancel_acked and .cancel_called` # if `._cancel_called` then `.cancel_acked and .cancel_called`
# always should be set. # always should be set.
and not self._is_self_cancelled() and not self._is_self_cancelled()
and not cs.cancel_called # and not cs.cancelled_caught
and not cs.cancelled_caught
): ):
if ( if (
msgerr msgerr
@ -814,7 +892,7 @@ class Context:
not self._cancel_on_msgerr not self._cancel_on_msgerr
): ):
message: str = ( message: str = (
'NOT Cancelling `Context._scope` since,\n' f'NOT Cancelling `Context._scope` since,\n'
f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n' f'Context._cancel_on_msgerr = {self._cancel_on_msgerr}\n\n'
f'AND we got a msg-type-error!\n' f'AND we got a msg-type-error!\n'
f'{error}\n' f'{error}\n'
@ -824,13 +902,43 @@ class Context:
# `trio.Cancelled` subtype here ;) # `trio.Cancelled` subtype here ;)
# https://github.com/goodboy/tractor/issues/368 # https://github.com/goodboy/tractor/issues/368
message: str = 'Cancelling `Context._scope` !\n\n' message: str = 'Cancelling `Context._scope` !\n\n'
# from .devx import pause_from_sync cs.cancel()
# pause_from_sync()
self._scope.cancel() # TODO, explicit condition for OoB (self-)cancellation?
else: # - we called `Portal.cancel_actor()` from this actor
message: str = 'NOT cancelling `Context._scope` !\n\n' # and the peer ctx task delivered ctxc due to it.
# - currently `self._is_self_cancelled()` will be true
# since the ctxc.canceller check will match us even though it
# wasn't from this ctx specifically!
elif (
cs
and self._is_self_cancelled()
and not cs.cancel_called
):
message: str = (
'Cancelling `ctx._scope` due to OoB self-cancel ?!\n'
'\n'
)
# from .devx import mk_pdb # from .devx import mk_pdb
# mk_pdb().set_trace() # mk_pdb().set_trace()
# TODO XXX, required to fix timeout failure in
# `test_cancelled_lockacquire_in_ipctx_not_unmaskeed`
#
# XXX NOTE XXX, this is SUPER SUBTLE!
# we only want to cancel our embedded `._scope`
# if the ctx's current/using task is NOT blocked
# on `._rx_chan.receive()` and on some other
# `trio`-checkpoint since in the former case
# any `._remote_error` will be relayed through
# the rx-chan and appropriately raised by the owning
# `._task` directly. IF the owner task is however
# blocking elsewhere we need to interrupt it **now**.
if not self._is_blocked_on_rx_chan():
cs.cancel()
else:
# rx_stats = self._rx_chan.statistics()
message: str = 'NOT cancelling `Context._scope` !\n\n'
fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n' fmt_str: str = 'No `self._scope: CancelScope` was set/used ?\n'
if ( if (
@ -854,6 +962,7 @@ class Context:
+ +
cs_fmt cs_fmt
) )
log.cancel( log.cancel(
message message
+ +
@ -946,8 +1055,9 @@ class Context:
''' '''
side: str = self.side side: str = self.side
# XXX for debug via the `@.setter` self._cancel_called = True
self.cancel_called = True # ^ XXX for debug via the `@.setter`
# self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx from {side!r}-side\n' f'Cancelling ctx from {side!r}-side\n'
@ -2011,6 +2121,9 @@ async def open_context_from_portal(
f'|_{portal.actor}\n' f'|_{portal.actor}\n'
) )
# ?TODO? could we move this to inside the `tn` block?
# -> would allow doing `ctx.parent_task = tn.parent_task` ?
# -> would allow a `if not ._scope: => raise RTE` ?
ctx: Context = await portal.actor.start_remote_task( ctx: Context = await portal.actor.start_remote_task(
portal.channel, portal.channel,
nsf=nsf, nsf=nsf,
@ -2037,6 +2150,7 @@ async def open_context_from_portal(
scope_err: BaseException|None = None scope_err: BaseException|None = None
ctxc_from_child: ContextCancelled|None = None ctxc_from_child: ContextCancelled|None = None
try: try:
# from .devx import pause
async with ( async with (
collapse_eg(), collapse_eg(),
trio.open_nursery() as tn, trio.open_nursery() as tn,
@ -2059,6 +2173,10 @@ async def open_context_from_portal(
# the dialog, the `Error` msg should be raised from the `msg` # the dialog, the `Error` msg should be raised from the `msg`
# handling block below. # handling block below.
try: try:
log.runtime(
f'IPC ctx parent waiting on Started msg..\n'
f'ctx.cid: {ctx.cid!r}\n'
)
started_msg, first = await ctx._pld_rx.recv_msg( started_msg, first = await ctx._pld_rx.recv_msg(
ipc=ctx, ipc=ctx,
expect_msg=Started, expect_msg=Started,
@ -2067,16 +2185,16 @@ async def open_context_from_portal(
) )
except trio.Cancelled as taskc: except trio.Cancelled as taskc:
ctx_cs: trio.CancelScope = ctx._scope ctx_cs: trio.CancelScope = ctx._scope
log.cancel(
f'IPC ctx was cancelled during "child" task sync due to\n\n'
f'.cid: {ctx.cid!r}\n'
f'.maybe_error: {ctx.maybe_error!r}\n'
)
# await pause(shield=True)
if not ctx_cs.cancel_called: if not ctx_cs.cancel_called:
raise raise
# from .devx import pause
# await pause(shield=True)
log.cancel(
'IPC ctx was cancelled during "child" task sync due to\n\n'
f'{ctx.maybe_error}\n'
)
# OW if the ctx's scope was cancelled manually, # OW if the ctx's scope was cancelled manually,
# likely the `Context` was cancelled via a call to # likely the `Context` was cancelled via a call to
# `._maybe_cancel_and_set_remote_error()` so ensure # `._maybe_cancel_and_set_remote_error()` so ensure
@ -2272,13 +2390,16 @@ async def open_context_from_portal(
match scope_err: match scope_err:
case trio.Cancelled(): case trio.Cancelled():
logmeth = log.cancel logmeth = log.cancel
cause: str = 'cancelled'
# XXX explicitly report on any non-graceful-taskc cases # XXX explicitly report on any non-graceful-taskc cases
case _: case _:
cause: str = 'errored'
logmeth = log.exception logmeth = log.exception
logmeth( logmeth(
f'ctx {ctx.side!r}-side exited with {ctx.repr_outcome()!r}\n' f'ctx {ctx.side!r}-side {cause!r} with,\n'
f'{ctx.repr_outcome()!r}\n'
) )
if debug_mode(): if debug_mode():
@ -2303,6 +2424,7 @@ async def open_context_from_portal(
# told us it's cancelled ;p # told us it's cancelled ;p
if ctxc_from_child is None: if ctxc_from_child is None:
try: try:
# await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except ( except (
trio.BrokenResourceError, trio.BrokenResourceError,
@ -2459,8 +2581,10 @@ async def open_context_from_portal(
log.cancel( log.cancel(
f'Context cancelled by local {ctx.side!r}-side task\n' f'Context cancelled by local {ctx.side!r}-side task\n'
f'c)>\n' f'c)>\n'
f' |_{ctx._task}\n\n' f' |_{ctx.parent_task}\n'
f'{repr(scope_err)}\n' f' .cid={ctx.cid!r}\n'
f'\n'
f'{scope_err!r}\n'
) )
# TODO: should we add a `._cancel_req_received` # TODO: should we add a `._cancel_req_received`