Compare commits
5 Commits
0ab8a304a4
...
4bc443ccae
Author | SHA1 | Date |
---|---|---|
|
4bc443ccae | |
|
bad42734db | |
|
86346c27e8 | |
|
0687f1aaa6 | |
|
a21d9b1e33 |
|
@ -33,10 +33,6 @@ including,
|
|||
whether a `Context` task should raise `ContextCancelled` (ctx).
|
||||
|
||||
'''
|
||||
# from contextlib import (
|
||||
# asynccontextmanager as acm,
|
||||
# )
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
import tractor
|
||||
|
@ -46,24 +42,19 @@ from tractor import ( # typing
|
|||
Context,
|
||||
ContextCancelled,
|
||||
)
|
||||
# from tractor._testing import (
|
||||
# tractor_test,
|
||||
# expect_ctxc,
|
||||
# )
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def sleep_n_chkpt_in_finally(
|
||||
ctx: Context,
|
||||
sleep_n_raise: bool,
|
||||
|
||||
chld_raise_delay: float,
|
||||
chld_finally_delay: float,
|
||||
|
||||
rent_cancels: bool,
|
||||
rent_ctxc_delay: float,
|
||||
tn_cancels: bool,
|
||||
gto_task: bool = False,
|
||||
|
||||
tn_cancels: bool = False,
|
||||
expect_exc: str|None = None,
|
||||
|
||||
) -> None:
|
||||
|
@ -83,10 +74,10 @@ async def sleep_n_chkpt_in_finally(
|
|||
`trio.Cancelled` to signal cancellation on each side of an IPC `Context`,
|
||||
the footgun issue can compound itself as demonstrated in this suite..
|
||||
|
||||
Here are some edge cases codified with "sclang" syntax.
|
||||
Note that the parent/child relationship is just a pragmatic
|
||||
choice, these cases can occurr regardless of the supervision
|
||||
hiearchy,
|
||||
Here are some edge cases codified with our WIP "sclang" syntax
|
||||
(note the parent(rent)/child(chld) naming here is just
|
||||
pragmatism, generally these most of these cases can occurr
|
||||
regardless of the distributed-task's supervision hiearchy),
|
||||
|
||||
- rent c)=> chld.raises-then-taskc-in-finally
|
||||
|_ chld's body raises an `exc: BaseException`.
|
||||
|
@ -106,15 +97,6 @@ async def sleep_n_chkpt_in_finally(
|
|||
)
|
||||
|
||||
berr: BaseException|None = None
|
||||
async with (
|
||||
tractor.trionics.collapse_eg(
|
||||
# raise_from_src=True, # to show orig eg
|
||||
),
|
||||
trio.open_nursery() as tn
|
||||
):
|
||||
if gto_task:
|
||||
tn.start_soon(trio.sleep_forever())
|
||||
|
||||
try:
|
||||
if not sleep_n_raise:
|
||||
await trio.sleep_forever()
|
||||
|
@ -159,9 +141,6 @@ async def sleep_n_chkpt_in_finally(
|
|||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(chld_finally_delay)
|
||||
|
||||
if tn_cancels:
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
# !!XXX this will raise `trio.Cancelled` which
|
||||
# will mask the RTE from above!!!
|
||||
#
|
||||
|
@ -191,7 +170,6 @@ async def sleep_n_chkpt_in_finally(
|
|||
expect_exc='Cancelled',
|
||||
rent_cancels=True,
|
||||
rent_ctxc_delay=0.1,
|
||||
tn_cancels=True,
|
||||
),
|
||||
dict(
|
||||
sleep_n_raise='RuntimeError',
|
||||
|
@ -200,12 +178,11 @@ async def sleep_n_chkpt_in_finally(
|
|||
expect_exc='RuntimeError',
|
||||
rent_cancels=False,
|
||||
rent_ctxc_delay=0.1,
|
||||
tn_cancels=False,
|
||||
),
|
||||
],
|
||||
ids=lambda item: f'chld_callspec={item!r}'
|
||||
)
|
||||
def test_masked_taskc_with_taskc_still_is_contx(
|
||||
def test_unmasked_remote_exc(
|
||||
debug_mode: bool,
|
||||
chld_callspec: dict,
|
||||
tpt_proto: str,
|
||||
|
|
|
@ -112,55 +112,11 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
'''
|
||||
import tractor
|
||||
|
||||
@acm
|
||||
async def maybe_raise_from_masking_exc(
|
||||
tn: trio.Nursery,
|
||||
unmask_from: BaseException|None = trio.Cancelled
|
||||
|
||||
# TODO, maybe offer a collection?
|
||||
# unmask_from: set[BaseException] = {
|
||||
# trio.Cancelled,
|
||||
# },
|
||||
):
|
||||
if not unmask_from:
|
||||
yield
|
||||
return
|
||||
|
||||
try:
|
||||
yield
|
||||
except* unmask_from as be_eg:
|
||||
|
||||
# TODO, if we offer `unmask_from: set`
|
||||
# for masker_exc_type in unmask_from:
|
||||
|
||||
matches, rest = be_eg.split(unmask_from)
|
||||
if not matches:
|
||||
raise
|
||||
|
||||
for exc_match in be_eg.exceptions:
|
||||
if (
|
||||
(exc_ctx := exc_match.__context__)
|
||||
and
|
||||
type(exc_ctx) not in {
|
||||
# trio.Cancelled, # always by default?
|
||||
unmask_from,
|
||||
}
|
||||
):
|
||||
exc_ctx.add_note(
|
||||
f'\n'
|
||||
f'WARNING: the above error was masked by a {unmask_from!r} !?!\n'
|
||||
f'Are you always cancelling? Say from a `finally:` ?\n\n'
|
||||
|
||||
f'{tn!r}'
|
||||
)
|
||||
raise exc_ctx from exc_match
|
||||
|
||||
|
||||
@acm
|
||||
async def wraps_tn_that_always_cancels():
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
maybe_raise_from_masking_exc(
|
||||
tractor.trionics.maybe_raise_from_masking_exc(
|
||||
tn=tn,
|
||||
unmask_from=(
|
||||
trio.Cancelled
|
||||
|
@ -202,3 +158,60 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
|||
assert_eg, rest_eg = eg.split(AssertionError)
|
||||
|
||||
assert len(assert_eg.exceptions) == 1
|
||||
|
||||
|
||||
|
||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
||||
will break a strict-eg-tn's multi-cancelled absorption..
|
||||
|
||||
'''
|
||||
from tractor import (
|
||||
trionics,
|
||||
)
|
||||
|
||||
@acm
|
||||
async def open_memchan() -> trio.abc.ReceiveChannel:
|
||||
|
||||
task: trio.Task = trio.lowlevel.current_task()
|
||||
print(
|
||||
f'Opening {task!r}\n'
|
||||
)
|
||||
|
||||
# 1 to force eager sending
|
||||
send, recv = trio.open_memory_channel(16)
|
||||
|
||||
try:
|
||||
async with send:
|
||||
yield recv
|
||||
finally:
|
||||
print(
|
||||
f'Closed {task!r}\n'
|
||||
)
|
||||
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
# XXX should ensure ONLY the KBI
|
||||
# is relayed upward
|
||||
trionics.collapse_eg(),
|
||||
trio.open_nursery(
|
||||
# strict_exception_groups=False,
|
||||
), # as tn,
|
||||
|
||||
trionics.gather_contexts([
|
||||
open_memchan(),
|
||||
open_memchan(),
|
||||
]) as recv_chans,
|
||||
):
|
||||
assert len(recv_chans) == 2
|
||||
|
||||
await trio.sleep(1)
|
||||
raise KeyboardInterrupt
|
||||
# tn.cancel_scope.cancel()
|
||||
|
||||
with pytest.raises(KeyboardInterrupt):
|
||||
trio.run(main)
|
||||
|
|
|
@ -37,6 +37,7 @@ import warnings
|
|||
|
||||
import trio
|
||||
from trio import (
|
||||
Cancelled,
|
||||
CancelScope,
|
||||
Nursery,
|
||||
TaskStatus,
|
||||
|
@ -52,10 +53,14 @@ from ._exceptions import (
|
|||
ModuleNotExposed,
|
||||
MsgTypeError,
|
||||
TransportClosed,
|
||||
is_multi_cancelled,
|
||||
pack_error,
|
||||
unpack_error,
|
||||
)
|
||||
from .trionics import (
|
||||
collapse_eg,
|
||||
is_multi_cancelled,
|
||||
maybe_raise_from_masking_exc,
|
||||
)
|
||||
from .devx import (
|
||||
debug,
|
||||
add_div,
|
||||
|
@ -616,32 +621,40 @@ async def _invoke(
|
|||
# -> the below scope is never exposed to the
|
||||
# `@context` marked RPC function.
|
||||
# - `._portal` is never set.
|
||||
scope_err: BaseException|None = None
|
||||
try:
|
||||
tn: trio.Nursery
|
||||
rpc_ctx_cs: CancelScope
|
||||
async with (
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
||||
|
||||
) as tn,
|
||||
msgops.maybe_limit_plds(
|
||||
ctx=ctx,
|
||||
spec=ctx_meta.get('pld_spec'),
|
||||
dec_hook=ctx_meta.get('dec_hook'),
|
||||
),
|
||||
):
|
||||
ctx._scope_nursery = tn
|
||||
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
||||
task_status.started(ctx)
|
||||
|
||||
# TODO: better `trionics` tooling:
|
||||
# TODO: better `trionics` primitive/tooling usage here!
|
||||
# -[ ] should would be nice to have our `TaskMngr`
|
||||
# nursery here!
|
||||
# -[ ] payload value checking like we do with
|
||||
# `.started()` such that the debbuger can engage
|
||||
# here in the child task instead of waiting for the
|
||||
# parent to crash with it's own MTE..
|
||||
#
|
||||
tn: Nursery
|
||||
rpc_ctx_cs: CancelScope
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
msgops.maybe_limit_plds(
|
||||
ctx=ctx,
|
||||
spec=ctx_meta.get('pld_spec'),
|
||||
dec_hook=ctx_meta.get('dec_hook'),
|
||||
),
|
||||
|
||||
# XXX NOTE, this being the "most embedded"
|
||||
# scope ensures unasking of the `await coro` below
|
||||
# *should* never be interfered with!!
|
||||
maybe_raise_from_masking_exc(
|
||||
tn=tn,
|
||||
unmask_from=Cancelled,
|
||||
) as _mbme, # maybe boxed masked exc
|
||||
):
|
||||
ctx._scope_nursery = tn
|
||||
rpc_ctx_cs = ctx._scope = tn.cancel_scope
|
||||
task_status.started(ctx)
|
||||
|
||||
# invoke user endpoint fn.
|
||||
res: Any|PayloadT = await coro
|
||||
return_msg: Return|CancelAck = return_msg_type(
|
||||
cid=cid,
|
||||
|
@ -744,38 +757,48 @@ async def _invoke(
|
|||
BaseException,
|
||||
trio.Cancelled,
|
||||
|
||||
) as scope_error:
|
||||
) as _scope_err:
|
||||
scope_err = _scope_err
|
||||
if (
|
||||
isinstance(scope_error, RuntimeError)
|
||||
and scope_error.args
|
||||
and 'Cancel scope stack corrupted' in scope_error.args[0]
|
||||
isinstance(scope_err, RuntimeError)
|
||||
and
|
||||
scope_err.args
|
||||
and
|
||||
'Cancel scope stack corrupted' in scope_err.args[0]
|
||||
):
|
||||
log.exception('Cancel scope stack corrupted!?\n')
|
||||
# debug.mk_pdb().set_trace()
|
||||
|
||||
# always set this (child) side's exception as the
|
||||
# local error on the context
|
||||
ctx._local_error: BaseException = scope_error
|
||||
ctx._local_error: BaseException = scope_err
|
||||
# ^-TODO-^ question,
|
||||
# does this matter other then for
|
||||
# consistentcy/testing?
|
||||
# |_ no user code should be in this scope at this point
|
||||
# AND we already set this in the block below?
|
||||
|
||||
# if a remote error was set then likely the
|
||||
# exception group was raised due to that, so
|
||||
# XXX if a remote error was set then likely the
|
||||
# exc group was raised due to that, so
|
||||
# and we instead raise that error immediately!
|
||||
ctx.maybe_raise()
|
||||
maybe_re: (
|
||||
ContextCancelled|RemoteActorError
|
||||
) = ctx.maybe_raise()
|
||||
if maybe_re:
|
||||
log.cancel(
|
||||
f'Suppressing remote-exc from peer,\n'
|
||||
f'{maybe_re!r}\n'
|
||||
)
|
||||
|
||||
# maybe TODO: pack in come kinda
|
||||
# `trio.Cancelled.__traceback__` here so they can be
|
||||
# unwrapped and displayed on the caller side? no se..
|
||||
raise
|
||||
raise scope_err
|
||||
|
||||
# `@context` entrypoint task bookeeping.
|
||||
# i.e. only pop the context tracking if used ;)
|
||||
finally:
|
||||
assert chan.uid
|
||||
assert chan.aid
|
||||
|
||||
# don't pop the local context until we know the
|
||||
# associated child isn't in debug any more
|
||||
|
@ -802,6 +825,9 @@ async def _invoke(
|
|||
descr_str += (
|
||||
f'\n{merr!r}\n' # needed?
|
||||
f'{tb_str}\n'
|
||||
f'\n'
|
||||
f'scope_error:\n'
|
||||
f'{scope_err!r}\n'
|
||||
)
|
||||
else:
|
||||
descr_str += f'\n{merr!r}\n'
|
||||
|
|
|
@ -34,3 +34,6 @@ from ._beg import (
|
|||
maybe_collapse_eg as maybe_collapse_eg,
|
||||
is_multi_cancelled as is_multi_cancelled,
|
||||
)
|
||||
from ._taskc import (
|
||||
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
|
||||
)
|
||||
|
|
|
@ -0,0 +1,182 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
`trio.Task` cancellation helpers, extensions and "holsters".
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
import trio
|
||||
from tractor.log import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor.devx.debug import BoxedMaybeException
|
||||
|
||||
|
||||
def find_masked_excs(
|
||||
maybe_masker: BaseException,
|
||||
unmask_from: set[BaseException],
|
||||
) -> BaseException|None:
|
||||
''''
|
||||
Deliver any `maybe_masker.__context__` provided
|
||||
it a declared masking exc-type entry in `unmask_from`.
|
||||
|
||||
'''
|
||||
if (
|
||||
type(maybe_masker) in unmask_from
|
||||
and
|
||||
(exc_ctx := maybe_masker.__context__)
|
||||
|
||||
# TODO? what about any cases where
|
||||
# they could be the same type but not same instance?
|
||||
# |_i.e. a cancel masking a cancel ??
|
||||
# or (
|
||||
# exc_ctx is not maybe_masker
|
||||
# )
|
||||
):
|
||||
return exc_ctx
|
||||
|
||||
return None
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_raise_from_masking_exc(
|
||||
tn: trio.Nursery|None = None,
|
||||
unmask_from: (
|
||||
BaseException|
|
||||
tuple[BaseException]
|
||||
) = (trio.Cancelled,),
|
||||
|
||||
raise_unmasked: bool = True,
|
||||
extra_note: str = (
|
||||
'This can occurr when,\n'
|
||||
' - a `trio.Nursery` scope embeds a `finally:`-block '
|
||||
'which executes a checkpoint!'
|
||||
#
|
||||
# ^TODO? other cases?
|
||||
),
|
||||
|
||||
always_warn_on: tuple[BaseException] = (
|
||||
trio.Cancelled,
|
||||
),
|
||||
# ^XXX, special case(s) where we warn-log bc likely
|
||||
# there will be no operational diff since the exc
|
||||
# is always expected to be consumed.
|
||||
) -> BoxedMaybeException:
|
||||
'''
|
||||
Maybe un-mask and re-raise exception(s) suppressed by a known
|
||||
error-used-as-signal type (cough namely `trio.Cancelled`).
|
||||
|
||||
Though this unmasker targets cancelleds, it can be used more
|
||||
generally to capture and unwrap masked excs detected as
|
||||
`.__context__` values which were suppressed by any error type
|
||||
passed in `unmask_from`.
|
||||
|
||||
-------------
|
||||
STILL-TODO ??
|
||||
-------------
|
||||
-[ ] support for egs which have multiple masked entries in
|
||||
`maybe_eg.exceptions`, in which case we should unmask the
|
||||
individual sub-excs but maintain the eg-parent's form right?
|
||||
|
||||
'''
|
||||
from tractor.devx.debug import (
|
||||
BoxedMaybeException,
|
||||
pause,
|
||||
)
|
||||
boxed_maybe_exc = BoxedMaybeException(
|
||||
raise_on_exit=raise_unmasked,
|
||||
)
|
||||
matching: list[BaseException]|None = None
|
||||
maybe_eg: ExceptionGroup|None
|
||||
maybe_eg: ExceptionGroup|None
|
||||
|
||||
if tn:
|
||||
try: # handle egs
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except* unmask_from as _maybe_eg:
|
||||
maybe_eg = _maybe_eg
|
||||
matches: ExceptionGroup
|
||||
matches, _ = maybe_eg.split(
|
||||
unmask_from
|
||||
)
|
||||
if not matches:
|
||||
raise
|
||||
|
||||
matching: list[BaseException] = matches.exceptions
|
||||
else:
|
||||
try: # handle non-egs
|
||||
yield boxed_maybe_exc
|
||||
return
|
||||
except unmask_from as _maybe_exc:
|
||||
maybe_exc = _maybe_exc
|
||||
matching: list[BaseException] = [
|
||||
maybe_exc
|
||||
]
|
||||
|
||||
# XXX, only unmask-ed for debuggin!
|
||||
# TODO, remove eventually..
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
await pause(shield=True)
|
||||
raise berr
|
||||
|
||||
if matching is None:
|
||||
raise
|
||||
|
||||
masked: list[tuple[BaseException, BaseException]] = []
|
||||
for exc_match in matching:
|
||||
|
||||
if exc_ctx := find_masked_excs(
|
||||
maybe_masker=exc_match,
|
||||
unmask_from={unmask_from},
|
||||
):
|
||||
masked.append((exc_ctx, exc_match))
|
||||
boxed_maybe_exc.value = exc_match
|
||||
note: str = (
|
||||
f'\n'
|
||||
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
|
||||
)
|
||||
if extra_note:
|
||||
note += (
|
||||
f'\n'
|
||||
f'{extra_note}\n'
|
||||
)
|
||||
exc_ctx.add_note(note)
|
||||
|
||||
if type(exc_match) in always_warn_on:
|
||||
log.warning(note)
|
||||
|
||||
# await tractor.pause(shield=True)
|
||||
if raise_unmasked:
|
||||
|
||||
if len(masked) < 2:
|
||||
raise exc_ctx from exc_match
|
||||
else:
|
||||
# ?TODO, see above but, possibly unmasking sub-exc
|
||||
# entries if there are > 1
|
||||
await pause(shield=True)
|
||||
else:
|
||||
raise
|
Loading…
Reference in New Issue