Compare commits

..

1 Commits

Author SHA1 Message Date
Tyler Goodlet 0ab8a304a4 Add ctx-ep version of `trio`'s *finally-footgun* as suite 2025-06-13 22:39:03 -04:00
5 changed files with 157 additions and 358 deletions

View File

@ -33,6 +33,10 @@ including,
whether a `Context` task should raise `ContextCancelled` (ctx). whether a `Context` task should raise `ContextCancelled` (ctx).
''' '''
# from contextlib import (
# asynccontextmanager as acm,
# )
import pytest import pytest
import trio import trio
import tractor import tractor
@ -42,19 +46,24 @@ from tractor import ( # typing
Context, Context,
ContextCancelled, ContextCancelled,
) )
# from tractor._testing import (
# tractor_test,
# expect_ctxc,
# )
@tractor.context @tractor.context
async def sleep_n_chkpt_in_finally( async def sleep_n_chkpt_in_finally(
ctx: Context, ctx: Context,
sleep_n_raise: bool, sleep_n_raise: bool,
chld_raise_delay: float, chld_raise_delay: float,
chld_finally_delay: float, chld_finally_delay: float,
rent_cancels: bool, rent_cancels: bool,
rent_ctxc_delay: float, rent_ctxc_delay: float,
tn_cancels: bool,
gto_task: bool = False,
tn_cancels: bool = False,
expect_exc: str|None = None, expect_exc: str|None = None,
) -> None: ) -> None:
@ -74,10 +83,10 @@ async def sleep_n_chkpt_in_finally(
`trio.Cancelled` to signal cancellation on each side of an IPC `Context`, `trio.Cancelled` to signal cancellation on each side of an IPC `Context`,
the footgun issue can compound itself as demonstrated in this suite.. the footgun issue can compound itself as demonstrated in this suite..
Here are some edge cases codified with our WIP "sclang" syntax Here are some edge cases codified with "sclang" syntax.
(note the parent(rent)/child(chld) naming here is just Note that the parent/child relationship is just a pragmatic
pragmatism, generally these most of these cases can occurr choice, these cases can occurr regardless of the supervision
regardless of the distributed-task's supervision hiearchy), hiearchy,
- rent c)=> chld.raises-then-taskc-in-finally - rent c)=> chld.raises-then-taskc-in-finally
|_ chld's body raises an `exc: BaseException`. |_ chld's body raises an `exc: BaseException`.
@ -97,67 +106,79 @@ async def sleep_n_chkpt_in_finally(
) )
berr: BaseException|None = None berr: BaseException|None = None
try: async with (
if not sleep_n_raise: tractor.trionics.collapse_eg(
await trio.sleep_forever() # raise_from_src=True, # to show orig eg
elif sleep_n_raise: ),
trio.open_nursery() as tn
):
if gto_task:
tn.start_soon(trio.sleep_forever())
# XXX this sleep is less then the sleep the parent
# does before calling `ctx.cancel()`
await trio.sleep(chld_raise_delay)
# XXX this will be masked by a taskc raised in
# the `finally:` if this fn doesn't terminate
# before any ctxc-req arrives AND a checkpoint is hit
# in that `finally:`.
raise RuntimeError('my app krurshed..')
except BaseException as _berr:
berr = _berr
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_exc:
if not isinstance(
berr,
expect_exc,
):
raise ValueError(
f'Unexpected exc type ??\n'
f'{berr!r}\n'
f'\n'
f'Expected a {expect_exc!r}\n'
)
raise berr
# simulate what user code might try even though
# it's a known boo-boo..
finally:
# maybe wait for rent ctxc to arrive
with trio.CancelScope(shield=True):
await trio.sleep(chld_finally_delay)
# !!XXX this will raise `trio.Cancelled` which
# will mask the RTE from above!!!
#
# YES, it's the same case as our extant
# `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
try: try:
await trio.lowlevel.checkpoint() if not sleep_n_raise:
except trio.Cancelled as taskc: await trio.sleep_forever()
if (scope_err := taskc.__context__): elif sleep_n_raise:
print(
f'XXX MASKED REMOTE ERROR XXX\n'
f'ENDPOINT exception -> {scope_err!r}\n'
f'will be masked by -> {taskc!r}\n'
)
# await tractor.pause(shield=True)
raise taskc # XXX this sleep is less then the sleep the parent
# does before calling `ctx.cancel()`
await trio.sleep(chld_raise_delay)
# XXX this will be masked by a taskc raised in
# the `finally:` if this fn doesn't terminate
# before any ctxc-req arrives AND a checkpoint is hit
# in that `finally:`.
raise RuntimeError('my app krurshed..')
except BaseException as _berr:
berr = _berr
# TODO: it'd sure be nice to be able to inject our own
# `ContextCancelled` here instead of of `trio.Cancelled`
# so that our runtime can expect it and this "user code"
# would be able to tell the diff between a generic trio
# cancel and a tractor runtime-IPC cancel.
if expect_exc:
if not isinstance(
berr,
expect_exc,
):
raise ValueError(
f'Unexpected exc type ??\n'
f'{berr!r}\n'
f'\n'
f'Expected a {expect_exc!r}\n'
)
raise berr
# simulate what user code might try even though
# it's a known boo-boo..
finally:
# maybe wait for rent ctxc to arrive
with trio.CancelScope(shield=True):
await trio.sleep(chld_finally_delay)
if tn_cancels:
tn.cancel_scope.cancel()
# !!XXX this will raise `trio.Cancelled` which
# will mask the RTE from above!!!
#
# YES, it's the same case as our extant
# `test_trioisms::test_acm_embedded_nursery_propagates_enter_err`
try:
await trio.lowlevel.checkpoint()
except trio.Cancelled as taskc:
if (scope_err := taskc.__context__):
print(
f'XXX MASKED REMOTE ERROR XXX\n'
f'ENDPOINT exception -> {scope_err!r}\n'
f'will be masked by -> {taskc!r}\n'
)
# await tractor.pause(shield=True)
raise taskc
@pytest.mark.parametrize( @pytest.mark.parametrize(
@ -170,6 +191,7 @@ async def sleep_n_chkpt_in_finally(
expect_exc='Cancelled', expect_exc='Cancelled',
rent_cancels=True, rent_cancels=True,
rent_ctxc_delay=0.1, rent_ctxc_delay=0.1,
tn_cancels=True,
), ),
dict( dict(
sleep_n_raise='RuntimeError', sleep_n_raise='RuntimeError',
@ -178,11 +200,12 @@ async def sleep_n_chkpt_in_finally(
expect_exc='RuntimeError', expect_exc='RuntimeError',
rent_cancels=False, rent_cancels=False,
rent_ctxc_delay=0.1, rent_ctxc_delay=0.1,
tn_cancels=False,
), ),
], ],
ids=lambda item: f'chld_callspec={item!r}' ids=lambda item: f'chld_callspec={item!r}'
) )
def test_unmasked_remote_exc( def test_masked_taskc_with_taskc_still_is_contx(
debug_mode: bool, debug_mode: bool,
chld_callspec: dict, chld_callspec: dict,
tpt_proto: str, tpt_proto: str,

View File

@ -112,11 +112,55 @@ def test_acm_embedded_nursery_propagates_enter_err(
''' '''
import tractor import tractor
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery,
unmask_from: BaseException|None = trio.Cancelled
# TODO, maybe offer a collection?
# unmask_from: set[BaseException] = {
# trio.Cancelled,
# },
):
if not unmask_from:
yield
return
try:
yield
except* unmask_from as be_eg:
# TODO, if we offer `unmask_from: set`
# for masker_exc_type in unmask_from:
matches, rest = be_eg.split(unmask_from)
if not matches:
raise
for exc_match in be_eg.exceptions:
if (
(exc_ctx := exc_match.__context__)
and
type(exc_ctx) not in {
# trio.Cancelled, # always by default?
unmask_from,
}
):
exc_ctx.add_note(
f'\n'
f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
f'Are you always cancelling? Say from a `finally:` ?\n\n'
f'{tn!r}'
)
raise exc_ctx from exc_match
@acm @acm
async def wraps_tn_that_always_cancels(): async def wraps_tn_that_always_cancels():
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as tn,
tractor.trionics.maybe_raise_from_masking_exc( maybe_raise_from_masking_exc(
tn=tn, tn=tn,
unmask_from=( unmask_from=(
trio.Cancelled trio.Cancelled
@ -158,60 +202,3 @@ def test_acm_embedded_nursery_propagates_enter_err(
assert_eg, rest_eg = eg.split(AssertionError) assert_eg, rest_eg = eg.split(AssertionError)
assert len(assert_eg.exceptions) == 1 assert len(assert_eg.exceptions) == 1
def test_gatherctxs_with_memchan_breaks_multicancelled(
debug_mode: bool,
):
'''
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
will break a strict-eg-tn's multi-cancelled absorption..
'''
from tractor import (
trionics,
)
@acm
async def open_memchan() -> trio.abc.ReceiveChannel:
task: trio.Task = trio.lowlevel.current_task()
print(
f'Opening {task!r}\n'
)
# 1 to force eager sending
send, recv = trio.open_memory_channel(16)
try:
async with send:
yield recv
finally:
print(
f'Closed {task!r}\n'
)
async def main():
async with (
# XXX should ensure ONLY the KBI
# is relayed upward
trionics.collapse_eg(),
trio.open_nursery(
# strict_exception_groups=False,
), # as tn,
trionics.gather_contexts([
open_memchan(),
open_memchan(),
]) as recv_chans,
):
assert len(recv_chans) == 2
await trio.sleep(1)
raise KeyboardInterrupt
# tn.cancel_scope.cancel()
with pytest.raises(KeyboardInterrupt):
trio.run(main)

View File

@ -37,7 +37,6 @@ import warnings
import trio import trio
from trio import ( from trio import (
Cancelled,
CancelScope, CancelScope,
Nursery, Nursery,
TaskStatus, TaskStatus,
@ -53,14 +52,10 @@ from ._exceptions import (
ModuleNotExposed, ModuleNotExposed,
MsgTypeError, MsgTypeError,
TransportClosed, TransportClosed,
is_multi_cancelled,
pack_error, pack_error,
unpack_error, unpack_error,
) )
from .trionics import (
collapse_eg,
is_multi_cancelled,
maybe_raise_from_masking_exc,
)
from .devx import ( from .devx import (
debug, debug,
add_div, add_div,
@ -621,40 +616,32 @@ async def _invoke(
# -> the below scope is never exposed to the # -> the below scope is never exposed to the
# `@context` marked RPC function. # `@context` marked RPC function.
# - `._portal` is never set. # - `._portal` is never set.
scope_err: BaseException|None = None
try: try:
# TODO: better `trionics` primitive/tooling usage here! tn: trio.Nursery
# -[ ] should would be nice to have our `TaskMngr`
# nursery here!
# -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage
# here in the child task instead of waiting for the
# parent to crash with it's own MTE..
#
tn: Nursery
rpc_ctx_cs: CancelScope rpc_ctx_cs: CancelScope
async with ( async with (
collapse_eg(), trio.open_nursery(
trio.open_nursery() as tn, strict_exception_groups=False,
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
) as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
spec=ctx_meta.get('pld_spec'), spec=ctx_meta.get('pld_spec'),
dec_hook=ctx_meta.get('dec_hook'), dec_hook=ctx_meta.get('dec_hook'),
), ),
# XXX NOTE, this being the "most embedded"
# scope ensures unasking of the `await coro` below
# *should* never be interfered with!!
maybe_raise_from_masking_exc(
tn=tn,
unmask_from=Cancelled,
) as _mbme, # maybe boxed masked exc
): ):
ctx._scope_nursery = tn ctx._scope_nursery = tn
rpc_ctx_cs = ctx._scope = tn.cancel_scope rpc_ctx_cs = ctx._scope = tn.cancel_scope
task_status.started(ctx) task_status.started(ctx)
# invoke user endpoint fn. # TODO: better `trionics` tooling:
# -[ ] should would be nice to have our `TaskMngr`
# nursery here!
# -[ ] payload value checking like we do with
# `.started()` such that the debbuger can engage
# here in the child task instead of waiting for the
# parent to crash with it's own MTE..
res: Any|PayloadT = await coro res: Any|PayloadT = await coro
return_msg: Return|CancelAck = return_msg_type( return_msg: Return|CancelAck = return_msg_type(
cid=cid, cid=cid,
@ -757,48 +744,38 @@ async def _invoke(
BaseException, BaseException,
trio.Cancelled, trio.Cancelled,
) as _scope_err: ) as scope_error:
scope_err = _scope_err
if ( if (
isinstance(scope_err, RuntimeError) isinstance(scope_error, RuntimeError)
and and scope_error.args
scope_err.args and 'Cancel scope stack corrupted' in scope_error.args[0]
and
'Cancel scope stack corrupted' in scope_err.args[0]
): ):
log.exception('Cancel scope stack corrupted!?\n') log.exception('Cancel scope stack corrupted!?\n')
# debug.mk_pdb().set_trace() # debug.mk_pdb().set_trace()
# always set this (child) side's exception as the # always set this (child) side's exception as the
# local error on the context # local error on the context
ctx._local_error: BaseException = scope_err ctx._local_error: BaseException = scope_error
# ^-TODO-^ question, # ^-TODO-^ question,
# does this matter other then for # does this matter other then for
# consistentcy/testing? # consistentcy/testing?
# |_ no user code should be in this scope at this point # |_ no user code should be in this scope at this point
# AND we already set this in the block below? # AND we already set this in the block below?
# XXX if a remote error was set then likely the # if a remote error was set then likely the
# exc group was raised due to that, so # exception group was raised due to that, so
# and we instead raise that error immediately! # and we instead raise that error immediately!
maybe_re: ( ctx.maybe_raise()
ContextCancelled|RemoteActorError
) = ctx.maybe_raise()
if maybe_re:
log.cancel(
f'Suppressing remote-exc from peer,\n'
f'{maybe_re!r}\n'
)
# maybe TODO: pack in come kinda # maybe TODO: pack in come kinda
# `trio.Cancelled.__traceback__` here so they can be # `trio.Cancelled.__traceback__` here so they can be
# unwrapped and displayed on the caller side? no se.. # unwrapped and displayed on the caller side? no se..
raise scope_err raise
# `@context` entrypoint task bookeeping. # `@context` entrypoint task bookeeping.
# i.e. only pop the context tracking if used ;) # i.e. only pop the context tracking if used ;)
finally: finally:
assert chan.aid assert chan.uid
# don't pop the local context until we know the # don't pop the local context until we know the
# associated child isn't in debug any more # associated child isn't in debug any more
@ -825,9 +802,6 @@ async def _invoke(
descr_str += ( descr_str += (
f'\n{merr!r}\n' # needed? f'\n{merr!r}\n' # needed?
f'{tb_str}\n' f'{tb_str}\n'
f'\n'
f'scope_error:\n'
f'{scope_err!r}\n'
) )
else: else:
descr_str += f'\n{merr!r}\n' descr_str += f'\n{merr!r}\n'

View File

@ -34,6 +34,3 @@ from ._beg import (
maybe_collapse_eg as maybe_collapse_eg, maybe_collapse_eg as maybe_collapse_eg,
is_multi_cancelled as is_multi_cancelled, is_multi_cancelled as is_multi_cancelled,
) )
from ._taskc import (
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
)

View File

@ -1,182 +0,0 @@
# tractor: structured concurrent "actors".
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
`trio.Task` cancellation helpers, extensions and "holsters".
'''
from __future__ import annotations
from contextlib import (
asynccontextmanager as acm,
)
from typing import TYPE_CHECKING
import trio
from tractor.log import get_logger
log = get_logger(__name__)
if TYPE_CHECKING:
from tractor.devx.debug import BoxedMaybeException
def find_masked_excs(
maybe_masker: BaseException,
unmask_from: set[BaseException],
) -> BaseException|None:
''''
Deliver any `maybe_masker.__context__` provided
it a declared masking exc-type entry in `unmask_from`.
'''
if (
type(maybe_masker) in unmask_from
and
(exc_ctx := maybe_masker.__context__)
# TODO? what about any cases where
# they could be the same type but not same instance?
# |_i.e. a cancel masking a cancel ??
# or (
# exc_ctx is not maybe_masker
# )
):
return exc_ctx
return None
@acm
async def maybe_raise_from_masking_exc(
tn: trio.Nursery|None = None,
unmask_from: (
BaseException|
tuple[BaseException]
) = (trio.Cancelled,),
raise_unmasked: bool = True,
extra_note: str = (
'This can occurr when,\n'
' - a `trio.Nursery` scope embeds a `finally:`-block '
'which executes a checkpoint!'
#
# ^TODO? other cases?
),
always_warn_on: tuple[BaseException] = (
trio.Cancelled,
),
# ^XXX, special case(s) where we warn-log bc likely
# there will be no operational diff since the exc
# is always expected to be consumed.
) -> BoxedMaybeException:
'''
Maybe un-mask and re-raise exception(s) suppressed by a known
error-used-as-signal type (cough namely `trio.Cancelled`).
Though this unmasker targets cancelleds, it can be used more
generally to capture and unwrap masked excs detected as
`.__context__` values which were suppressed by any error type
passed in `unmask_from`.
-------------
STILL-TODO ??
-------------
-[ ] support for egs which have multiple masked entries in
`maybe_eg.exceptions`, in which case we should unmask the
individual sub-excs but maintain the eg-parent's form right?
'''
from tractor.devx.debug import (
BoxedMaybeException,
pause,
)
boxed_maybe_exc = BoxedMaybeException(
raise_on_exit=raise_unmasked,
)
matching: list[BaseException]|None = None
maybe_eg: ExceptionGroup|None
maybe_eg: ExceptionGroup|None
if tn:
try: # handle egs
yield boxed_maybe_exc
return
except* unmask_from as _maybe_eg:
maybe_eg = _maybe_eg
matches: ExceptionGroup
matches, _ = maybe_eg.split(
unmask_from
)
if not matches:
raise
matching: list[BaseException] = matches.exceptions
else:
try: # handle non-egs
yield boxed_maybe_exc
return
except unmask_from as _maybe_exc:
maybe_exc = _maybe_exc
matching: list[BaseException] = [
maybe_exc
]
# XXX, only unmask-ed for debuggin!
# TODO, remove eventually..
except BaseException as _berr:
berr = _berr
await pause(shield=True)
raise berr
if matching is None:
raise
masked: list[tuple[BaseException, BaseException]] = []
for exc_match in matching:
if exc_ctx := find_masked_excs(
maybe_masker=exc_match,
unmask_from={unmask_from},
):
masked.append((exc_ctx, exc_match))
boxed_maybe_exc.value = exc_match
note: str = (
f'\n'
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
)
if extra_note:
note += (
f'\n'
f'{extra_note}\n'
)
exc_ctx.add_note(note)
if type(exc_match) in always_warn_on:
log.warning(note)
# await tractor.pause(shield=True)
if raise_unmasked:
if len(masked) < 2:
raise exc_ctx from exc_match
else:
# ?TODO, see above but, possibly unmasking sub-exc
# entries if there are > 1
await pause(shield=True)
else:
raise