Compare commits
2 Commits
e9f3689191
...
04c3d5e239
Author | SHA1 | Date |
---|---|---|
|
04c3d5e239 | |
|
759174729c |
|
@ -9,8 +9,10 @@ import tractor
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
log = tractor.log.get_logger(__name__)
|
log = tractor.log.get_logger(
|
||||||
tractor.log.get_console_log('info')
|
name=__name__
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@cm
|
@cm
|
||||||
def teardown_on_exc(
|
def teardown_on_exc(
|
||||||
|
@ -54,6 +56,7 @@ def teardown_on_exc(
|
||||||
async def finite_stream_to_rent(
|
async def finite_stream_to_rent(
|
||||||
tx: trio.abc.SendChannel,
|
tx: trio.abc.SendChannel,
|
||||||
child_errors_mid_stream: bool,
|
child_errors_mid_stream: bool,
|
||||||
|
raise_unmasked: bool,
|
||||||
|
|
||||||
task_status: trio.TaskStatus[
|
task_status: trio.TaskStatus[
|
||||||
trio.CancelScope,
|
trio.CancelScope,
|
||||||
|
@ -68,20 +71,41 @@ async def finite_stream_to_rent(
|
||||||
# inside the child task!
|
# inside the child task!
|
||||||
#
|
#
|
||||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||||
# tractor.trionics.maybe_raise_from_masking_exc(),
|
tractor.trionics.maybe_raise_from_masking_exc(
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
),
|
||||||
|
|
||||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||||
trio.open_nursery() as _tn,
|
|
||||||
|
# XXX, this ONLY matters in the
|
||||||
|
# `child_errors_mid_stream=False` case oddly!?
|
||||||
|
# THAT IS, if no tn is opened in that case then the
|
||||||
|
# test will not fail; it raises the RTE correctly?
|
||||||
|
#
|
||||||
|
# -> so it seems this new scope somehow affects the form of
|
||||||
|
# eventual in the parent EG?
|
||||||
|
tractor.trionics.maybe_open_nursery(
|
||||||
|
nursery=(
|
||||||
|
None
|
||||||
|
if not child_errors_mid_stream
|
||||||
|
else True
|
||||||
|
),
|
||||||
|
) as _tn,
|
||||||
):
|
):
|
||||||
# pass our scope back to parent for supervision\
|
# pass our scope back to parent for supervision\
|
||||||
# control.
|
# control.
|
||||||
task_status.started(_tn.cancel_scope)
|
cs: trio.CancelScope|None = (
|
||||||
|
None
|
||||||
|
if _tn is True
|
||||||
|
else _tn.cancel_scope
|
||||||
|
)
|
||||||
|
task_status.started(cs)
|
||||||
|
|
||||||
with teardown_on_exc(
|
with teardown_on_exc(
|
||||||
raise_from_handler=not child_errors_mid_stream,
|
raise_from_handler=not child_errors_mid_stream,
|
||||||
):
|
):
|
||||||
for i in range(100):
|
for i in range(100):
|
||||||
log.info(
|
log.debug(
|
||||||
f'Child tx {i!r}\n'
|
f'Child tx {i!r}\n'
|
||||||
)
|
)
|
||||||
if (
|
if (
|
||||||
|
@ -107,23 +131,31 @@ async def main(
|
||||||
# bug and raises.
|
# bug and raises.
|
||||||
#
|
#
|
||||||
child_errors_mid_stream: bool,
|
child_errors_mid_stream: bool,
|
||||||
|
|
||||||
|
raise_unmasked: bool = False,
|
||||||
|
loglevel: str = 'info',
|
||||||
):
|
):
|
||||||
|
tractor.log.get_console_log(level=loglevel)
|
||||||
|
|
||||||
|
# the `.aclose()` being checkpoints on these
|
||||||
|
# is the source of the problem..
|
||||||
tx, rx = trio.open_memory_channel(1)
|
tx, rx = trio.open_memory_channel(1)
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
|
tractor.trionics.collapse_eg(),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
rx as rx,
|
rx as rx,
|
||||||
):
|
):
|
||||||
|
|
||||||
_child_cs = await tn.start(
|
_child_cs = await tn.start(
|
||||||
partial(
|
partial(
|
||||||
finite_stream_to_rent,
|
finite_stream_to_rent,
|
||||||
child_errors_mid_stream=child_errors_mid_stream,
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
tx=tx,
|
tx=tx,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
async for msg in rx:
|
async for msg in rx:
|
||||||
log.info(
|
log.debug(
|
||||||
f'Rent rx {msg!r}\n'
|
f'Rent rx {msg!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -139,7 +171,25 @@ async def main(
|
||||||
tn.cancel_scope.cancel()
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
# XXX, manual test as script
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
|
tractor.log.get_console_log(level='info')
|
||||||
for case in [True, False]:
|
for case in [True, False]:
|
||||||
trio.run(main, case)
|
log.info(
|
||||||
|
f'\n'
|
||||||
|
f'------ RUNNING SCRIPT TRIAL ------\n'
|
||||||
|
f'child_errors_midstream: {case!r}\n'
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
trio.run(partial(
|
||||||
|
main,
|
||||||
|
child_errors_mid_stream=case,
|
||||||
|
# raise_unmasked=True,
|
||||||
|
loglevel='info',
|
||||||
|
))
|
||||||
|
except Exception as _exc:
|
||||||
|
exc = _exc
|
||||||
|
log.exception(
|
||||||
|
'Should have raised an RTE or Cancelled?\n'
|
||||||
|
)
|
||||||
|
breakpoint()
|
||||||
|
|
|
@ -6,11 +6,18 @@ want to see changed.
|
||||||
from contextlib import (
|
from contextlib import (
|
||||||
asynccontextmanager as acm,
|
asynccontextmanager as acm,
|
||||||
)
|
)
|
||||||
|
from types import ModuleType
|
||||||
|
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from _pytest import pathlib
|
||||||
from tractor.trionics import collapse_eg
|
from tractor.trionics import collapse_eg
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio import TaskStatus
|
||||||
|
from tractor._testing import (
|
||||||
|
examples_dir,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
@pytest.mark.parametrize(
|
||||||
|
@ -106,8 +113,9 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Demo how a masking `trio.Cancelled` could be handled by unmasking from the
|
Demo how a masking `trio.Cancelled` could be handled by unmasking
|
||||||
`.__context__` field when a user (by accident) re-raises from a `finally:`.
|
from the `.__context__` field when a user (by accident) re-raises
|
||||||
|
from a `finally:`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
import tractor
|
import tractor
|
||||||
|
@ -158,13 +166,13 @@ def test_acm_embedded_nursery_propagates_enter_err(
|
||||||
assert len(assert_eg.exceptions) == 1
|
assert len(assert_eg.exceptions) == 1
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
debug_mode: bool,
|
debug_mode: bool,
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Demo how a using an `async with sndchan` inside a `.trionics.gather_contexts()` task
|
Demo how a using an `async with sndchan` inside
|
||||||
will break a strict-eg-tn's multi-cancelled absorption..
|
a `.trionics.gather_contexts()` task will break a strict-eg-tn's
|
||||||
|
multi-cancelled absorption..
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from tractor import (
|
from tractor import (
|
||||||
|
@ -190,7 +198,6 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
f'Closed {task!r}\n'
|
f'Closed {task!r}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
async def main():
|
||||||
async with (
|
async with (
|
||||||
# XXX should ensure ONLY the KBI
|
# XXX should ensure ONLY the KBI
|
||||||
|
@ -211,3 +218,50 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
|
|
||||||
with pytest.raises(KeyboardInterrupt):
|
with pytest.raises(KeyboardInterrupt):
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'raise_unmasked', [
|
||||||
|
True,
|
||||||
|
pytest.param(
|
||||||
|
False,
|
||||||
|
marks=pytest.mark.xfail(
|
||||||
|
reason="see examples/trio/send_chan_aclose_masks.py"
|
||||||
|
)
|
||||||
|
),
|
||||||
|
]
|
||||||
|
)
|
||||||
|
@pytest.mark.parametrize(
|
||||||
|
'child_errors_mid_stream',
|
||||||
|
[True, False],
|
||||||
|
)
|
||||||
|
def test_unmask_aclose_as_checkpoint_on_aexit(
|
||||||
|
raise_unmasked: bool,
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
debug_mode: bool,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
Verify that our unmasker util works over the common case where
|
||||||
|
a mem-chan's `.aclose()` is included in an `@acm` stack
|
||||||
|
and it being currently a checkpoint, can `trio.Cancelled`-mask an embedded
|
||||||
|
exception from user code resulting in a silent failure which
|
||||||
|
appears like graceful cancellation.
|
||||||
|
|
||||||
|
This test suite is mostly implemented as an example script so it
|
||||||
|
could more easily be shared with `trio`-core peeps as `tractor`-less
|
||||||
|
minimum reproducing example.
|
||||||
|
|
||||||
|
'''
|
||||||
|
mod: ModuleType = pathlib.import_path(
|
||||||
|
examples_dir()
|
||||||
|
/ 'trio'
|
||||||
|
/ 'send_chan_aclose_masks_beg.py',
|
||||||
|
root=examples_dir(),
|
||||||
|
consider_namespace_packages=False,
|
||||||
|
)
|
||||||
|
with pytest.raises(RuntimeError):
|
||||||
|
trio.run(partial(
|
||||||
|
mod.main,
|
||||||
|
raise_unmasked=raise_unmasked,
|
||||||
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
))
|
||||||
|
|
Loading…
Reference in New Issue