Compare commits
21 Commits
b096867d40
...
0cbf02bf2e
Author | SHA1 | Date |
---|---|---|
|
0cbf02bf2e | |
|
4a0b78d447 | |
|
718d0887c9 | |
|
40a40f613a | |
|
c692b30e7f | |
|
c91bf6dcc2 | |
|
5afa9407c9 | |
|
9b1dd7b279 | |
|
910257bb46 | |
|
ca37d8ed91 | |
|
072cdd0c66 | |
|
30302b051d | |
|
bb04e55d5f | |
|
226d06dbfa | |
|
1b502736d6 | |
|
51992ef546 | |
|
6da470918a | |
|
93d97c3ff9 | |
|
abaea4de8e | |
|
4b3ba35cd6 | |
|
70a508e9d7 |
|
@ -16,7 +16,6 @@ from tractor import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
MsgStream,
|
MsgStream,
|
||||||
_testing,
|
_testing,
|
||||||
trionics,
|
|
||||||
)
|
)
|
||||||
import trio
|
import trio
|
||||||
import pytest
|
import pytest
|
||||||
|
@ -63,8 +62,9 @@ async def recv_and_spawn_net_killers(
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
async with (
|
async with (
|
||||||
ctx.open_stream() as stream,
|
ctx.open_stream() as stream,
|
||||||
trionics.collapse_eg(),
|
trio.open_nursery(
|
||||||
trio.open_nursery() as tn,
|
strict_exception_groups=False,
|
||||||
|
) as tn,
|
||||||
):
|
):
|
||||||
async for i in stream:
|
async for i in stream:
|
||||||
print(f'child echoing {i}')
|
print(f'child echoing {i}')
|
||||||
|
|
|
@ -23,8 +23,9 @@ async def main():
|
||||||
modules=[__name__]
|
modules=[__name__]
|
||||||
) as portal_map,
|
) as portal_map,
|
||||||
|
|
||||||
tractor.trionics.collapse_eg(),
|
trio.open_nursery(
|
||||||
trio.open_nursery() as tn,
|
strict_exception_groups=False,
|
||||||
|
) as tn,
|
||||||
):
|
):
|
||||||
|
|
||||||
for (name, portal) in portal_map.items():
|
for (name, portal) in portal_map.items():
|
||||||
|
|
|
@ -0,0 +1,145 @@
|
||||||
|
from contextlib import (
|
||||||
|
contextmanager as cm,
|
||||||
|
# TODO, any diff in async case(s)??
|
||||||
|
# asynccontextmanager as acm,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
log = tractor.log.get_logger(__name__)
|
||||||
|
tractor.log.get_console_log('info')
|
||||||
|
|
||||||
|
@cm
|
||||||
|
def teardown_on_exc(
|
||||||
|
raise_from_handler: bool = False,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
You could also have a teardown handler which catches any exc and
|
||||||
|
does some required teardown. In this case the problem is
|
||||||
|
compounded UNLESS you ensure the handler's scope is OUTSIDE the
|
||||||
|
`ux.aclose()`.. that is in the caller's enclosing scope.
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
except BaseException as _berr:
|
||||||
|
berr = _berr
|
||||||
|
log.exception(
|
||||||
|
f'Handling termination teardown in child due to,\n'
|
||||||
|
f'{berr!r}\n'
|
||||||
|
)
|
||||||
|
if raise_from_handler:
|
||||||
|
# XXX teardown ops XXX
|
||||||
|
# on termination these steps say need to be run to
|
||||||
|
# ensure wider system consistency (like the state of
|
||||||
|
# remote connections/services).
|
||||||
|
#
|
||||||
|
# HOWEVER, any bug in this teardown code is also
|
||||||
|
# masked by the `tx.aclose()`!
|
||||||
|
# this is also true if `_tn.cancel_scope` is
|
||||||
|
# `.cancel_called` by the parent in a graceful
|
||||||
|
# request case..
|
||||||
|
|
||||||
|
# simulate a bug in teardown handler.
|
||||||
|
raise RuntimeError(
|
||||||
|
'woopsie teardown bug!'
|
||||||
|
)
|
||||||
|
|
||||||
|
raise # no teardown bug.
|
||||||
|
|
||||||
|
|
||||||
|
async def finite_stream_to_rent(
|
||||||
|
tx: trio.abc.SendChannel,
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
|
||||||
|
task_status: trio.TaskStatus[
|
||||||
|
trio.CancelScope,
|
||||||
|
] = trio.TASK_STATUS_IGNORED,
|
||||||
|
):
|
||||||
|
async with (
|
||||||
|
# XXX without this unmasker the mid-streaming RTE is never
|
||||||
|
# reported since it is masked by the `tx.aclose()`
|
||||||
|
# call which in turn raises `Cancelled`!
|
||||||
|
#
|
||||||
|
# NOTE, this is WITHOUT doing any exception handling
|
||||||
|
# inside the child task!
|
||||||
|
#
|
||||||
|
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||||
|
# tractor.trionics.maybe_raise_from_masking_exc(),
|
||||||
|
|
||||||
|
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||||
|
trio.open_nursery() as _tn,
|
||||||
|
):
|
||||||
|
# pass our scope back to parent for supervision\
|
||||||
|
# control.
|
||||||
|
task_status.started(_tn.cancel_scope)
|
||||||
|
|
||||||
|
with teardown_on_exc(
|
||||||
|
raise_from_handler=not child_errors_mid_stream,
|
||||||
|
):
|
||||||
|
for i in range(100):
|
||||||
|
log.info(
|
||||||
|
f'Child tx {i!r}\n'
|
||||||
|
)
|
||||||
|
if (
|
||||||
|
child_errors_mid_stream
|
||||||
|
and
|
||||||
|
i == 66
|
||||||
|
):
|
||||||
|
# oh wait but WOOPS there's a bug
|
||||||
|
# in that teardown code!?
|
||||||
|
raise RuntimeError(
|
||||||
|
'woopsie, a mid-streaming bug!?'
|
||||||
|
)
|
||||||
|
|
||||||
|
await tx.send(i)
|
||||||
|
|
||||||
|
|
||||||
|
async def main(
|
||||||
|
# TODO! toggle this for the 2 cases!
|
||||||
|
# 1. child errors mid-stream while parent is also requesting
|
||||||
|
# (graceful) cancel of that child streamer.
|
||||||
|
#
|
||||||
|
# 2. child contains a teardown handler which contains a
|
||||||
|
# bug and raises.
|
||||||
|
#
|
||||||
|
child_errors_mid_stream: bool,
|
||||||
|
):
|
||||||
|
tx, rx = trio.open_memory_channel(1)
|
||||||
|
|
||||||
|
async with (
|
||||||
|
trio.open_nursery() as tn,
|
||||||
|
rx as rx,
|
||||||
|
):
|
||||||
|
|
||||||
|
_child_cs = await tn.start(
|
||||||
|
partial(
|
||||||
|
finite_stream_to_rent,
|
||||||
|
child_errors_mid_stream=child_errors_mid_stream,
|
||||||
|
tx=tx,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
async for msg in rx:
|
||||||
|
log.info(
|
||||||
|
f'Rent rx {msg!r}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
# simulate some external cancellation
|
||||||
|
# request **JUST BEFORE** the child errors.
|
||||||
|
if msg == 65:
|
||||||
|
log.cancel(
|
||||||
|
f'Cancelling parent on,\n'
|
||||||
|
f'msg={msg}\n'
|
||||||
|
f'\n'
|
||||||
|
f'Simulates OOB cancel request!\n'
|
||||||
|
)
|
||||||
|
tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
|
||||||
|
for case in [True, False]:
|
||||||
|
trio.run(main, case)
|
|
@ -313,8 +313,9 @@ async def inf_streamer(
|
||||||
# `trio.EndOfChannel` doesn't propagate directly to the above
|
# `trio.EndOfChannel` doesn't propagate directly to the above
|
||||||
# .open_stream() parent, resulting in it also raising instead
|
# .open_stream() parent, resulting in it also raising instead
|
||||||
# of gracefully absorbing as normal.. so how to handle?
|
# of gracefully absorbing as normal.. so how to handle?
|
||||||
tractor.trionics.collapse_eg(),
|
trio.open_nursery(
|
||||||
trio.open_nursery() as tn,
|
strict_exception_groups=False,
|
||||||
|
) as tn,
|
||||||
):
|
):
|
||||||
async def close_stream_on_sentinel():
|
async def close_stream_on_sentinel():
|
||||||
async for msg in stream:
|
async for msg in stream:
|
||||||
|
|
|
@ -532,15 +532,10 @@ def test_cancel_via_SIGINT_other_task(
|
||||||
async def main():
|
async def main():
|
||||||
# should never timeout since SIGINT should cancel the current program
|
# should never timeout since SIGINT should cancel the current program
|
||||||
with trio.fail_after(timeout):
|
with trio.fail_after(timeout):
|
||||||
async with (
|
async with trio.open_nursery(
|
||||||
|
strict_exception_groups=False,
|
||||||
# XXX ?TODO? why no work!?
|
) as n:
|
||||||
# tractor.trionics.collapse_eg(),
|
await n.start(spawn_and_sleep_forever)
|
||||||
trio.open_nursery(
|
|
||||||
strict_exception_groups=False,
|
|
||||||
) as tn,
|
|
||||||
):
|
|
||||||
await tn.start(spawn_and_sleep_forever)
|
|
||||||
if 'mp' in spawn_backend:
|
if 'mp' in spawn_backend:
|
||||||
time.sleep(0.1)
|
time.sleep(0.1)
|
||||||
os.kill(pid, signal.SIGINT)
|
os.kill(pid, signal.SIGINT)
|
||||||
|
|
|
@ -117,10 +117,9 @@ async def open_actor_local_nursery(
|
||||||
ctx: tractor.Context,
|
ctx: tractor.Context,
|
||||||
):
|
):
|
||||||
global _nursery
|
global _nursery
|
||||||
async with (
|
async with trio.open_nursery(
|
||||||
tractor.trionics.collapse_eg(),
|
strict_exception_groups=False,
|
||||||
trio.open_nursery() as tn
|
) as tn:
|
||||||
):
|
|
||||||
_nursery = tn
|
_nursery = tn
|
||||||
await ctx.started()
|
await ctx.started()
|
||||||
await trio.sleep(10)
|
await trio.sleep(10)
|
||||||
|
|
|
@ -11,7 +11,6 @@ import psutil
|
||||||
import pytest
|
import pytest
|
||||||
import subprocess
|
import subprocess
|
||||||
import tractor
|
import tractor
|
||||||
from tractor.trionics import collapse_eg
|
|
||||||
from tractor._testing import tractor_test
|
from tractor._testing import tractor_test
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
@ -194,10 +193,10 @@ async def spawn_and_check_registry(
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as an:
|
||||||
async with (
|
async with trio.open_nursery(
|
||||||
collapse_eg(),
|
strict_exception_groups=False,
|
||||||
trio.open_nursery() as trion,
|
) as trion:
|
||||||
):
|
|
||||||
portals = {}
|
portals = {}
|
||||||
for i in range(3):
|
for i in range(3):
|
||||||
name = f'a{i}'
|
name = f'a{i}'
|
||||||
|
@ -339,12 +338,11 @@ async def close_chans_before_nursery(
|
||||||
async with portal2.open_stream_from(
|
async with portal2.open_stream_from(
|
||||||
stream_forever
|
stream_forever
|
||||||
) as agen2:
|
) as agen2:
|
||||||
async with (
|
async with trio.open_nursery(
|
||||||
collapse_eg(),
|
strict_exception_groups=False,
|
||||||
trio.open_nursery() as tn,
|
) as n:
|
||||||
):
|
n.start_soon(streamer, agen1)
|
||||||
tn.start_soon(streamer, agen1)
|
n.start_soon(cancel, use_signal, .5)
|
||||||
tn.start_soon(cancel, use_signal, .5)
|
|
||||||
try:
|
try:
|
||||||
await streamer(agen2)
|
await streamer(agen2)
|
||||||
finally:
|
finally:
|
||||||
|
|
|
@ -234,8 +234,10 @@ async def trio_ctx(
|
||||||
with trio.fail_after(1 + delay):
|
with trio.fail_after(1 + delay):
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
tractor.trionics.collapse_eg(),
|
trio.open_nursery(
|
||||||
trio.open_nursery() as tn,
|
# TODO, for new `trio` / py3.13
|
||||||
|
# strict_exception_groups=False,
|
||||||
|
) as tn,
|
||||||
tractor.to_asyncio.open_channel_from(
|
tractor.to_asyncio.open_channel_from(
|
||||||
sleep_and_err,
|
sleep_and_err,
|
||||||
) as (first, chan),
|
) as (first, chan),
|
||||||
|
|
|
@ -8,7 +8,6 @@ from contextlib import (
|
||||||
)
|
)
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
from tractor.trionics import collapse_eg
|
|
||||||
import trio
|
import trio
|
||||||
from trio import TaskStatus
|
from trio import TaskStatus
|
||||||
|
|
||||||
|
@ -65,8 +64,9 @@ def test_stashed_child_nursery(use_start_soon):
|
||||||
async def main():
|
async def main():
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(),
|
trio.open_nursery(
|
||||||
trio.open_nursery() as pn,
|
strict_exception_groups=False,
|
||||||
|
) as pn,
|
||||||
):
|
):
|
||||||
cn = await pn.start(mk_child_nursery)
|
cn = await pn.start(mk_child_nursery)
|
||||||
assert cn
|
assert cn
|
||||||
|
@ -197,8 +197,10 @@ def test_gatherctxs_with_memchan_breaks_multicancelled(
|
||||||
async with (
|
async with (
|
||||||
# XXX should ensure ONLY the KBI
|
# XXX should ensure ONLY the KBI
|
||||||
# is relayed upward
|
# is relayed upward
|
||||||
collapse_eg(),
|
trionics.collapse_eg(),
|
||||||
trio.open_nursery(), # as tn,
|
trio.open_nursery(
|
||||||
|
# strict_exception_groups=False,
|
||||||
|
), # as tn,
|
||||||
|
|
||||||
trionics.gather_contexts([
|
trionics.gather_contexts([
|
||||||
open_memchan(),
|
open_memchan(),
|
||||||
|
|
|
@ -1760,7 +1760,9 @@ async def async_main(
|
||||||
f' {pformat(ipc_server._peers)}'
|
f' {pformat(ipc_server._peers)}'
|
||||||
)
|
)
|
||||||
log.runtime(teardown_report)
|
log.runtime(teardown_report)
|
||||||
await ipc_server.wait_for_no_more_peers()
|
await ipc_server.wait_for_no_more_peers(
|
||||||
|
shield=True,
|
||||||
|
)
|
||||||
|
|
||||||
teardown_report += (
|
teardown_report += (
|
||||||
'-]> all peer channels are complete.\n'
|
'-]> all peer channels are complete.\n'
|
||||||
|
|
|
@ -814,14 +814,10 @@ class Server(Struct):
|
||||||
|
|
||||||
async def wait_for_no_more_peers(
|
async def wait_for_no_more_peers(
|
||||||
self,
|
self,
|
||||||
# XXX, should this even be allowed?
|
shield: bool = False,
|
||||||
# -> i've seen it cause hangs on teardown
|
|
||||||
# in `test_resource_cache.py`
|
|
||||||
# _shield: bool = False,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
await self._no_more_peers.wait()
|
with trio.CancelScope(shield=shield):
|
||||||
# with trio.CancelScope(shield=_shield):
|
await self._no_more_peers.wait()
|
||||||
# await self._no_more_peers.wait()
|
|
||||||
|
|
||||||
async def wait_for_peer(
|
async def wait_for_peer(
|
||||||
self,
|
self,
|
||||||
|
|
Loading…
Reference in New Issue