Compare commits
23 Commits
0cbf02bf2e
...
b096867d40
Author | SHA1 | Date |
---|---|---|
|
b096867d40 | |
|
a3c9822602 | |
|
e3a542f2b5 | |
|
0ffcea1033 | |
|
a7bdf0486c | |
|
d2ac9ecf95 | |
|
dcb1062bb8 | |
|
05d865c0f1 | |
|
8218f0f51f | |
|
8f19f5d3a8 | |
|
64c27a914b | |
|
d9c8d543b3 | |
|
048b154f00 | |
|
88828e9f99 | |
|
25ff195c17 | |
|
f60cc646ff | |
|
a2b754b5f5 | |
|
5e13588aed | |
|
0a56f40bab | |
|
f776c47cb4 | |
|
7f584d4f54 | |
|
d650dda0fa | |
|
f6598e8400 |
|
@ -16,6 +16,7 @@ from tractor import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
MsgStream,
|
MsgStream,
|
||||||
_testing,
|
_testing,
|
||||||
|
trionics,
|
||||||
)
|
)
|
||||||
import trio
|
import trio
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -62,9 +63,8 @@ async def recv_and_spawn_net_killers(
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
trio.open_nursery(
|
trionics.collapse_eg(),
|
||||||
strict_exception_groups=False,
|
trio.open_nursery() as tn,
|
||||||
) as tn,
|
|
||||||
):
|
):
|
||||||
async for i in stream:
|
async for i in stream:
|
||||||
print(f'child echoing {i}')
|
print(f'child echoing {i}')
|
||||||
|
|
|
@ -23,9 +23,8 @@ async def main():
|
||||||
modules=[__name__]
|
modules=[__name__]
|
||||||
) as portal_map,
|
) as portal_map,
|
||||||
|
|
||||||
trio.open_nursery(
|
tractor.trionics.collapse_eg(),
|
||||||
strict_exception_groups=False,
|
trio.open_nursery() as tn,
|
||||||
) as tn,
|
|
||||||
):
|
):
|
||||||
|
|
||||||
for (name, portal) in portal_map.items():
|
for (name, portal) in portal_map.items():
|
||||||
|
|
|
@ -1,145 +0,0 @@
|
||||||
from contextlib import (
|
|
||||||
contextmanager as cm,
|
|
||||||
# TODO, any diff in async case(s)??
|
|
||||||
# asynccontextmanager as acm,
|
|
||||||
)
|
|
||||||
from functools import partial
|
|
||||||
|
|
||||||
import tractor
|
|
||||||
import trio
|
|
||||||
|
|
||||||
|
|
||||||
log = tractor.log.get_logger(__name__)
|
|
||||||
tractor.log.get_console_log('info')
|
|
||||||
|
|
||||||
@cm
|
|
||||||
def teardown_on_exc(
|
|
||||||
raise_from_handler: bool = False,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
You could also have a teardown handler which catches any exc and
|
|
||||||
does some required teardown. In this case the problem is
|
|
||||||
compounded UNLESS you ensure the handler's scope is OUTSIDE the
|
|
||||||
`ux.aclose()`.. that is in the caller's enclosing scope.
|
|
||||||
|
|
||||||
'''
|
|
||||||
try:
|
|
||||||
yield
|
|
||||||
except BaseException as _berr:
|
|
||||||
berr = _berr
|
|
||||||
log.exception(
|
|
||||||
f'Handling termination teardown in child due to,\n'
|
|
||||||
f'{berr!r}\n'
|
|
||||||
)
|
|
||||||
if raise_from_handler:
|
|
||||||
# XXX teardown ops XXX
|
|
||||||
# on termination these steps say need to be run to
|
|
||||||
# ensure wider system consistency (like the state of
|
|
||||||
# remote connections/services).
|
|
||||||
#
|
|
||||||
# HOWEVER, any bug in this teardown code is also
|
|
||||||
# masked by the `tx.aclose()`!
|
|
||||||
# this is also true if `_tn.cancel_scope` is
|
|
||||||
# `.cancel_called` by the parent in a graceful
|
|
||||||
# request case..
|
|
||||||
|
|
||||||
# simulate a bug in teardown handler.
|
|
||||||
raise RuntimeError(
|
|
||||||
'woopsie teardown bug!'
|
|
||||||
)
|
|
||||||
|
|
||||||
raise # no teardown bug.
|
|
||||||
|
|
||||||
|
|
||||||
async def finite_stream_to_rent(
|
|
||||||
tx: trio.abc.SendChannel,
|
|
||||||
child_errors_mid_stream: bool,
|
|
||||||
|
|
||||||
task_status: trio.TaskStatus[
|
|
||||||
trio.CancelScope,
|
|
||||||
] = trio.TASK_STATUS_IGNORED,
|
|
||||||
):
|
|
||||||
async with (
|
|
||||||
# XXX without this unmasker the mid-streaming RTE is never
|
|
||||||
# reported since it is masked by the `tx.aclose()`
|
|
||||||
# call which in turn raises `Cancelled`!
|
|
||||||
#
|
|
||||||
# NOTE, this is WITHOUT doing any exception handling
|
|
||||||
# inside the child task!
|
|
||||||
#
|
|
||||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
|
||||||
# tractor.trionics.maybe_raise_from_masking_exc(),
|
|
||||||
|
|
||||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
|
||||||
trio.open_nursery() as _tn,
|
|
||||||
):
|
|
||||||
# pass our scope back to parent for supervision\
|
|
||||||
# control.
|
|
||||||
task_status.started(_tn.cancel_scope)
|
|
||||||
|
|
||||||
with teardown_on_exc(
|
|
||||||
raise_from_handler=not child_errors_mid_stream,
|
|
||||||
):
|
|
||||||
for i in range(100):
|
|
||||||
log.info(
|
|
||||||
f'Child tx {i!r}\n'
|
|
||||||
)
|
|
||||||
if (
|
|
||||||
child_errors_mid_stream
|
|
||||||
and
|
|
||||||
i == 66
|
|
||||||
):
|
|
||||||
# oh wait but WOOPS there's a bug
|
|
||||||
# in that teardown code!?
|
|
||||||
raise RuntimeError(
|
|
||||||
'woopsie, a mid-streaming bug!?'
|
|
||||||
)
|
|
||||||
|
|
||||||
await tx.send(i)
|
|
||||||
|
|
||||||
|
|
||||||
async def main(
|
|
||||||
# TODO! toggle this for the 2 cases!
|
|
||||||
# 1. child errors mid-stream while parent is also requesting
|
|
||||||
# (graceful) cancel of that child streamer.
|
|
||||||
#
|
|
||||||
# 2. child contains a teardown handler which contains a
|
|
||||||
# bug and raises.
|
|
||||||
#
|
|
||||||
child_errors_mid_stream: bool,
|
|
||||||
):
|
|
||||||
tx, rx = trio.open_memory_channel(1)
|
|
||||||
|
|
||||||
async with (
|
|
||||||
trio.open_nursery() as tn,
|
|
||||||
rx as rx,
|
|
||||||
):
|
|
||||||
|
|
||||||
_child_cs = await tn.start(
|
|
||||||
partial(
|
|
||||||
finite_stream_to_rent,
|
|
||||||
child_errors_mid_stream=child_errors_mid_stream,
|
|
||||||
tx=tx,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
async for msg in rx:
|
|
||||||
log.info(
|
|
||||||
f'Rent rx {msg!r}\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
# simulate some external cancellation
|
|
||||||
# request **JUST BEFORE** the child errors.
|
|
||||||
if msg == 65:
|
|
||||||
log.cancel(
|
|
||||||
f'Cancelling parent on,\n'
|
|
||||||
f'msg={msg}\n'
|
|
||||||
f'\n'
|
|
||||||
f'Simulates OOB cancel request!\n'
|
|
||||||
)
|
|
||||||
tn.cancel_scope.cancel()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
|
|
||||||
for case in [True, False]:
|
|
||||||
trio.run(main, case)
|
|
|
@ -313,9 +313,8 @@ async def inf_streamer(
|
||||||
# `trio.EndOfChannel` doesn't propagate directly to the above
|
# `trio.EndOfChannel` doesn't propagate directly to the above
|
||||||
# .open_stream() parent, resulting in it also raising instead
|
# .open_stream() parent, resulting in it also raising instead
|
||||||
# of gracefully absorbing as normal.. so how to handle?
|
# of gracefully absorbing as normal.. so how to handle?
|
||||||
trio.open_nursery(
|
tractor.trionics.collapse_eg(),
|
||||||
strict_exception_groups=False,
|
trio.open_nursery() as tn,
|
||||||
) as tn,
|
|
||||||
):
|
):
|
||||||
async def close_stream_on_sentinel():
|
async def close_stream_on_sentinel():
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
|
|
@ -532,10 +532,15 @@ def test_cancel_via_SIGINT_other_task(
|
||||||
async def main():
|
async def main():
|
||||||
# should never timeout since SIGINT should cancel the current program
|
# should never timeout since SIGINT should cancel the current program
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(timeout):
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
|
||||||
) as n:
|
# XXX ?TODO? why no work!?
|
||||||
await n.start(spawn_and_sleep_forever)
|
# tractor.trionics.collapse_eg(),
|
||||||
|
trio.open_nursery(
|
||||||
|
strict_exception_groups=False,
|
||||||
|
) as tn,
|
||||||
|
):
|
||||||
|
await tn.start(spawn_and_sleep_forever)
|
||||||
if 'mp' in spawn_backend:
|
if 'mp' in spawn_backend:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
os.kill(pid, signal.SIGINT)
|
os.kill(pid, signal.SIGINT)
|
||||||
|
|
|
@ -117,9 +117,10 @@ async def open_actor_local_nursery(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
):
|
):
|
||||||
global _nursery
|
global _nursery
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
tractor.trionics.collapse_eg(),
|
||||||
) as tn:
|
trio.open_nursery() as tn
|
||||||
|
):
|
||||||
_nursery = tn
|
_nursery = tn
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
await trio.sleep(10)
|
await trio.sleep(10)
|
||||||
|
|
|
@ -11,6 +11,7 @@ import psutil
|
||||||
import pytest
|
import pytest
|
||||||
import subprocess
|
import subprocess
|
||||||
import tractor
|
import tractor
|
||||||
|
from tractor.trionics import collapse_eg
|
||||||
from tractor._testing import tractor_test
|
from tractor._testing import tractor_test
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -193,10 +194,10 @@ async def spawn_and_check_registry(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as an:
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
collapse_eg(),
|
||||||
) as trion:
|
trio.open_nursery() as trion,
|
||||||
|
):
|
||||||
portals = {}
|
portals = {}
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
name = f'a{i}'
|
name = f'a{i}'
|
||||||
|
@ -338,11 +339,12 @@ async def close_chans_before_nursery(
|
||||||
async with portal2.open_stream_from(
|
async with portal2.open_stream_from(
|
||||||
stream_forever
|
stream_forever
|
||||||
) as agen2:
|
) as agen2:
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
collapse_eg(),
|
||||||
) as n:
|
trio.open_nursery() as tn,
|
||||||
n.start_soon(streamer, agen1)
|
):
|
||||||
n.start_soon(cancel, use_signal, .5)
|
tn.start_soon(streamer, agen1)
|
||||||
|
tn.start_soon(cancel, use_signal, .5)
|
||||||
try:
|
try:
|
||||||
await streamer(agen2)
|
await streamer(agen2)
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -234,10 +234,8 @@ async def trio_ctx(
|
||||||
with trio.fail_after(1 + delay):
|
with trio.fail_after(1 + delay):
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
tractor.trionics.collapse_eg(),
|
||||||
# TODO, for new `trio` / py3.13
|
trio.open_nursery() as tn,
|
||||||
# strict_exception_groups=False,
|
|
||||||
) as tn,
|
|
||||||
tractor.to_asyncio.open_channel_from(
|
tractor.to_asyncio.open_channel_from(
|
||||||
sleep_and_err,
|
sleep_and_err,
|
||||||
) as (first, chan),
|
) as (first, chan),
|
||||||
|
|
|
@ -8,6 +8,7 @@ from contextlib import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
from tractor.trionics import collapse_eg
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio import TaskStatus
|
||||||
|
|
||||||
|
@ -64,9 +65,8 @@ def test_stashed_child_nursery(use_start_soon):
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
collapse_eg(),
|
||||||
strict_exception_groups=False,
|
trio.open_nursery() as pn,
|
||||||
) as pn,
|
|
||||||
):
|
):
|
||||||
cn = await pn.start(mk_child_nursery)
|
cn = await pn.start(mk_child_nursery)
|
||||||
assert cn
|
assert cn
|
||||||
|
@ -197,10 +197,8 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
async with (
|
async with (
|
||||||
# XXX should ensure ONLY the KBI
|
# XXX should ensure ONLY the KBI
|
||||||
# is relayed upward
|
# is relayed upward
|
||||||
trionics.collapse_eg(),
|
collapse_eg(),
|
||||||
trio.open_nursery(
|
trio.open_nursery(), # as tn,
|
||||||
# strict_exception_groups=False,
|
|
||||||
), # as tn,
|
|
||||||
|
|
||||||
trionics.gather_contexts([
|
trionics.gather_contexts([
|
||||||
open_memchan(),
|
open_memchan(),
|
||||||
|
|
|
@ -1760,9 +1760,7 @@ async def async_main(
|
||||||
f' {pformat(ipc_server._peers)}'
|
f' {pformat(ipc_server._peers)}'
|
||||||
)
|
)
|
||||||
log.runtime(teardown_report)
|
log.runtime(teardown_report)
|
||||||
await ipc_server.wait_for_no_more_peers(
|
await ipc_server.wait_for_no_more_peers()
|
||||||
shield=True,
|
|
||||||
)
|
|
||||||
|
|
||||||
teardown_report += (
|
teardown_report += (
|
||||||
'-]> all peer channels are complete.\n'
|
'-]> all peer channels are complete.\n'
|
||||||
|
|
|
@ -814,10 +814,14 @@ class Server(Struct):
|
||||||
|
|
||||||
async def wait_for_no_more_peers(
|
async def wait_for_no_more_peers(
|
||||||
self,
|
self,
|
||||||
shield: bool = False,
|
# XXX, should this even be allowed?
|
||||||
|
# -> i've seen it cause hangs on teardown
|
||||||
|
# in `test_resource_cache.py`
|
||||||
|
# _shield: bool = False,
|
||||||
) -> None:
|
) -> None:
|
||||||
with trio.CancelScope(shield=shield):
|
await self._no_more_peers.wait()
|
||||||
await self._no_more_peers.wait()
|
# with trio.CancelScope(shield=_shield):
|
||||||
|
# await self._no_more_peers.wait()
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self,
|
self,
|
||||||
|
|
Loading…
Reference in New Issue