Compare commits
16 Commits
main
...
cancelled_
Author | SHA1 | Date |
---|---|---|
|
07781e38cd | |
|
9c6b90ef04 | |
|
542d4c7840 | |
|
9aebe7d8f9 | |
|
04c3d5e239 | |
|
759174729c | |
|
e9f3689191 | |
|
93aa39db07 | |
|
5ab642bdf0 | |
|
ed18ecd064 | |
|
cec0282953 | |
|
25c5847f2e | |
|
ba793fadd9 | |
|
d17864a432 | |
|
6c361a9564 | |
|
34ca7429c7 |
|
@ -0,0 +1,85 @@
|
||||||
|
from contextlib import (
|
||||||
|
asynccontextmanager as acm,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
log = tractor.log.get_logger(
|
||||||
|
name=__name__
|
||||||
|
)
|
||||||
|
|
||||||
|
_lock: trio.Lock|None = None
|
||||||
|
|
||||||
|
|
||||||
|
@acm
|
||||||
|
async def acquire_singleton_lock(
|
||||||
|
) -> None:
|
||||||
|
global _lock
|
||||||
|
if _lock is None:
|
||||||
|
log.info('Allocating LOCK')
|
||||||
|
_lock = trio.Lock()
|
||||||
|
|
||||||
|
log.info('TRYING TO LOCK ACQUIRE')
|
||||||
|
async with _lock:
|
||||||
|
log.info('ACQUIRED')
|
||||||
|
yield _lock
|
||||||
|
|
||||||
|
log.info('RELEASED')
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
async def hold_lock_forever(
|
||||||
|
task_status=trio.TASK_STATUS_IGNORED
|
||||||
|
):
|
||||||
|
async with (
|
||||||
|
tractor.trionics.maybe_raise_from_masking_exc(),
|
||||||
|
acquire_singleton_lock() as lock,
|
||||||
|
):
|
||||||
|
task_status.started(lock)
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
|
|
||||||
|
async def main(
|
||||||
|
ignore_special_cases: bool,
|
||||||
|
loglevel: str = 'info',
|
||||||
|
debug_mode: bool = True,
|
||||||
|
):
|
||||||
|
async with (
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
|
||||||
|
# tractor.trionics.maybe_raise_from_masking_exc()
|
||||||
|
# ^^^ XXX NOTE, interestingly putting the unmasker
|
||||||
|
# here does not exhibit the same behaviour ??
|
||||||
|
):
|
||||||
|
if not ignore_special_cases:
|
||||||
|
from tractor.trionics import _taskc
|
||||||
|
_taskc._mask_cases.clear()
|
||||||
|
|
||||||
|
_lock = await tn.start(
|
||||||
|
hold_lock_forever,
|
||||||
|
)
|
||||||
|
with trio.move_on_after(0.2):
|
||||||
|
await tn.start(
|
||||||
|
hold_lock_forever,
|
||||||
|
)
|
||||||
|
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, manual test as script
|
||||||
|
if __name__ == '__main__':
|
||||||
|
tractor.log.get_console_log(level='info')
|
||||||
|
for case in [True, False]:
|
||||||
|
log.info(
|
||||||
|
f'\n'
|
||||||
|
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||||
|
f'ignore_special_cases: {case!r}\n'
|
||||||
|
)
|
||||||
|
trio.run(partial(
|
||||||
|
main,
|
||||||
|
ignore_special_cases=case,
|
||||||
|
loglevel='info',
|
||||||
|
))
|
|
@ -0,0 +1,195 @@
|
||||||
|
from contextlib import (
|
||||||
|
contextmanager as cm,
|
||||||
|
# TODO, any diff in async case(s)??
|
||||||
|
# asynccontextmanager as acm,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
log = tractor.log.get_logger(
|
||||||
|
name=__name__
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def teardown_on_exc(
|
||||||
|
raise_from_handler: bool = False,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
You could also have a teardown handler which catches any exc and
|
||||||
|
does some required teardown. In this case the problem is
|
||||||
|
compounded UNLESS you ensure the handler's scope is OUTSIDE the
|
||||||
|
`ux.aclose()`.. that is in the caller's enclosing scope.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except BaseException as _berr:
|
||||||
|
berr = _berr
|
||||||
|
log.exception(
|
||||||
|
f'Handling termination teardown in child due to,\n'
|
||||||
|
f'{berr!r}\n'
|
||||||
|
)
|
||||||
|
if raise_from_handler:
|
||||||
|
# XXX teardown ops XXX
|
||||||
|
# on termination these steps say need to be run to
|
||||||
|
# ensure wider system consistency (like the state of
|
||||||
|
# remote connections/services).
|
||||||
|
#
|
||||||
|
# HOWEVER, any bug in this teardown code is also
|
||||||
|
# masked by the `tx.aclose()`!
|
||||||
|
# this is also true if `_tn.cancel_scope` is
|
||||||
|
# `.cancel_called` by the parent in a graceful
|
||||||
|
# request case..
|
||||||
|
|
||||||
|
# simulate a bug in teardown handler.
|
||||||
|
raise RuntimeError(
|
||||||
|
'woopsie teardown bug!'
|
||||||
|
)
|
||||||
|
|
||||||
|
raise # no teardown bug.
|
||||||
|
|
||||||
|
|
||||||
|
async def finite_stream_to_rent(
|
||||||
|
tx: trio.abc.SendChannel,
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
raise_unmasked: bool,
|
||||||
|
|
||||||
|
task_status: trio.TaskStatus[
|
||||||
|
trio.CancelScope,
|
||||||
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
|
async with (
|
||||||
|
# XXX without this unmasker the mid-streaming RTE is never
|
||||||
|
# reported since it is masked by the `tx.aclose()`
|
||||||
|
# call which in turn raises `Cancelled`!
|
||||||
|
#
|
||||||
|
# NOTE, this is WITHOUT doing any exception handling
|
||||||
|
# inside the child task!
|
||||||
|
#
|
||||||
|
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||||
|
tractor.trionics.maybe_raise_from_masking_exc(
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
),
|
||||||
|
|
||||||
|
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||||
|
|
||||||
|
# XXX, this ONLY matters in the
|
||||||
|
# `child_errors_mid_stream=False` case oddly!?
|
||||||
|
# THAT IS, if no tn is opened in that case then the
|
||||||
|
# test will not fail; it raises the RTE correctly?
|
||||||
|
#
|
||||||
|
# -> so it seems this new scope somehow affects the form of
|
||||||
|
# eventual in the parent EG?
|
||||||
|
tractor.trionics.maybe_open_nursery(
|
||||||
|
nursery=(
|
||||||
|
None
|
||||||
|
if not child_errors_mid_stream
|
||||||
|
else True
|
||||||
|
),
|
||||||
|
) as _tn,
|
||||||
|
):
|
||||||
|
# pass our scope back to parent for supervision\
|
||||||
|
# control.
|
||||||
|
cs: trio.CancelScope|None = (
|
||||||
|
None
|
||||||
|
if _tn is True
|
||||||
|
else _tn.cancel_scope
|
||||||
|
)
|
||||||
|
task_status.started(cs)
|
||||||
|
|
||||||
|
with teardown_on_exc(
|
||||||
|
raise_from_handler=not child_errors_mid_stream,
|
||||||
|
):
|
||||||
|
for i in range(100):
|
||||||
|
log.debug(
|
||||||
|
f'Child tx {i!r}\n'
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
child_errors_mid_stream
|
||||||
|
and
|
||||||
|
i == 66
|
||||||
|
):
|
||||||
|
# oh wait but WOOPS there's a bug
|
||||||
|
# in that teardown code!?
|
||||||
|
raise RuntimeError(
|
||||||
|
'woopsie, a mid-streaming bug!?'
|
||||||
|
)
|
||||||
|
|
||||||
|
await tx.send(i)
|
||||||
|
|
||||||
|
|
||||||
|
async def main(
|
||||||
|
# TODO! toggle this for the 2 cases!
|
||||||
|
# 1. child errors mid-stream while parent is also requesting
|
||||||
|
# (graceful) cancel of that child streamer.
|
||||||
|
#
|
||||||
|
# 2. child contains a teardown handler which contains a
|
||||||
|
# bug and raises.
|
||||||
|
#
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
|
||||||
|
raise_unmasked: bool = False,
|
||||||
|
loglevel: str = 'info',
|
||||||
|
):
|
||||||
|
tractor.log.get_console_log(level=loglevel)
|
||||||
|
|
||||||
|
# the `.aclose()` being checkpoints on these
|
||||||
|
# is the source of the problem..
|
||||||
|
tx, rx = trio.open_memory_channel(1)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
tractor.trionics.collapse_eg(),
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
rx as rx,
|
||||||
|
):
|
||||||
|
_child_cs = await tn.start(
|
||||||
|
partial(
|
||||||
|
finite_stream_to_rent,
|
||||||
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
tx=tx,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
async for msg in rx:
|
||||||
|
log.debug(
|
||||||
|
f'Rent rx {msg!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# simulate some external cancellation
|
||||||
|
# request **JUST BEFORE** the child errors.
|
||||||
|
if msg == 65:
|
||||||
|
log.cancel(
|
||||||
|
f'Cancelling parent on,\n'
|
||||||
|
f'msg={msg}\n'
|
||||||
|
f'\n'
|
||||||
|
f'Simulates OOB cancel request!\n'
|
||||||
|
)
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, manual test as script
|
||||||
|
if __name__ == '__main__':
|
||||||
|
tractor.log.get_console_log(level='info')
|
||||||
|
for case in [True, False]:
|
||||||
|
log.info(
|
||||||
|
f'\n'
|
||||||
|
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||||
|
f'child_errors_midstream: {case!r}\n'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
trio.run(partial(
|
||||||
|
main,
|
||||||
|
child_errors_mid_stream=case,
|
||||||
|
# raise_unmasked=True,
|
||||||
|
loglevel='info',
|
||||||
|
))
|
||||||
|
except Exception as _exc:
|
||||||
|
exc = _exc
|
||||||
|
log.exception(
|
||||||
|
'Should have raised an RTE or Cancelled?\n'
|
||||||
|
)
|
||||||
|
breakpoint()
|
|
@ -95,6 +95,7 @@ def run_example_in_subproc(
|
||||||
and 'integration' not in p[0]
|
and 'integration' not in p[0]
|
||||||
and 'advanced_faults' not in p[0]
|
and 'advanced_faults' not in p[0]
|
||||||
and 'multihost' not in p[0]
|
and 'multihost' not in p[0]
|
||||||
|
and 'trio' not in p[0]
|
||||||
)
|
)
|
||||||
],
|
],
|
||||||
ids=lambda t: t[1],
|
ids=lambda t: t[1],
|
||||||
|
|
|
@ -6,11 +6,18 @@ want to see changed.
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from _pytest import pathlib
|
||||||
from tractor.trionics import collapse_eg
|
from tractor.trionics import collapse_eg
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio import TaskStatus
|
||||||
|
from tractor._testing import (
|
||||||
|
examples_dir,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
@ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
|
Demo how a masking `trio.Cancelled` could be handled by unmasking
|
||||||
`.__context__` field when a user (by accident) re-raises from a `finally:`.
|
from the `.__context__` field when a user (by accident) re-raises
|
||||||
|
from a `finally:`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -117,11 +125,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
tractor.trionics.maybe_raise_from_masking_exc(
|
tractor.trionics.maybe_raise_from_masking_exc(
|
||||||
tn=tn,
|
|
||||||
unmask_from=(
|
unmask_from=(
|
||||||
trio.Cancelled
|
(trio.Cancelled,) if unmask_from_canc
|
||||||
if unmask_from_canc
|
else ()
|
||||||
else None
|
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
):
|
):
|
||||||
|
@ -136,7 +142,6 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
with tractor.devx.maybe_open_crash_handler(
|
with tractor.devx.maybe_open_crash_handler(
|
||||||
pdb=debug_mode,
|
pdb=debug_mode,
|
||||||
) as bxerr:
|
) as bxerr:
|
||||||
if bxerr:
|
|
||||||
assert not bxerr.value
|
assert not bxerr.value
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
|
@ -145,6 +150,7 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
assert not tn.cancel_scope.cancel_called
|
assert not tn.cancel_scope.cancel_called
|
||||||
assert 0
|
assert 0
|
||||||
|
|
||||||
|
if debug_mode:
|
||||||
assert (
|
assert (
|
||||||
(err := bxerr.value)
|
(err := bxerr.value)
|
||||||
and
|
and
|
||||||
|
@ -160,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
assert len(assert_eg.exceptions) == 1
|
assert len(assert_eg.exceptions) == 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
Demo how a using an `async with sndchan` inside
|
||||||
will break a strict-eg-tn's multi-cancelled absorption..
|
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
|
||||||
|
multi-cancelled absorption..
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
@ -192,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
f'Closed {task!r}\n'
|
f'Closed {task!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with (
|
async with (
|
||||||
# XXX should ensure ONLY the KBI
|
# XXX should ensure ONLY the KBI
|
||||||
|
@ -213,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
|
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'raise_unmasked', [
|
||||||
|
True,
|
||||||
|
pytest.param(
|
||||||
|
False,
|
||||||
|
marks=pytest.mark.xfail(
|
||||||
|
reason="see examples/trio/send_chan_aclose_masks.py"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'child_errors_mid_stream',
|
||||||
|
[True, False],
|
||||||
|
)
|
||||||
|
def test_unmask_aclose_as_checkpoint_on_aexit(
|
||||||
|
raise_unmasked: bool,
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that our unmasker util works over the common case where
|
||||||
|
a mem-chan's `.aclose()` is included in an `@acm` stack
|
||||||
|
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
|
||||||
|
exception from user code resulting in a silent failure which
|
||||||
|
appears like graceful cancellation.
|
||||||
|
|
||||||
|
This test suite is mostly implemented as an example script so it
|
||||||
|
could more easily be shared with `trio`-core peeps as `tractor`-less
|
||||||
|
minimum reproducing example.
|
||||||
|
|
||||||
|
'''
|
||||||
|
mod: ModuleType = pathlib.import_path(
|
||||||
|
examples_dir()
|
||||||
|
/ 'trio'
|
||||||
|
/ 'send_chan_aclose_masks_beg.py',
|
||||||
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
|
)
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
trio.run(partial(
|
||||||
|
mod.main,
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
))
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'ignore_special_cases', [
|
||||||
|
True,
|
||||||
|
pytest.param(
|
||||||
|
False,
|
||||||
|
marks=pytest.mark.xfail(
|
||||||
|
reason="see examples/trio/lockacquire_not_umasked.py"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
def test_cancelled_lockacquire_in_ipctx_not_unmaskeed(
|
||||||
|
ignore_special_cases: bool,
|
||||||
|
loglevel: str,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
mod: ModuleType = pathlib.import_path(
|
||||||
|
examples_dir()
|
||||||
|
/ 'trio'
|
||||||
|
/ 'lockacquire_not_unmasked.py',
|
||||||
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
|
)
|
||||||
|
async def _main():
|
||||||
|
with trio.fail_after(2):
|
||||||
|
await mod.main(
|
||||||
|
ignore_special_cases=ignore_special_cases,
|
||||||
|
loglevel=loglevel,
|
||||||
|
debug_mode=debug_mode,
|
||||||
|
)
|
||||||
|
|
||||||
|
trio.run(_main)
|
||||||
|
|
|
@ -654,8 +654,7 @@ async def _invoke(
|
||||||
# scope ensures unasking of the `await coro` below
|
# scope ensures unasking of the `await coro` below
|
||||||
# *should* never be interfered with!!
|
# *should* never be interfered with!!
|
||||||
maybe_raise_from_masking_exc(
|
maybe_raise_from_masking_exc(
|
||||||
tn=tn,
|
unmask_from=(Cancelled,),
|
||||||
unmask_from=Cancelled,
|
|
||||||
) as _mbme, # maybe boxed masked exc
|
) as _mbme, # maybe boxed masked exc
|
||||||
):
|
):
|
||||||
ctx._scope_nursery = tn
|
ctx._scope_nursery = tn
|
||||||
|
|
|
@ -31,7 +31,6 @@ from typing import (
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Callable,
|
Callable,
|
||||||
Hashable,
|
Hashable,
|
||||||
Optional,
|
|
||||||
Sequence,
|
Sequence,
|
||||||
TypeVar,
|
TypeVar,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
@ -204,7 +203,7 @@ class _Cache:
|
||||||
a kept-alive-while-in-use async resource.
|
a kept-alive-while-in-use async resource.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
service_tn: Optional[trio.Nursery] = None
|
service_tn: trio.Nursery|None = None
|
||||||
locks: dict[Hashable, trio.Lock] = {}
|
locks: dict[Hashable, trio.Lock] = {}
|
||||||
users: int = 0
|
users: int = 0
|
||||||
values: dict[Any, Any] = {}
|
values: dict[Any, Any] = {}
|
||||||
|
@ -213,7 +212,7 @@ class _Cache:
|
||||||
tuple[trio.Nursery, trio.Event]
|
tuple[trio.Nursery, trio.Event]
|
||||||
] = {}
|
] = {}
|
||||||
# nurseries: dict[int, trio.Nursery] = {}
|
# nurseries: dict[int, trio.Nursery] = {}
|
||||||
no_more_users: Optional[trio.Event] = None
|
no_more_users: trio.Event|None = None
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
async def run_ctx(
|
async def run_ctx(
|
||||||
|
@ -223,15 +222,17 @@ class _Cache:
|
||||||
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
try:
|
||||||
async with mng as value:
|
async with mng as value:
|
||||||
_, no_more_users = cls.resources[ctx_key]
|
_, no_more_users = cls.resources[ctx_key]
|
||||||
|
try:
|
||||||
cls.values[ctx_key] = value
|
cls.values[ctx_key] = value
|
||||||
task_status.started(value)
|
task_status.started(value)
|
||||||
try:
|
|
||||||
await no_more_users.wait()
|
await no_more_users.wait()
|
||||||
finally:
|
finally:
|
||||||
# discard nursery ref so it won't be re-used (an error)?
|
|
||||||
value = cls.values.pop(ctx_key)
|
value = cls.values.pop(ctx_key)
|
||||||
|
finally:
|
||||||
|
# discard nursery ref so it won't be re-used (an error)?
|
||||||
cls.resources.pop(ctx_key)
|
cls.resources.pop(ctx_key)
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -22,7 +22,14 @@ from __future__ import annotations
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
from typing import TYPE_CHECKING
|
import inspect
|
||||||
|
from types import (
|
||||||
|
TracebackType,
|
||||||
|
)
|
||||||
|
from typing import (
|
||||||
|
Type,
|
||||||
|
TYPE_CHECKING,
|
||||||
|
)
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
@ -60,12 +67,71 @@ def find_masked_excs(
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
_mask_cases: dict[
|
||||||
|
Type[Exception], # masked exc type
|
||||||
|
dict[
|
||||||
|
int, # inner-frame index into `inspect.getinnerframes()`
|
||||||
|
# `FrameInfo.function/filename: str`s to match
|
||||||
|
tuple[str, str],
|
||||||
|
],
|
||||||
|
] = {
|
||||||
|
trio.WouldBlock: {
|
||||||
|
# `trio.Lock.acquire()` has a checkpoint inside the
|
||||||
|
# `WouldBlock`-no_wait path's handler..
|
||||||
|
-5: { # "5th frame up" from checkpoint
|
||||||
|
'filename': 'trio/_sync.py',
|
||||||
|
'function': 'acquire',
|
||||||
|
# 'lineno': 605, # matters?
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def is_expected_masking_case(
|
||||||
|
cases: dict,
|
||||||
|
exc_ctx: Exception,
|
||||||
|
exc_match: BaseException,
|
||||||
|
|
||||||
|
) -> bool|inspect.FrameInfo:
|
||||||
|
'''
|
||||||
|
Determine whether the provided masked exception is from a known
|
||||||
|
bug/special/unintentional-`trio`-impl case which we do not wish
|
||||||
|
to unmask.
|
||||||
|
|
||||||
|
Return any guilty `inspect.FrameInfo` ow `False`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
exc_tb: TracebackType = exc_match.__traceback__
|
||||||
|
if cases := _mask_cases.get(type(exc_ctx)):
|
||||||
|
inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb)
|
||||||
|
|
||||||
|
# from tractor.devx.debug import mk_pdb
|
||||||
|
# mk_pdb().set_trace()
|
||||||
|
for iframe, matchon in cases.items():
|
||||||
|
try:
|
||||||
|
masker_frame: inspect.FrameInfo = inner[iframe]
|
||||||
|
except IndexError:
|
||||||
|
continue
|
||||||
|
|
||||||
|
for field, in_field in matchon.items():
|
||||||
|
val = getattr(
|
||||||
|
masker_frame,
|
||||||
|
field,
|
||||||
|
)
|
||||||
|
if in_field not in val:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
return masker_frame
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
# XXX, relevant discussion @ `trio`-core,
|
# XXX, relevant discussion @ `trio`-core,
|
||||||
# https://github.com/python-trio/trio/issues/455
|
# https://github.com/python-trio/trio/issues/455
|
||||||
#
|
#
|
||||||
@acm
|
@acm
|
||||||
async def maybe_raise_from_masking_exc(
|
async def maybe_raise_from_masking_exc(
|
||||||
tn: trio.Nursery|None = None,
|
|
||||||
unmask_from: (
|
unmask_from: (
|
||||||
BaseException|
|
BaseException|
|
||||||
tuple[BaseException]
|
tuple[BaseException]
|
||||||
|
@ -74,18 +140,30 @@ async def maybe_raise_from_masking_exc(
|
||||||
raise_unmasked: bool = True,
|
raise_unmasked: bool = True,
|
||||||
extra_note: str = (
|
extra_note: str = (
|
||||||
'This can occurr when,\n'
|
'This can occurr when,\n'
|
||||||
' - a `trio.Nursery` scope embeds a `finally:`-block '
|
'\n'
|
||||||
'which executes a checkpoint!'
|
' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block '
|
||||||
|
'which execs an un-shielded checkpoint!'
|
||||||
#
|
#
|
||||||
# ^TODO? other cases?
|
# ^TODO? other cases?
|
||||||
),
|
),
|
||||||
|
|
||||||
always_warn_on: tuple[BaseException] = (
|
always_warn_on: tuple[Type[BaseException]] = (
|
||||||
trio.Cancelled,
|
trio.Cancelled,
|
||||||
),
|
),
|
||||||
|
|
||||||
|
# don't ever unmask or warn on any masking pair,
|
||||||
|
# {<masked-excT-key> -> <masking-excT-value>}
|
||||||
|
never_warn_on: dict[
|
||||||
|
Type[BaseException],
|
||||||
|
Type[BaseException],
|
||||||
|
] = {
|
||||||
|
KeyboardInterrupt: trio.Cancelled,
|
||||||
|
trio.Cancelled: trio.Cancelled,
|
||||||
|
},
|
||||||
# ^XXX, special case(s) where we warn-log bc likely
|
# ^XXX, special case(s) where we warn-log bc likely
|
||||||
# there will be no operational diff since the exc
|
# there will be no operational diff since the exc
|
||||||
# is always expected to be consumed.
|
# is always expected to be consumed.
|
||||||
|
|
||||||
) -> BoxedMaybeException:
|
) -> BoxedMaybeException:
|
||||||
'''
|
'''
|
||||||
Maybe un-mask and re-raise exception(s) suppressed by a known
|
Maybe un-mask and re-raise exception(s) suppressed by a known
|
||||||
|
@ -104,81 +182,111 @@ async def maybe_raise_from_masking_exc(
|
||||||
individual sub-excs but maintain the eg-parent's form right?
|
individual sub-excs but maintain the eg-parent's form right?
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
if not isinstance(unmask_from, tuple):
|
||||||
|
raise ValueError(
|
||||||
|
f'Invalid unmask_from = {unmask_from!r}\n'
|
||||||
|
f'Must be a `tuple[Type[BaseException]]`.\n'
|
||||||
|
)
|
||||||
|
|
||||||
from tractor.devx.debug import (
|
from tractor.devx.debug import (
|
||||||
BoxedMaybeException,
|
BoxedMaybeException,
|
||||||
pause,
|
|
||||||
)
|
)
|
||||||
boxed_maybe_exc = BoxedMaybeException(
|
boxed_maybe_exc = BoxedMaybeException(
|
||||||
raise_on_exit=raise_unmasked,
|
raise_on_exit=raise_unmasked,
|
||||||
)
|
)
|
||||||
matching: list[BaseException]|None = None
|
matching: list[BaseException]|None = None
|
||||||
maybe_eg: ExceptionGroup|None
|
try:
|
||||||
|
|
||||||
if tn:
|
|
||||||
try: # handle egs
|
|
||||||
yield boxed_maybe_exc
|
yield boxed_maybe_exc
|
||||||
return
|
return
|
||||||
except* unmask_from as _maybe_eg:
|
except BaseException as _bexc:
|
||||||
maybe_eg = _maybe_eg
|
bexc = _bexc
|
||||||
|
if isinstance(bexc, BaseExceptionGroup):
|
||||||
matches: ExceptionGroup
|
matches: ExceptionGroup
|
||||||
matches, _ = maybe_eg.split(
|
matches, _ = bexc.split(unmask_from)
|
||||||
|
if matches:
|
||||||
|
matching = matches.exceptions
|
||||||
|
|
||||||
|
elif (
|
||||||
unmask_from
|
unmask_from
|
||||||
)
|
and
|
||||||
if not matches:
|
type(bexc) in unmask_from
|
||||||
raise
|
):
|
||||||
|
matching = [bexc]
|
||||||
matching: list[BaseException] = matches.exceptions
|
|
||||||
else:
|
|
||||||
try: # handle non-egs
|
|
||||||
yield boxed_maybe_exc
|
|
||||||
return
|
|
||||||
except unmask_from as _maybe_exc:
|
|
||||||
maybe_exc = _maybe_exc
|
|
||||||
matching: list[BaseException] = [
|
|
||||||
maybe_exc
|
|
||||||
]
|
|
||||||
|
|
||||||
# XXX, only unmask-ed for debuggin!
|
|
||||||
# TODO, remove eventually..
|
|
||||||
except BaseException as _berr:
|
|
||||||
berr = _berr
|
|
||||||
await pause(shield=True)
|
|
||||||
raise berr
|
|
||||||
|
|
||||||
if matching is None:
|
if matching is None:
|
||||||
raise
|
raise
|
||||||
|
|
||||||
masked: list[tuple[BaseException, BaseException]] = []
|
masked: list[tuple[BaseException, BaseException]] = []
|
||||||
for exc_match in matching:
|
for exc_match in matching:
|
||||||
|
|
||||||
if exc_ctx := find_masked_excs(
|
if exc_ctx := find_masked_excs(
|
||||||
maybe_masker=exc_match,
|
maybe_masker=exc_match,
|
||||||
unmask_from={unmask_from},
|
unmask_from=set(unmask_from),
|
||||||
):
|
):
|
||||||
masked.append((exc_ctx, exc_match))
|
masked.append((
|
||||||
|
exc_ctx,
|
||||||
|
exc_match,
|
||||||
|
))
|
||||||
boxed_maybe_exc.value = exc_match
|
boxed_maybe_exc.value = exc_match
|
||||||
note: str = (
|
note: str = (
|
||||||
f'\n'
|
f'\n'
|
||||||
f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n'
|
f'^^WARNING^^\n'
|
||||||
|
f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n'
|
||||||
)
|
)
|
||||||
if extra_note:
|
if extra_note:
|
||||||
note += (
|
note += (
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{extra_note}\n'
|
f'{extra_note}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
do_warn: bool = (
|
||||||
|
never_warn_on.get(
|
||||||
|
type(exc_ctx) # masking type
|
||||||
|
)
|
||||||
|
is not
|
||||||
|
type(exc_match) # masked type
|
||||||
|
)
|
||||||
|
|
||||||
|
if do_warn:
|
||||||
exc_ctx.add_note(note)
|
exc_ctx.add_note(note)
|
||||||
|
|
||||||
if type(exc_match) in always_warn_on:
|
if (
|
||||||
|
do_warn
|
||||||
|
and
|
||||||
|
type(exc_match) in always_warn_on
|
||||||
|
):
|
||||||
log.warning(note)
|
log.warning(note)
|
||||||
|
|
||||||
# await tractor.pause(shield=True)
|
if (
|
||||||
if raise_unmasked:
|
do_warn
|
||||||
|
and
|
||||||
|
raise_unmasked
|
||||||
|
):
|
||||||
if len(masked) < 2:
|
if len(masked) < 2:
|
||||||
|
# don't unmask already known "special" cases..
|
||||||
|
if (
|
||||||
|
_mask_cases
|
||||||
|
and
|
||||||
|
(cases := _mask_cases.get(type(exc_ctx)))
|
||||||
|
and
|
||||||
|
(masker_frame := is_expected_masking_case(
|
||||||
|
cases,
|
||||||
|
exc_ctx,
|
||||||
|
exc_match,
|
||||||
|
))
|
||||||
|
):
|
||||||
|
log.warning(
|
||||||
|
f'Ignoring already-known/non-ideally-valid masker code @\n'
|
||||||
|
f'{masker_frame}\n'
|
||||||
|
f'\n'
|
||||||
|
f'NOT raising {exc_ctx} from masker {exc_match!r}\n'
|
||||||
|
)
|
||||||
|
raise exc_match
|
||||||
|
|
||||||
raise exc_ctx from exc_match
|
raise exc_ctx from exc_match
|
||||||
else:
|
|
||||||
# ?TODO, see above but, possibly unmasking sub-exc
|
# ??TODO, see above but, possibly unmasking sub-exc
|
||||||
# entries if there are > 1
|
# entries if there are > 1
|
||||||
await pause(shield=True)
|
# else:
|
||||||
|
# await pause(shield=True)
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
|
Loading…
Reference in New Issue