diff --git a/examples/trio/lockacquire_not_unmasked.py b/examples/trio/lockacquire_not_unmasked.py
new file mode 100644
index 00000000..2f979a00
--- /dev/null
+++ b/examples/trio/lockacquire_not_unmasked.py
@@ -0,0 +1,110 @@
+from contextlib import (
+    asynccontextmanager as acm,
+)
+from functools import partial
+from typing import AsyncGenerator
+
+import tractor
+import trio
+
+
+log = tractor.log.get_logger(
+    name=__name__
+)
+
+_lock: trio.Lock|None = None
+
+
+@acm
+async def acquire_singleton_lock(
+) -> AsyncGenerator[trio.Lock, None]:
+    '''
+    Lazily allocate the module-global `trio.Lock` on first use,
+    then acquire it and yield it to the caller; the lock is
+    released when the caller's `async with` block exits.
+
+    '''
+    global _lock
+    if _lock is None:
+        log.info('Allocating LOCK')
+        _lock = trio.Lock()
+
+    log.info('TRYING TO LOCK ACQUIRE')
+    async with _lock:
+        log.info('ACQUIRED')
+        yield _lock
+
+    log.info('RELEASED')
+
+
+
+async def hold_lock_forever(
+    task_status=trio.TASK_STATUS_IGNORED
+):
+    '''
+    Acquire the singleton lock from inside an unmasker scope,
+    report the lock via `task_status.started()`, then sleep until
+    cancelled.
+
+    '''
+    async with (
+        tractor.trionics.maybe_raise_from_masking_exc(),
+        acquire_singleton_lock() as lock,
+    ):
+        task_status.started(lock)
+        await trio.sleep_forever()
+
+
+async def main(
+    ignore_special_cases: bool,
+    loglevel: str = 'info',
+    debug_mode: bool = True,
+):
+    '''
+    Start one task which holds the singleton lock, try (for 0.2s)
+    to start a 2nd which must wait on that same lock, then cancel
+    the nursery.
+
+    When `ignore_special_cases` is unset the unmasker's table of
+    known masking cases (`_taskc._mask_cases`) is cleared first.
+
+    NOTE, `loglevel` and `debug_mode` are accepted but never read
+    by this body.
+
+    '''
+    async with (
+        trio.open_nursery() as tn,
+
+        # tractor.trionics.maybe_raise_from_masking_exc()
+        # ^^^ XXX NOTE, interestingly putting the unmasker
+        # here does not exhibit the same behaviour ??
+    ):
+        if not ignore_special_cases:
+            from tractor.trionics import _taskc
+            _taskc._mask_cases.clear()
+
+        _lock = await tn.start(
+            hold_lock_forever,
+        )
+        with trio.move_on_after(0.2):
+            await tn.start(
+                hold_lock_forever,
+            )
+
+        tn.cancel_scope.cancel()
+
+
+# XXX, manual test as script
+if __name__ == '__main__':
+    tractor.log.get_console_log(level='info')
+    for case in [True, False]:
+        log.info(
+            f'\n'
+            f'------ RUNNING SCRIPT TRIAL ------\n'
+            f'ignore_special_cases: {case!r}\n'
+        )
+        trio.run(partial(
+            main,
+            ignore_special_cases=case,
+            loglevel='info',
+        ))
diff --git a/examples/trio/send_chan_aclose_masks_beg.py b/examples/trio/send_chan_aclose_masks_beg.py
new file mode 100644
index 00000000..e7f895b7
--- /dev/null
+++ b/examples/trio/send_chan_aclose_masks_beg.py
@@ -0,0 +1,210 @@
+from contextlib import (
+    contextmanager as cm,
+    # TODO, any diff in async case(s)??
+    # asynccontextmanager as acm,
+)
+from functools import partial
+
+import tractor
+import trio
+
+
+log = tractor.log.get_logger(
+    name=__name__
+)
+
+
+@cm
+def teardown_on_exc(
+    raise_from_handler: bool = False,
+):
+    '''
+    You could also have a teardown handler which catches any exc and
+    does some required teardown. In this case the problem is
+    compounded UNLESS you ensure the handler's scope is OUTSIDE the
+    `tx.aclose()`.. that is in the caller's enclosing scope.
+
+    '''
+    try:
+        yield
+    except BaseException as _berr:
+        berr = _berr
+        log.exception(
+            f'Handling termination teardown in child due to,\n'
+            f'{berr!r}\n'
+        )
+        if raise_from_handler:
+            # XXX teardown ops XXX
+            # on termination these steps say need to be run to
+            # ensure wider system consistency (like the state of
+            # remote connections/services).
+            #
+            # HOWEVER, any bug in this teardown code is also
+            # masked by the `tx.aclose()`!
+            # this is also true if `_tn.cancel_scope` is
+            # `.cancel_called` by the parent in a graceful
+            # request case..
+
+            # simulate a bug in teardown handler.
+            raise RuntimeError(
+                'woopsie teardown bug!'
+            )
+
+        raise  # no teardown bug.
+
+
+async def finite_stream_to_rent(
+    tx: trio.abc.SendChannel,
+    child_errors_mid_stream: bool,
+    raise_unmasked: bool,
+
+    task_status: trio.TaskStatus[
+        trio.CancelScope,
+    ] = trio.TASK_STATUS_IGNORED,
+):
+    '''
+    Child task: send the ints `0..99` over `tx`, optionally raising
+    a `RuntimeError` mid-stream (at `i == 66`) or from the teardown
+    handler.
+
+    `_tn.cancel_scope` (or `None` when `_tn is True`) is handed
+    back to the parent via `task_status.started()`.
+
+    '''
+    async with (
+        # XXX without this unmasker the mid-streaming RTE is never
+        # reported since it is masked by the `tx.aclose()`
+        # call which in turn raises `Cancelled`!
+        #
+        # NOTE, this is WITHOUT doing any exception handling
+        # inside the child task!
+        #
+        # TODO, pass `raise_unmasked=True` to see the suppressed beg[RTE]!
+        tractor.trionics.maybe_raise_from_masking_exc(
+            raise_unmasked=raise_unmasked,
+        ),
+
+        tx as tx,  # .aclose() is the guilty masker chkpt!
+
+        # XXX, this ONLY matters in the
+        # `child_errors_mid_stream=False` case oddly!?
+        # THAT IS, if no tn is opened in that case then the
+        # test will not fail; it raises the RTE correctly?
+        #
+        # -> so it seems this new scope somehow affects the form of
+        # eventual in the parent EG?
+        tractor.trionics.maybe_open_nursery(
+            nursery=(
+                None
+                if not child_errors_mid_stream
+                else True
+            ),
+        ) as _tn,
+    ):
+        # pass our scope back to parent for supervision/
+        # control.
+        cs: trio.CancelScope|None = (
+            None
+            if _tn is True
+            else _tn.cancel_scope
+        )
+        task_status.started(cs)
+
+        with teardown_on_exc(
+            raise_from_handler=not child_errors_mid_stream,
+        ):
+            for i in range(100):
+                log.debug(
+                    f'Child tx {i!r}\n'
+                )
+                if (
+                    child_errors_mid_stream
+                    and
+                    i == 66
+                ):
+                    # oh wait but WOOPS there's a bug
+                    # in the child's streaming code!?
+                    raise RuntimeError(
+                        'woopsie, a mid-streaming bug!?'
+                    )
+
+                await tx.send(i)
+
+
+async def main(
+    # TODO! toggle this for the 2 cases!
+    # 1. child errors mid-stream while parent is also requesting
+    #   (graceful) cancel of that child streamer.
+    #
+    # 2. child contains a teardown handler which contains a
+    #   bug and raises.
+    #
+    child_errors_mid_stream: bool,
+
+    raise_unmasked: bool = False,
+    loglevel: str = 'info',
+):
+    '''
+    Parent task: start the child streamer, consume its msgs and
+    cancel the nursery upon receiving msg `65`, simulating an
+    out-of-band cancel request just before the child errors.
+
+    '''
+    tractor.log.get_console_log(level=loglevel)
+
+    # the `.aclose()` being checkpoints on these
+    # is the source of the problem..
+    tx, rx = trio.open_memory_channel(1)
+
+    async with (
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn,
+        rx as rx,
+    ):
+        _child_cs = await tn.start(
+            partial(
+                finite_stream_to_rent,
+                child_errors_mid_stream=child_errors_mid_stream,
+                raise_unmasked=raise_unmasked,
+                tx=tx,
+            )
+        )
+        async for msg in rx:
+            log.debug(
+                f'Rent rx {msg!r}\n'
+            )
+
+            # simulate some external cancellation
+            # request **JUST BEFORE** the child errors.
+            if msg == 65:
+                log.cancel(
+                    f'Cancelling parent on,\n'
+                    f'msg={msg}\n'
+                    f'\n'
+                    f'Simulates OOB cancel request!\n'
+                )
+                tn.cancel_scope.cancel()
+
+
+# XXX, manual test as script
+if __name__ == '__main__':
+    tractor.log.get_console_log(level='info')
+    for case in [True, False]:
+        log.info(
+            f'\n'
+            f'------ RUNNING SCRIPT TRIAL ------\n'
+            f'child_errors_midstream: {case!r}\n'
+        )
+        try:
+            trio.run(partial(
+                main,
+                child_errors_mid_stream=case,
+                # raise_unmasked=True,
+                loglevel='info',
+            ))
+        except Exception as _exc:
+            exc = _exc
+            log.exception(
+                'Should have raised an RTE or Cancelled?\n'
+            )
+            breakpoint()
diff --git a/tests/test_docs_examples.py b/tests/test_docs_examples.py
index 6250e0aa..b4cf85eb 100644
--- a/tests/test_docs_examples.py
+++ b/tests/test_docs_examples.py
@@ -95,6 +95,7 @@ def run_example_in_subproc(
             and 'integration' not in p[0]
             and 'advanced_faults' not in p[0]
             and 'multihost' not in p[0]
+            and 'trio' not in p[0]
         )
     ],
     ids=lambda t: t[1],
