Compare commits
23 Commits
0cbf02bf2e...b096867d40
b096867d40
a3c9822602
e3a542f2b5
0ffcea1033
a7bdf0486c
d2ac9ecf95
dcb1062bb8
05d865c0f1
8218f0f51f
8f19f5d3a8
64c27a914b
d9c8d543b3
048b154f00
88828e9f99
25ff195c17
f60cc646ff
a2b754b5f5
5e13588aed
0a56f40bab
f776c47cb4
7f584d4f54
d650dda0fa
f6598e8400
@@ -16,6 +16,7 @@ from tractor import (
     ContextCancelled,
     MsgStream,
     _testing,
+    trionics,
 )
 import trio
 import pytest
@@ -62,9 +63,8 @@ async def recv_and_spawn_net_killers(
     await ctx.started()
     async with (
         ctx.open_stream() as stream,
-        trio.open_nursery(
-            strict_exception_groups=False,
-        ) as tn,
+        trionics.collapse_eg(),
+        trio.open_nursery() as tn,
     ):
         async for i in stream:
             print(f'child echoing {i}')
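The change repeated across most of these commits swaps a nursery opened with `strict_exception_groups=False` for a plain (strict) nursery wrapped by `tractor.trionics.collapse_eg()`. A minimal before/after sketch of the pattern, assuming only what the diffs themselves show, namely that `collapse_eg()` is an async context manager which unwraps an exception group holding a single error back to that bare error:

    import tractor
    import trio

    async def main():
        # old style: opt the nursery out of eg-wrapping entirely
        #
        # async with trio.open_nursery(
        #     strict_exception_groups=False,
        # ) as tn:
        #     ...

        # new style: keep the strict nursery, collapse any
        # single-error group on the way out.
        async with (
            tractor.trionics.collapse_eg(),
            trio.open_nursery() as tn,
        ):
            tn.start_soon(trio.sleep, 0.1)

    trio.run(main)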
@@ -23,9 +23,8 @@ async def main():
             modules=[__name__]
         ) as portal_map,

-        trio.open_nursery(
-            strict_exception_groups=False,
-        ) as tn,
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn,
     ):

         for (name, portal) in portal_map.items():
@@ -1,145 +0,0 @@
-from contextlib import (
-    contextmanager as cm,
-    # TODO, any diff in async case(s)??
-    # asynccontextmanager as acm,
-)
-from functools import partial
-
-import tractor
-import trio
-
-
-log = tractor.log.get_logger(__name__)
-tractor.log.get_console_log('info')
-
-@cm
-def teardown_on_exc(
-    raise_from_handler: bool = False,
-):
-    '''
-    You could also have a teardown handler which catches any exc and
-    does some required teardown. In this case the problem is
-    compounded UNLESS you ensure the handler's scope is OUTSIDE the
-    `ux.aclose()`.. that is in the caller's enclosing scope.
-
-    '''
-    try:
-        yield
-    except BaseException as _berr:
-        berr = _berr
-        log.exception(
-            f'Handling termination teardown in child due to,\n'
-            f'{berr!r}\n'
-        )
-        if raise_from_handler:
-            # XXX teardown ops XXX
-            # on termination these steps say need to be run to
-            # ensure wider system consistency (like the state of
-            # remote connections/services).
-            #
-            # HOWEVER, any bug in this teardown code is also
-            # masked by the `tx.aclose()`!
-            # this is also true if `_tn.cancel_scope` is
-            # `.cancel_called` by the parent in a graceful
-            # request case..
-
-            # simulate a bug in teardown handler.
-            raise RuntimeError(
-                'woopsie teardown bug!'
-            )
-
-        raise  # no teardown bug.
-
-
-async def finite_stream_to_rent(
-    tx: trio.abc.SendChannel,
-    child_errors_mid_stream: bool,
-
-    task_status: trio.TaskStatus[
-        trio.CancelScope,
-    ] = trio.TASK_STATUS_IGNORED,
-):
-    async with (
-        # XXX without this unmasker the mid-streaming RTE is never
-        # reported since it is masked by the `tx.aclose()`
-        # call which in turn raises `Cancelled`!
-        #
-        # NOTE, this is WITHOUT doing any exception handling
-        # inside the child task!
-        #
-        # TODO, uncomment next LoC to see the suppressed beg[RTE]!
-        # tractor.trionics.maybe_raise_from_masking_exc(),
-
-        tx as tx,  # .aclose() is the guilty masker chkpt!
-        trio.open_nursery() as _tn,
-    ):
-        # pass our scope back to parent for supervision
-        # control.
-        task_status.started(_tn.cancel_scope)
-
-        with teardown_on_exc(
-            raise_from_handler=not child_errors_mid_stream,
-        ):
-            for i in range(100):
-                log.info(
-                    f'Child tx {i!r}\n'
-                )
-                if (
-                    child_errors_mid_stream
-                    and
-                    i == 66
-                ):
-                    # oh wait but WOOPS there's a bug
-                    # in that teardown code!?
-                    raise RuntimeError(
-                        'woopsie, a mid-streaming bug!?'
-                    )
-
-                await tx.send(i)
-
-
-async def main(
-    # TODO! toggle this for the 2 cases!
-    # 1. child errors mid-stream while parent is also requesting
-    #    (graceful) cancel of that child streamer.
-    #
-    # 2. child contains a teardown handler which contains a
-    #    bug and raises.
-    #
-    child_errors_mid_stream: bool,
-):
-    tx, rx = trio.open_memory_channel(1)
-
-    async with (
-        trio.open_nursery() as tn,
-        rx as rx,
-    ):
-
-        _child_cs = await tn.start(
-            partial(
-                finite_stream_to_rent,
-                child_errors_mid_stream=child_errors_mid_stream,
-                tx=tx,
-            )
-        )
-        async for msg in rx:
-            log.info(
-                f'Rent rx {msg!r}\n'
-            )
-
-            # simulate some external cancellation
-            # request **JUST BEFORE** the child errors.
-            if msg == 65:
-                log.cancel(
-                    f'Cancelling parent on,\n'
-                    f'msg={msg}\n'
-                    f'\n'
-                    f'Simulates OOB cancel request!\n'
-                )
-                tn.cancel_scope.cancel()
-
-
-if __name__ == '__main__':
-
-    for case in [True, False]:
-        trio.run(main, case)
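The file deleted above is a demo of trio's exception-masking gotcha: a cancellation checkpoint run during unwinding (here the `tx.aclose()` implied by `async with tx`) can raise `Cancelled` and bury an in-flight error. A stripped-down, tractor-free sketch of just that mechanic (all names here are illustrative, not from the diff):

    import trio

    async def producer(tx: trio.MemorySendChannel):
        # tx.aclose() runs on scope exit and IS a checkpoint,
        # so it can raise Cancelled over our RuntimeError.
        async with tx:
            await tx.send(1)
            raise RuntimeError('mid-stream bug')

    async def main():
        tx, rx = trio.open_memory_channel(1)
        async with trio.open_nursery() as tn, rx:
            tn.start_soon(producer, tx)
            async for msg in rx:
                # cancel just as the child is about to error; the
                # RuntimeError may then only survive as the
                # Cancelled exception's __context__.
                tn.cancel_scope.cancel()

    trio.run(main)

Per the comments in the deleted file, `tractor.trionics.maybe_raise_from_masking_exc()` is the unmasker meant to re-surface such buried errors.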
@@ -313,9 +313,8 @@ async def inf_streamer(
         # `trio.EndOfChannel` doesn't propagate directly to the above
         # .open_stream() parent, resulting in it also raising instead
         # of gracefully absorbing as normal.. so how to handle?
-        trio.open_nursery(
-            strict_exception_groups=False,
-        ) as tn,
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn,
     ):
         async def close_stream_on_sentinel():
             async for msg in stream:
@@ -532,10 +532,15 @@ def test_cancel_via_SIGINT_other_task(
     async def main():
         # should never timeout since SIGINT should cancel the current program
         with trio.fail_after(timeout):
-            async with trio.open_nursery(
-                strict_exception_groups=False,
-            ) as n:
-                await n.start(spawn_and_sleep_forever)
+            async with (
+
+                # XXX ?TODO? why no work!?
+                # tractor.trionics.collapse_eg(),
+                trio.open_nursery(
+                    strict_exception_groups=False,
+                ) as tn,
+            ):
+                await tn.start(spawn_and_sleep_forever)
                 if 'mp' in spawn_backend:
                     time.sleep(0.1)
                 os.kill(pid, signal.SIGINT)
@@ -117,9 +117,10 @@ async def open_actor_local_nursery(
     ctx: tractor.Context,
 ):
     global _nursery
-    async with trio.open_nursery(
-        strict_exception_groups=False,
-    ) as tn:
+    async with (
+        tractor.trionics.collapse_eg(),
+        trio.open_nursery() as tn
+    ):
         _nursery = tn
         await ctx.started()
         await trio.sleep(10)
@@ -11,6 +11,7 @@ import psutil
 import pytest
 import subprocess
 import tractor
+from tractor.trionics import collapse_eg
 from tractor._testing import tractor_test
 import trio

@@ -193,10 +194,10 @@ async def spawn_and_check_registry(

     try:
         async with tractor.open_nursery() as an:
-            async with trio.open_nursery(
-                strict_exception_groups=False,
-            ) as trion:
-
+            async with (
+                collapse_eg(),
+                trio.open_nursery() as trion,
+            ):
                 portals = {}
                 for i in range(3):
                     name = f'a{i}'
@@ -338,11 +339,12 @@ async def close_chans_before_nursery(
             async with portal2.open_stream_from(
                 stream_forever
             ) as agen2:
-                async with trio.open_nursery(
-                    strict_exception_groups=False,
-                ) as n:
-                    n.start_soon(streamer, agen1)
-                    n.start_soon(cancel, use_signal, .5)
+                async with (
+                    collapse_eg(),
+                    trio.open_nursery() as tn,
+                ):
+                    tn.start_soon(streamer, agen1)
+                    tn.start_soon(cancel, use_signal, .5)
                     try:
                         await streamer(agen2)
                     finally:
@@ -234,10 +234,8 @@ async def trio_ctx(
     with trio.fail_after(1 + delay):
         try:
             async with (
-                trio.open_nursery(
-                    # TODO, for new `trio` / py3.13
-                    # strict_exception_groups=False,
-                ) as tn,
+                tractor.trionics.collapse_eg(),
+                trio.open_nursery() as tn,
                 tractor.to_asyncio.open_channel_from(
                     sleep_and_err,
                 ) as (first, chan),
@@ -8,6 +8,7 @@ from contextlib import (
 )

 import pytest
+from tractor.trionics import collapse_eg
 import trio
 from trio import TaskStatus

@@ -64,9 +65,8 @@ def test_stashed_child_nursery(use_start_soon):
     async def main():

         async with (
-            trio.open_nursery(
-                strict_exception_groups=False,
-            ) as pn,
+            collapse_eg(),
+            trio.open_nursery() as pn,
         ):
             cn = await pn.start(mk_child_nursery)
             assert cn
@@ -197,10 +197,8 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
         async with (
            # XXX should ensure ONLY the KBI
            # is relayed upward
-            trionics.collapse_eg(),
-            trio.open_nursery(
-                # strict_exception_groups=False,
-            ), # as tn,
+            collapse_eg(),
+            trio.open_nursery(), # as tn,

             trionics.gather_contexts([
                 open_memchan(),
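The comment in the `test_gatherctxs_with_memchan_breaks_multicancelled` hunk above states the intent directly: only the `KeyboardInterrupt` should be relayed upward, not a group wrapping it. A sketch of that collapsing behavior, assuming `collapse_eg()` unwraps a single-leaf group (the helper's name and the test comment are the only basis here):

    import tractor
    import trio

    async def boom():
        raise KeyboardInterrupt

    async def main():
        try:
            async with (
                tractor.trionics.collapse_eg(),
                trio.open_nursery() as tn,
            ):
                tn.start_soon(boom)
        except KeyboardInterrupt:
            # a bare strict nursery would instead raise
            # BaseExceptionGroup([KeyboardInterrupt()]) here.
            print('bare KBI relayed upward')

    trio.run(main)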
@@ -1760,9 +1760,7 @@ async def async_main(
             f' {pformat(ipc_server._peers)}'
         )
         log.runtime(teardown_report)
-        await ipc_server.wait_for_no_more_peers(
-            shield=True,
-        )
+        await ipc_server.wait_for_no_more_peers()

         teardown_report += (
             '-]> all peer channels are complete.\n'
@@ -814,10 +814,14 @@ class Server(Struct):

     async def wait_for_no_more_peers(
         self,
-        shield: bool = False,
+        # XXX, should this even be allowed?
+        # -> i've seen it cause hangs on teardown
+        # in `test_resource_cache.py`
+        # _shield: bool = False,
     ) -> None:
-        with trio.CancelScope(shield=shield):
-            await self._no_more_peers.wait()
+        await self._no_more_peers.wait()
+        # with trio.CancelScope(shield=_shield):
+        #     await self._no_more_peers.wait()

     async def wait_for_peer(
         self,
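These last two hunks go the other way from the rest of the set: the call-site stops passing `shield=True` and the `shield` parameter is commented out of `Server.wait_for_no_more_peers()`, with the inline note that shielded waits caused teardown hangs in `test_resource_cache.py`. The hazard in generic form (a standalone sketch, not tractor's actual code): a shield makes an event-wait immune to cancellation, so if the event is never set, teardown can never proceed:

    import trio

    async def wait_no_more_peers(ev: trio.Event, shield: bool):
        # with shield=True this wait ignores cancellation entirely
        with trio.CancelScope(shield=shield):
            await ev.wait()

    async def main():
        ev = trio.Event()  # never set, e.g. a leaked peer channel
        with trio.move_on_after(0.5) as cs:
            # unshielded: the timeout cancels the wait and teardown
            # proceeds; shield=True here would hang forever.
            await wait_no_more_peers(ev, shield=False)
        assert cs.cancelled_caught
        print('teardown proceeded')

    trio.run(main)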