diff --git a/tests/test_trioisms.py b/tests/test_trioisms.py
index ca1e6d55..516d30e1 100644
--- a/tests/test_trioisms.py
+++ b/tests/test_trioisms.py
@@ -6,11 +6,18 @@ want to see changed.
from contextlib import ( asynccontextmanager as acm, ) +from types import ModuleType + +from functools import partial import pytest +from _pytest import pathlib from tractor.trionics import collapse_eg import trio from trio import TaskStatus +from tractor._testing import ( + examples_dir, +) @pytest.mark.parametrize( @@ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err( debug_mode: bool, ): ''' - Demo how a masking `trio.Cancelled` could be handled by unmasking from the - `.__context__` field when a user (by accident) re-raises from a `finally:`. + Demo how a masking `trio.Cancelled` could be handled by unmasking + from the `.__context__` field when a user (by accident) re-raises + from a `finally:`. ''' import tractor @@ -117,11 +125,9 @@ def test_acm_embedded_nursery_propagates_enter_err( async with ( trio.open_nursery() as tn, tractor.trionics.maybe_raise_from_masking_exc( - tn=tn, unmask_from=( - trio.Cancelled - if unmask_from_canc - else None + (trio.Cancelled,) if unmask_from_canc + else () ), ) ): @@ -136,8 +142,7 @@ def test_acm_embedded_nursery_propagates_enter_err( with tractor.devx.maybe_open_crash_handler( pdb=debug_mode, ) as bxerr: - if bxerr: - assert not bxerr.value + assert not bxerr.value async with ( wraps_tn_that_always_cancels() as tn, @@ -145,11 +150,12 @@ def test_acm_embedded_nursery_propagates_enter_err( assert not tn.cancel_scope.cancel_called assert 0 - assert ( - (err := bxerr.value) - and - type(err) is AssertionError - ) + if debug_mode: + assert ( + (err := bxerr.value) + and + type(err) is AssertionError + ) with pytest.raises(ExceptionGroup) as excinfo: trio.run(_main) @@ -160,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err( assert len(assert_eg.exceptions) == 1 - def test_gatherctxs_with_memchan_breaks_multicancelled( debug_mode: bool, ): ''' - Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task - will break a strict-eg-tn's multi-cancelled absorption.. 
+ Demo how a using an `async with sndchan` inside + a `.trionics.gather_contexts()` task will break a strict-eg-tn's + multi-cancelled absorption.. ''' from tractor import ( @@ -192,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( f'Closed {task!r}\n' ) - async def main(): async with ( # XXX should ensure ONLY the KBI @@ -213,3 +218,85 @@ def test_gatherctxs_with_memchan_breaks_multicancelled( with pytest.raises(KeyboardInterrupt): trio.run(main) + + +@pytest.mark.parametrize( + 'raise_unmasked', [ + True, + pytest.param( + False, + marks=pytest.mark.xfail( + reason="see examples/trio/send_chan_aclose_masks.py" + ) + ), + ] +) +@pytest.mark.parametrize( + 'child_errors_mid_stream', + [True, False], +) +def test_unmask_aclose_as_checkpoint_on_aexit( + raise_unmasked: bool, + child_errors_mid_stream: bool, + debug_mode: bool, +): + ''' + Verify that our unmasker util works over the common case where + a mem-chan's `.aclose()` is included in an `@acm` stack + and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded + exception from user code resulting in a silent failure which + appears like graceful cancellation. + + This test suite is mostly implemented as an example script so it + could more easily be shared with `trio`-core peeps as `tractor`-less + minimum reproducing example. 
+ + ''' + mod: ModuleType = pathlib.import_path( + examples_dir() + / 'trio' + / 'send_chan_aclose_masks_beg.py', + root=examples_dir(), + consider_namespace_packages=False, + ) + with pytest.raises(RuntimeError): + trio.run(partial( + mod.main, + raise_unmasked=raise_unmasked, + child_errors_mid_stream=child_errors_mid_stream, + )) + + + +@pytest.mark.parametrize( + 'ignore_special_cases', [ + True, + pytest.param( + False, + marks=pytest.mark.xfail( + reason="see examples/trio/lockacquire_not_umasked.py" + ) + ), + ] +) +def test_cancelled_lockacquire_in_ipctx_not_unmasked( + ignore_special_cases: bool, + loglevel: str, + debug_mode: bool, +): + mod: ModuleType = pathlib.import_path( + examples_dir() + / 'trio' + / 'lockacquire_not_unmasked.py', + root=examples_dir(), + consider_namespace_packages=False, + ) + async def _main(): + with trio.fail_after(2): + await mod.main( + ignore_special_cases=ignore_special_cases, + loglevel=loglevel, + debug_mode=debug_mode, + ) + + trio.run(_main) diff --git a/tractor/_rpc.py b/tractor/_rpc.py index 573aa77b..68ce56ea 100644 --- a/tractor/_rpc.py +++ b/tractor/_rpc.py @@ -654,8 +654,7 @@ async def _invoke( # scope ensures unasking of the `await coro` below # *should* never be interfered with!! maybe_raise_from_masking_exc( - tn=tn, - unmask_from=Cancelled, + unmask_from=(Cancelled,), ) as _mbme, # maybe boxed masked exc ): ctx._scope_nursery = tn diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 3acfbeda..57897c4e 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -31,7 +31,6 @@ from typing import ( AsyncIterator, Callable, Hashable, - Optional, Sequence, TypeVar, TYPE_CHECKING, @@ -204,7 +203,7 @@ class _Cache: a kept-alive-while-in-use async resource. 
''' - service_tn: Optional[trio.Nursery] = None + service_tn: trio.Nursery|None = None locks: dict[Hashable, trio.Lock] = {} users: int = 0 values: dict[Any, Any] = {} @@ -213,7 +212,7 @@ class _Cache: tuple[trio.Nursery, trio.Event] ] = {} # nurseries: dict[int, trio.Nursery] = {} - no_more_users: Optional[trio.Event] = None + no_more_users: trio.Event|None = None @classmethod async def run_ctx( @@ -223,16 +222,18 @@ class _Cache: task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, ) -> None: - async with mng as value: - _, no_more_users = cls.resources[ctx_key] - cls.values[ctx_key] = value - task_status.started(value) - try: - await no_more_users.wait() - finally: - # discard nursery ref so it won't be re-used (an error)? - value = cls.values.pop(ctx_key) - cls.resources.pop(ctx_key) + try: + async with mng as value: + _, no_more_users = cls.resources[ctx_key] + try: + cls.values[ctx_key] = value + task_status.started(value) + await no_more_users.wait() + finally: + value = cls.values.pop(ctx_key) + finally: + # discard nursery ref so it won't be re-used (an error)? + cls.resources.pop(ctx_key) @acm diff --git a/tractor/trionics/_taskc.py b/tractor/trionics/_taskc.py index 8809524b..0298912d 100644 --- a/tractor/trionics/_taskc.py +++ b/tractor/trionics/_taskc.py @@ -22,7 +22,14 @@ from __future__ import annotations from contextlib import ( asynccontextmanager as acm, ) -from typing import TYPE_CHECKING +import inspect +from types import ( + TracebackType, +) +from typing import ( + Type, + TYPE_CHECKING, +) import trio from tractor.log import get_logger @@ -60,12 +67,71 @@ def find_masked_excs( return None +_mask_cases: dict[ + Type[Exception], # masked exc type + dict[ + int, # inner-frame index into `inspect.getinnerframes()` + # `FrameInfo.function/filename: str`s to match + dict[str, str], + ], +] = { + trio.WouldBlock: { + # `trio.Lock.acquire()` has a checkpoint inside the + # `WouldBlock`-no_wait path's handler.. 
+ -5: { # "5th frame up" from checkpoint + 'filename': 'trio/_sync.py', + 'function': 'acquire', + # 'lineno': 605, # matters? + }, + } +} + + +def is_expected_masking_case( + cases: dict, + exc_ctx: Exception, + exc_match: BaseException, + +) -> bool|inspect.FrameInfo: + ''' + Determine whether the provided masked exception is from a known + bug/special/unintentional-`trio`-impl case which we do not wish + to unmask. + + Return any guilty `inspect.FrameInfo` ow `False`. + + ''' + exc_tb: TracebackType = exc_match.__traceback__ + if cases := _mask_cases.get(type(exc_ctx)): + inner: list[inspect.FrameInfo] = inspect.getinnerframes(exc_tb) + + # from tractor.devx.debug import mk_pdb + # mk_pdb().set_trace() + for iframe, matchon in cases.items(): + try: + masker_frame: inspect.FrameInfo = inner[iframe] + except IndexError: + continue + + for field, in_field in matchon.items(): + val = getattr( + masker_frame, + field, + ) + if in_field not in val: + break + else: + return masker_frame + + return False + + + # XXX, relevant discussion @ `trio`-core, # https://github.com/python-trio/trio/issues/455 # @acm async def maybe_raise_from_masking_exc( - tn: trio.Nursery|None = None, unmask_from: ( BaseException| tuple[BaseException] @@ -74,18 +140,30 @@ async def maybe_raise_from_masking_exc( raise_unmasked: bool = True, extra_note: str = ( 'This can occurr when,\n' - ' - a `trio.Nursery` scope embeds a `finally:`-block ' - 'which executes a checkpoint!' + '\n' + ' - a `trio.Nursery/CancelScope` embeds a `finally/except:`-block ' + 'which execs an un-shielded checkpoint!' # # ^TODO? other cases? 
), - always_warn_on: tuple[BaseException] = ( + always_warn_on: tuple[Type[BaseException]] = ( trio.Cancelled, ), + + # don't ever unmask or warn on any masking pair, + # { -> } + never_warn_on: dict[ + Type[BaseException], + Type[BaseException], + ] = { + KeyboardInterrupt: trio.Cancelled, + trio.Cancelled: trio.Cancelled, + }, # ^XXX, special case(s) where we warn-log bc likely # there will be no operational diff since the exc # is always expected to be consumed. + ) -> BoxedMaybeException: ''' Maybe un-mask and re-raise exception(s) suppressed by a known @@ -104,81 +182,112 @@ async def maybe_raise_from_masking_exc( individual sub-excs but maintain the eg-parent's form right? ''' + if not isinstance(unmask_from, tuple): + raise ValueError( + f'Invalid unmask_from = {unmask_from!r}\n' + f'Must be a `tuple[Type[BaseException]]`.\n' + ) + from tractor.devx.debug import ( BoxedMaybeException, - pause, ) boxed_maybe_exc = BoxedMaybeException( raise_on_exit=raise_unmasked, ) matching: list[BaseException]|None = None - maybe_eg: ExceptionGroup|None - - if tn: - try: # handle egs - yield boxed_maybe_exc - return - except* unmask_from as _maybe_eg: - maybe_eg = _maybe_eg + try: + yield boxed_maybe_exc + return + except BaseException as _bexc: + bexc = _bexc + if isinstance(bexc, BaseExceptionGroup): matches: ExceptionGroup - matches, _ = maybe_eg.split( - unmask_from - ) - if not matches: - raise + matches, _ = bexc.split(unmask_from) + if matches: + matching = matches.exceptions - matching: list[BaseException] = matches.exceptions - else: - try: # handle non-egs - yield boxed_maybe_exc - return - except unmask_from as _maybe_exc: - maybe_exc = _maybe_exc - matching: list[BaseException] = [ - maybe_exc - ] - - # XXX, only unmask-ed for debuggin! - # TODO, remove eventually.. 
- except BaseException as _berr: - berr = _berr - await pause(shield=True) - raise berr + elif ( + unmask_from + and + type(bexc) in unmask_from + ): + matching = [bexc] if matching is None: raise masked: list[tuple[BaseException, BaseException]] = [] for exc_match in matching: - if exc_ctx := find_masked_excs( maybe_masker=exc_match, - unmask_from={unmask_from}, + unmask_from=set(unmask_from), ): - masked.append((exc_ctx, exc_match)) + masked.append(( + exc_ctx, + exc_match, + )) boxed_maybe_exc.value = exc_match note: str = ( f'\n' - f'^^WARNING^^ the above {exc_ctx!r} was masked by a {unmask_from!r}\n' + f'^^WARNING^^\n' + f'the above {type(exc_ctx)!r} was masked by a {type(exc_match)!r}\n' ) if extra_note: note += ( f'\n' f'{extra_note}\n' ) - exc_ctx.add_note(note) - if type(exc_match) in always_warn_on: + do_warn: bool = ( + never_warn_on.get( + type(exc_ctx) # masking type + ) + is not + type(exc_match) # masked type + ) + + if do_warn: + exc_ctx.add_note(note) + + if ( + do_warn + and + type(exc_match) in always_warn_on + ): log.warning(note) - # await tractor.pause(shield=True) - if raise_unmasked: - + if ( + do_warn + and + raise_unmasked + ): if len(masked) < 2: + # don't unmask already known "special" cases.. + if ( + _mask_cases + and + (cases := _mask_cases.get(type(exc_ctx))) + and + (masker_frame := is_expected_masking_case( + cases, + exc_ctx, + exc_match, + )) + ): + log.warning( + f'Ignoring already-known, non-ideal-but-valid ' + f'masker code @\n' + f'{masker_frame}\n' + f'\n' + f'NOT raising {exc_ctx} from masker {exc_match!r}\n' + ) + raise exc_match + raise exc_ctx from exc_match - else: - # ?TODO, see above but, possibly unmasking sub-exc - # entries if there are > 1 - await pause(shield=True) + + # ??TODO, see above but, possibly unmasking sub-exc + # entries if there are > 1 + # else: + # await pause(shield=True) else: raise