Compare commits
No commits in common. "249e99a2d9c0722ef38d851405bb5d85c6d4a727" and "23240c31e39c7825c9d19631aed3159eb84a76d0" have entirely different histories.
249e99a2d9
...
23240c31e3
|
@ -21,12 +21,12 @@ async def breakpoint_forever():
|
||||||
async def spawn_until(depth=0):
|
async def spawn_until(depth=0):
|
||||||
""""A nested nursery that triggers another ``NameError``.
|
""""A nested nursery that triggers another ``NameError``.
|
||||||
"""
|
"""
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as n:
|
||||||
if depth < 1:
|
if depth < 1:
|
||||||
|
|
||||||
await an.run_in_actor(breakpoint_forever)
|
await n.run_in_actor(breakpoint_forever)
|
||||||
|
|
||||||
p = await an.run_in_actor(
|
p = await n.run_in_actor(
|
||||||
name_error,
|
name_error,
|
||||||
name='name_error'
|
name='name_error'
|
||||||
)
|
)
|
||||||
|
@ -38,7 +38,7 @@ async def spawn_until(depth=0):
|
||||||
# recusrive call to spawn another process branching layer of
|
# recusrive call to spawn another process branching layer of
|
||||||
# the tree
|
# the tree
|
||||||
depth -= 1
|
depth -= 1
|
||||||
await an.run_in_actor(
|
await n.run_in_actor(
|
||||||
spawn_until,
|
spawn_until,
|
||||||
depth=depth,
|
depth=depth,
|
||||||
name=f'spawn_until_{depth}',
|
name=f'spawn_until_{depth}',
|
||||||
|
|
|
@ -709,41 +709,10 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
child = spawn('multi_nested_subactors_error_up_through_nurseries')
|
||||||
|
|
||||||
# timed_out_early: bool = False
|
# timed_out_early: bool = False
|
||||||
at_least_one: list[str] = [
|
|
||||||
"bdb.BdbQuit",
|
|
||||||
|
|
||||||
# leaf subs, which actually raise in "user code"
|
for send_char in itertools.cycle(['c', 'q']):
|
||||||
"src_uid=('breakpoint_forever'",
|
|
||||||
"src_uid=('name_error'",
|
|
||||||
|
|
||||||
# 2nd layer subs
|
|
||||||
"src_uid=('spawn_until_1'",
|
|
||||||
"src_uid=('spawn_until_2'",
|
|
||||||
"src_uid=('spawn_until_3'",
|
|
||||||
"relay_uid=('spawn_until_0'",
|
|
||||||
|
|
||||||
# 1st layer subs
|
|
||||||
"src_uid=('spawner0'",
|
|
||||||
"src_uid=('spawner1'",
|
|
||||||
]
|
|
||||||
|
|
||||||
for i, send_char in enumerate(
|
|
||||||
itertools.cycle(['c', 'q'])
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
child.expect(PROMPT)
|
child.expect(PROMPT)
|
||||||
|
|
||||||
for patt in at_least_one.copy():
|
|
||||||
if in_prompt_msg(
|
|
||||||
child,
|
|
||||||
[patt],
|
|
||||||
):
|
|
||||||
print(
|
|
||||||
f'Found patt in prompt {i}\n'
|
|
||||||
f'patt: {patt!r}\n'
|
|
||||||
)
|
|
||||||
at_least_one.remove(patt)
|
|
||||||
|
|
||||||
child.sendline(send_char)
|
child.sendline(send_char)
|
||||||
time.sleep(0.01)
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
@ -752,15 +721,27 @@ def test_multi_nested_subactors_error_through_nurseries(
|
||||||
|
|
||||||
assert_before(
|
assert_before(
|
||||||
child,
|
child,
|
||||||
[
|
[ # boxed source errors
|
||||||
# boxed source errors should show in final
|
|
||||||
# post-prompt tb to console.
|
|
||||||
"tractor._exceptions.RemoteActorError:",
|
|
||||||
"NameError: name 'doggypants' is not defined",
|
"NameError: name 'doggypants' is not defined",
|
||||||
|
"tractor._exceptions.RemoteActorError:",
|
||||||
|
"('name_error'",
|
||||||
|
"bdb.BdbQuit",
|
||||||
|
|
||||||
# TODO? once we get more pedantic with `relay_uid` should
|
# first level subtrees
|
||||||
# prolly include all actor-IDs we expect to see in final
|
# "tractor._exceptions.RemoteActorError: ('spawner0'",
|
||||||
# tb?
|
"src_uid=('spawner0'",
|
||||||
|
|
||||||
|
# "tractor._exceptions.RemoteActorError: ('spawner1'",
|
||||||
|
|
||||||
|
# propagation of errors up through nested subtrees
|
||||||
|
# "tractor._exceptions.RemoteActorError: ('spawn_until_0'",
|
||||||
|
# "tractor._exceptions.RemoteActorError: ('spawn_until_1'",
|
||||||
|
# "tractor._exceptions.RemoteActorError: ('spawn_until_2'",
|
||||||
|
# ^-NOTE-^ old RAE repr, new one is below with a field
|
||||||
|
# showing the src actor's uid.
|
||||||
|
"src_uid=('spawn_until_0'",
|
||||||
|
"relay_uid=('spawn_until_1'",
|
||||||
|
"src_uid=('spawn_until_2'",
|
||||||
]
|
]
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
|
@ -1,98 +0,0 @@
|
||||||
'''
|
|
||||||
Basic `ActorNursery` operations and closure semantics,
|
|
||||||
- basic remote error collection,
|
|
||||||
- basic multi-subactor cancellation.
|
|
||||||
|
|
||||||
'''
|
|
||||||
# import os
|
|
||||||
# import signal
|
|
||||||
# import platform
|
|
||||||
# import time
|
|
||||||
# from itertools import repeat
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
import trio
|
|
||||||
import tractor
|
|
||||||
from tractor._exceptions import ActorCancelled
|
|
||||||
# from tractor._testing import (
|
|
||||||
# tractor_test,
|
|
||||||
# )
|
|
||||||
# from .conftest import no_windows
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize(
|
|
||||||
'num_subs',
|
|
||||||
[
|
|
||||||
1,
|
|
||||||
3,
|
|
||||||
]
|
|
||||||
)
|
|
||||||
def test_one_cancels_all(
|
|
||||||
start_method: str,
|
|
||||||
loglevel: str,
|
|
||||||
debug_mode: bool,
|
|
||||||
num_subs: int,
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Verify that ifa a single error bubbles to the an-scope the
|
|
||||||
nursery will be cancelled (just like in `trio`); this is a
|
|
||||||
one-cancels-all style strategy and are only supervision policy
|
|
||||||
at the moment.
|
|
||||||
|
|
||||||
'''
|
|
||||||
async def main():
|
|
||||||
try:
|
|
||||||
rte = RuntimeError('Uh oh something bad in parent')
|
|
||||||
async with tractor.open_nursery(
|
|
||||||
start_method=start_method,
|
|
||||||
loglevel=loglevel,
|
|
||||||
debug_mode=debug_mode,
|
|
||||||
) as an:
|
|
||||||
|
|
||||||
# spawn the same number of deamon actors which should be cancelled
|
|
||||||
dactor_portals = []
|
|
||||||
for i in range(num_subs):
|
|
||||||
name: str= f'sub_{i}'
|
|
||||||
ptl: tractor.Portal = await an.start_actor(
|
|
||||||
name=name,
|
|
||||||
enable_modules=[__name__],
|
|
||||||
)
|
|
||||||
dactor_portals.append(ptl)
|
|
||||||
|
|
||||||
# wait for booted
|
|
||||||
async with tractor.wait_for_actor(name):
|
|
||||||
print(f'{name!r} is up.')
|
|
||||||
|
|
||||||
# simulate uncaught exc
|
|
||||||
raise rte
|
|
||||||
|
|
||||||
# should error here with a ``RemoteActorError`` or ``MultiError``
|
|
||||||
|
|
||||||
except BaseExceptionGroup as _beg:
|
|
||||||
beg = _beg
|
|
||||||
|
|
||||||
# ?TODO? why can't we do `is` on beg?
|
|
||||||
assert (
|
|
||||||
beg.exceptions
|
|
||||||
==
|
|
||||||
an.maybe_error.exceptions
|
|
||||||
)
|
|
||||||
|
|
||||||
assert len(beg.exceptions) == (
|
|
||||||
num_subs
|
|
||||||
+
|
|
||||||
1 # rte from root
|
|
||||||
)
|
|
||||||
|
|
||||||
# all subactors should have been implicitly
|
|
||||||
# `Portal.cancel_actor()`ed.
|
|
||||||
excs = list(beg.exceptions)
|
|
||||||
excs.remove(rte)
|
|
||||||
for exc in excs:
|
|
||||||
assert isinstance(exc, ActorCancelled)
|
|
||||||
|
|
||||||
assert an._scope_error is rte
|
|
||||||
assert not an._children
|
|
||||||
assert an.cancelled is True
|
|
||||||
|
|
||||||
trio.run(main)
|
|
|
@ -11,9 +11,6 @@ from itertools import repeat
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
import tractor
|
import tractor
|
||||||
from tractor._exceptions import (
|
|
||||||
ActorCancelled,
|
|
||||||
)
|
|
||||||
from tractor._testing import (
|
from tractor._testing import (
|
||||||
tractor_test,
|
tractor_test,
|
||||||
)
|
)
|
||||||
|
@ -127,10 +124,7 @@ def test_multierror(
|
||||||
) as nursery:
|
) as nursery:
|
||||||
|
|
||||||
await nursery.run_in_actor(assert_err, name='errorer1')
|
await nursery.run_in_actor(assert_err, name='errorer1')
|
||||||
portal2 = await nursery.run_in_actor(
|
portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
|
||||||
assert_err,
|
|
||||||
name='errorer2',
|
|
||||||
)
|
|
||||||
|
|
||||||
# get result(s) from main task
|
# get result(s) from main task
|
||||||
try:
|
try:
|
||||||
|
@ -143,15 +137,7 @@ def test_multierror(
|
||||||
# here we should get a ``BaseExceptionGroup`` containing exceptions
|
# here we should get a ``BaseExceptionGroup`` containing exceptions
|
||||||
# from both subactors
|
# from both subactors
|
||||||
|
|
||||||
with pytest.raises(
|
with pytest.raises(BaseExceptionGroup):
|
||||||
expected_exception=(
|
|
||||||
tractor.RemoteActorError,
|
|
||||||
|
|
||||||
# ?TODO, should it be this??
|
|
||||||
# like `trio`'s strict egs?
|
|
||||||
BaseExceptionGroup,
|
|
||||||
),
|
|
||||||
):
|
|
||||||
trio.run(main)
|
trio.run(main)
|
||||||
|
|
||||||
|
|
||||||
|
@ -247,9 +233,8 @@ async def stream_forever():
|
||||||
|
|
||||||
|
|
||||||
@tractor_test
|
@tractor_test
|
||||||
async def test_cancel_infinite_streamer(
|
async def test_cancel_infinite_streamer(start_method):
|
||||||
start_method: str,
|
|
||||||
):
|
|
||||||
# stream for at most 1 seconds
|
# stream for at most 1 seconds
|
||||||
with trio.move_on_after(1) as cancel_scope:
|
with trio.move_on_after(1) as cancel_scope:
|
||||||
async with tractor.open_nursery() as n:
|
async with tractor.open_nursery() as n:
|
||||||
|
@ -303,7 +288,6 @@ async def test_some_cancels_all(
|
||||||
num_actors_and_errs: tuple,
|
num_actors_and_errs: tuple,
|
||||||
start_method: str,
|
start_method: str,
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
debug_mode: bool,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
Verify a subset of failed subactors causes all others in
|
Verify a subset of failed subactors causes all others in
|
||||||
|
@ -319,11 +303,6 @@ async def test_some_cancels_all(
|
||||||
ria_func,
|
ria_func,
|
||||||
da_func,
|
da_func,
|
||||||
) = num_actors_and_errs
|
) = num_actors_and_errs
|
||||||
with trio.fail_after(
|
|
||||||
3
|
|
||||||
if not debug_mode
|
|
||||||
else 999
|
|
||||||
):
|
|
||||||
try:
|
try:
|
||||||
async with tractor.open_nursery() as an:
|
async with tractor.open_nursery() as an:
|
||||||
|
|
||||||
|
@ -372,21 +351,13 @@ async def test_some_cancels_all(
|
||||||
|
|
||||||
except first_err as _err:
|
except first_err as _err:
|
||||||
err = _err
|
err = _err
|
||||||
|
|
||||||
if isinstance(err, BaseExceptionGroup):
|
if isinstance(err, BaseExceptionGroup):
|
||||||
|
|
||||||
assert len(err.exceptions) == num_actors
|
assert len(err.exceptions) == num_actors
|
||||||
for exc in err.exceptions:
|
for exc in err.exceptions:
|
||||||
|
|
||||||
# TODO, figure out why these aren't being set?
|
|
||||||
if isinstance(exc, ActorCancelled):
|
|
||||||
breakpoint()
|
|
||||||
|
|
||||||
if isinstance(exc, tractor.RemoteActorError):
|
if isinstance(exc, tractor.RemoteActorError):
|
||||||
assert exc.boxed_type == err_type
|
assert exc.boxed_type == err_type
|
||||||
else:
|
else:
|
||||||
assert isinstance(exc, trio.Cancelled)
|
assert isinstance(exc, trio.Cancelled)
|
||||||
|
|
||||||
elif isinstance(err, tractor.RemoteActorError):
|
elif isinstance(err, tractor.RemoteActorError):
|
||||||
assert err.boxed_type == err_type
|
assert err.boxed_type == err_type
|
||||||
|
|
||||||
|
|
|
@ -27,7 +27,7 @@ from typing import (
|
||||||
)
|
)
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
from .log import get_logger
|
from tractor.log import get_logger
|
||||||
from .trionics import (
|
from .trionics import (
|
||||||
gather_contexts,
|
gather_contexts,
|
||||||
collapse_eg,
|
collapse_eg,
|
||||||
|
@ -259,7 +259,6 @@ async def find_actor(
|
||||||
collapse_eg(),
|
collapse_eg(),
|
||||||
gather_contexts(
|
gather_contexts(
|
||||||
mngrs=maybe_portals,
|
mngrs=maybe_portals,
|
||||||
# tn=tn, # ?TODO, helps to pass rent tn here?
|
|
||||||
) as portals,
|
) as portals,
|
||||||
):
|
):
|
||||||
# log.runtime(
|
# log.runtime(
|
||||||
|
|
|
@ -46,7 +46,6 @@ from msgspec import (
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
Aid,
|
|
||||||
Error,
|
Error,
|
||||||
PayloadMsg,
|
PayloadMsg,
|
||||||
MsgType,
|
MsgType,
|
||||||
|
@ -480,9 +479,8 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def relay_uid(self) -> tuple[str, str]|None:
|
def relay_uid(self) -> tuple[str, str]|None:
|
||||||
if msg := self._ipc_msg:
|
|
||||||
return tuple(
|
return tuple(
|
||||||
msg.relay_path[-1]
|
self._ipc_msg.relay_path[-1]
|
||||||
)
|
)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
|
@ -523,8 +521,7 @@ class RemoteActorError(Exception):
|
||||||
for key in fields:
|
for key in fields:
|
||||||
if (
|
if (
|
||||||
key == 'relay_uid'
|
key == 'relay_uid'
|
||||||
and
|
and not self.is_inception()
|
||||||
not self.is_inception()
|
|
||||||
):
|
):
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
@ -537,13 +534,6 @@ class RemoteActorError(Exception):
|
||||||
None,
|
None,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
if (
|
|
||||||
key == 'canceller'
|
|
||||||
and
|
|
||||||
isinstance(val, Aid)
|
|
||||||
):
|
|
||||||
val: str = val.reprol(sin_uuid=False)
|
|
||||||
|
|
||||||
# TODO: for `.relay_path` on multiline?
|
# TODO: for `.relay_path` on multiline?
|
||||||
# if not isinstance(val, str):
|
# if not isinstance(val, str):
|
||||||
# val_str = pformat(val)
|
# val_str = pformat(val)
|
||||||
|
@ -633,22 +623,12 @@ class RemoteActorError(Exception):
|
||||||
# IFF there is an embedded traceback-str we always
|
# IFF there is an embedded traceback-str we always
|
||||||
# draw the ascii-box around it.
|
# draw the ascii-box around it.
|
||||||
body: str = ''
|
body: str = ''
|
||||||
|
if tb_str := self.tb_str:
|
||||||
fields: str = self._mk_fields_str(
|
fields: str = self._mk_fields_str(
|
||||||
_body_fields
|
_body_fields
|
||||||
+
|
+
|
||||||
self.extra_body_fields,
|
self.extra_body_fields,
|
||||||
)
|
)
|
||||||
|
|
||||||
tb_str: str = (
|
|
||||||
self.tb_str
|
|
||||||
#
|
|
||||||
# ^TODO? what to use instead? if anything?
|
|
||||||
# -[ ] ensure the `.message` doesn't show up 2x in output ya?
|
|
||||||
# -[ ] ._message isn't really right?
|
|
||||||
# or
|
|
||||||
# self._message
|
|
||||||
)
|
|
||||||
if tb_str:
|
|
||||||
from tractor.devx import (
|
from tractor.devx import (
|
||||||
pformat_boxed_tb,
|
pformat_boxed_tb,
|
||||||
)
|
)
|
||||||
|
@ -660,7 +640,7 @@ class RemoteActorError(Exception):
|
||||||
# just after <Type(
|
# just after <Type(
|
||||||
# |___ ..
|
# |___ ..
|
||||||
tb_body_indent=1,
|
tb_body_indent=1,
|
||||||
boxer_header=self.relay_uid or '-',
|
boxer_header=self.relay_uid,
|
||||||
)
|
)
|
||||||
|
|
||||||
# !TODO, it'd be nice to import these top level without
|
# !TODO, it'd be nice to import these top level without
|
||||||
|
@ -733,10 +713,6 @@ class RemoteActorError(Exception):
|
||||||
|
|
||||||
class ContextCancelled(RemoteActorError):
|
class ContextCancelled(RemoteActorError):
|
||||||
'''
|
'''
|
||||||
IPC context cancellation signal/msg.
|
|
||||||
|
|
||||||
Often reffed with the short-hand: "ctxc".
|
|
||||||
|
|
||||||
Inter-actor task context was cancelled by either a call to
|
Inter-actor task context was cancelled by either a call to
|
||||||
``Portal.cancel_actor()`` or ``Context.cancel()``.
|
``Portal.cancel_actor()`` or ``Context.cancel()``.
|
||||||
|
|
||||||
|
@ -761,8 +737,8 @@ class ContextCancelled(RemoteActorError):
|
||||||
|
|
||||||
- (simulating) an IPC transport network outage
|
- (simulating) an IPC transport network outage
|
||||||
- a (malicious) pkt sent specifically to cancel an actor's
|
- a (malicious) pkt sent specifically to cancel an actor's
|
||||||
runtime non-gracefully without ensuring ongoing RPC tasks
|
runtime non-gracefully without ensuring ongoing RPC tasks are
|
||||||
are incrementally cancelled as is done with:
|
incrementally cancelled as is done with:
|
||||||
`Actor`
|
`Actor`
|
||||||
|_`.cancel()`
|
|_`.cancel()`
|
||||||
|_`.cancel_soon()`
|
|_`.cancel_soon()`
|
||||||
|
@ -783,59 +759,6 @@ class ContextCancelled(RemoteActorError):
|
||||||
# src_actor_uid = canceller
|
# src_actor_uid = canceller
|
||||||
|
|
||||||
|
|
||||||
class ActorCancelled(ContextCancelled):
|
|
||||||
'''
|
|
||||||
Runtime-layer cancellation signal/msg.
|
|
||||||
|
|
||||||
Indicates a "graceful interrupt" of the machinery scheduled by
|
|
||||||
the py-proc's `trio.run()`.
|
|
||||||
|
|
||||||
Often reffed with the short-hand: "actorc".
|
|
||||||
|
|
||||||
Raised from within `an: ActorNursery` (via an `ExceptionGroup`)
|
|
||||||
when an actor has been "process wide" cancel-called using any of,
|
|
||||||
|
|
||||||
- `ActorNursery.cancel()`
|
|
||||||
- `Portal.cancel_actor()`
|
|
||||||
|
|
||||||
**and** that cancel request was part of a "non graceful" cancel
|
|
||||||
condition.
|
|
||||||
|
|
||||||
That is, whenever an exception is to be raised outside an `an`
|
|
||||||
scope-block due to some error raised-in/relayed-to that scope. In
|
|
||||||
such cases for every subactor which was cancelledand subsequently
|
|
||||||
( and according to the `an`'s supervision strat ) this is
|
|
||||||
normally raised per subactor portal.
|
|
||||||
|
|
||||||
'''
|
|
||||||
@property
|
|
||||||
def canceller(self) -> Aid:
|
|
||||||
'''
|
|
||||||
Return the (maybe) `Actor.aid: Aid` for the requesting-author
|
|
||||||
of this actorc.
|
|
||||||
|
|
||||||
Emit a warning msg when `.canceller` has not been set.
|
|
||||||
|
|
||||||
See additional relevant notes in
|
|
||||||
`ContextCancelled.canceller`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
value: tuple[str, str]|None
|
|
||||||
if msg := self._ipc_msg:
|
|
||||||
value = msg.canceller
|
|
||||||
else:
|
|
||||||
value = self._extra_msgdata['canceller']
|
|
||||||
|
|
||||||
if value:
|
|
||||||
return value
|
|
||||||
|
|
||||||
log.warning(
|
|
||||||
'IPC Context cancelled without a requesting actor?\n'
|
|
||||||
'Maybe the IPC transport ended abruptly?\n\n'
|
|
||||||
f'{self}'
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class MsgTypeError(
|
class MsgTypeError(
|
||||||
RemoteActorError,
|
RemoteActorError,
|
||||||
):
|
):
|
||||||
|
|
|
@ -88,8 +88,7 @@ async def maybe_block_bp(
|
||||||
bp_blocked: bool
|
bp_blocked: bool
|
||||||
if (
|
if (
|
||||||
debug_mode
|
debug_mode
|
||||||
and
|
and maybe_enable_greenback
|
||||||
maybe_enable_greenback
|
|
||||||
and (
|
and (
|
||||||
maybe_mod := await debug.maybe_init_greenback(
|
maybe_mod := await debug.maybe_init_greenback(
|
||||||
raise_not_found=False,
|
raise_not_found=False,
|
||||||
|
@ -479,10 +478,7 @@ async def open_root_actor(
|
||||||
|
|
||||||
# start runtime in a bg sub-task, yield to caller.
|
# start runtime in a bg sub-task, yield to caller.
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(
|
collapse_eg(),
|
||||||
hide_tb=hide_tb,
|
|
||||||
# bp=True,
|
|
||||||
),
|
|
||||||
trio.open_nursery() as root_tn,
|
trio.open_nursery() as root_tn,
|
||||||
|
|
||||||
# XXX, finally-footgun below?
|
# XXX, finally-footgun below?
|
||||||
|
|
|
@ -50,11 +50,7 @@ from tractor._addr import UnwrappedAddress
|
||||||
from tractor._portal import Portal
|
from tractor._portal import Portal
|
||||||
from tractor._runtime import Actor
|
from tractor._runtime import Actor
|
||||||
from tractor._entry import _mp_main
|
from tractor._entry import _mp_main
|
||||||
from tractor._exceptions import (
|
from tractor._exceptions import ActorFailure
|
||||||
ActorCancelled,
|
|
||||||
ActorFailure,
|
|
||||||
# NoResult,
|
|
||||||
)
|
|
||||||
from tractor.msg import (
|
from tractor.msg import (
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
|
@ -141,6 +137,7 @@ def try_set_start_method(
|
||||||
|
|
||||||
|
|
||||||
async def exhaust_portal(
|
async def exhaust_portal(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor
|
actor: Actor
|
||||||
|
|
||||||
|
@ -188,12 +185,10 @@ async def exhaust_portal(
|
||||||
|
|
||||||
|
|
||||||
async def cancel_on_completion(
|
async def cancel_on_completion(
|
||||||
|
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
errors: dict[
|
errors: dict[tuple[str, str], Exception],
|
||||||
msgtypes.Aid,
|
|
||||||
Exception,
|
|
||||||
],
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -214,57 +209,24 @@ async def cancel_on_completion(
|
||||||
portal,
|
portal,
|
||||||
actor,
|
actor,
|
||||||
)
|
)
|
||||||
aid: msgtypes.Aid = actor.aid
|
|
||||||
repr_aid: str = aid.reprol(sin_uuid=False)
|
|
||||||
|
|
||||||
if isinstance(result, Exception):
|
if isinstance(result, Exception):
|
||||||
errors[aid]: Exception = result
|
errors[actor.uid]: Exception = result
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Cancelling subactor {repr_aid!r} runtime due to error\n'
|
'Cancelling subactor runtime due to error:\n\n'
|
||||||
f'\n'
|
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
||||||
f'Portal.cancel_actor() => {portal.channel.uid}\n'
|
f'error: {result}\n'
|
||||||
f'\n'
|
|
||||||
f'{result!r}\n'
|
|
||||||
)
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
report: str = (
|
log.runtime(
|
||||||
f'Cancelling subactor {repr_aid!r} gracefully..\n'
|
'Cancelling subactor gracefully:\n\n'
|
||||||
f'\n'
|
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
|
||||||
)
|
f'result: {result}\n'
|
||||||
canc_info: str = (
|
|
||||||
f'Portal.cancel_actor() => {portal.chan.uid}\n'
|
|
||||||
f'\n'
|
|
||||||
f'final-result => {result!r}\n'
|
|
||||||
)
|
|
||||||
log.cancel(
|
|
||||||
report
|
|
||||||
+
|
|
||||||
canc_info
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel the process now that we have a final result
|
# cancel the process now that we have a final result
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
if (
|
|
||||||
not errors.get(aid)
|
|
||||||
# and
|
|
||||||
# result is NoResult
|
|
||||||
):
|
|
||||||
pass
|
|
||||||
# await debug.pause(shield=True)
|
|
||||||
|
|
||||||
# errors[aid] = ActorCancelled(
|
|
||||||
# message=(
|
|
||||||
# f'Cancelled subactor {repr_aid!r}\n'
|
|
||||||
# f'{canc_info}\n'
|
|
||||||
# ),
|
|
||||||
# canceller=current_actor().aid,
|
|
||||||
# # TODO? should we have a ack-msg?
|
|
||||||
# # ipc_msg=??
|
|
||||||
# # boxed_type=trio.Cancelled,
|
|
||||||
# )
|
|
||||||
|
|
||||||
|
|
||||||
async def hard_kill(
|
async def hard_kill(
|
||||||
proc: trio.Process,
|
proc: trio.Process,
|
||||||
|
@ -352,10 +314,6 @@ async def soft_kill(
|
||||||
Awaitable,
|
Awaitable,
|
||||||
],
|
],
|
||||||
portal: Portal,
|
portal: Portal,
|
||||||
errors: dict[
|
|
||||||
msgtypes.Aid,
|
|
||||||
Exception,
|
|
||||||
],
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
|
@ -399,8 +357,8 @@ async def soft_kill(
|
||||||
# below. This means we try to do a graceful teardown
|
# below. This means we try to do a graceful teardown
|
||||||
# via sending a cancel message before getting out
|
# via sending a cancel message before getting out
|
||||||
# zombie killing tools.
|
# zombie killing tools.
|
||||||
async with trio.open_nursery() as tn:
|
async with trio.open_nursery() as n:
|
||||||
tn.cancel_scope.shield = True
|
n.cancel_scope.shield = True
|
||||||
|
|
||||||
async def cancel_on_proc_deth():
|
async def cancel_on_proc_deth():
|
||||||
'''
|
'''
|
||||||
|
@ -410,35 +368,24 @@ async def soft_kill(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
await wait_func(proc)
|
await wait_func(proc)
|
||||||
tn.cancel_scope.cancel()
|
n.cancel_scope.cancel()
|
||||||
|
|
||||||
# start a task to wait on the termination of the
|
# start a task to wait on the termination of the
|
||||||
# process by itself waiting on a (caller provided) wait
|
# process by itself waiting on a (caller provided) wait
|
||||||
# function which should unblock when the target process
|
# function which should unblock when the target process
|
||||||
# has terminated.
|
# has terminated.
|
||||||
tn.start_soon(cancel_on_proc_deth)
|
n.start_soon(cancel_on_proc_deth)
|
||||||
|
|
||||||
# send the actor-runtime a cancel request.
|
# send the actor-runtime a cancel request.
|
||||||
await portal.cancel_actor()
|
await portal.cancel_actor()
|
||||||
|
|
||||||
# if not errors.get(peer_aid):
|
|
||||||
# errors[peer_aid] = ActorCancelled(
|
|
||||||
# message=(
|
|
||||||
# 'Sub-actor cancelled gracefully by parent\n'
|
|
||||||
# ),
|
|
||||||
# canceller=current_actor().aid,
|
|
||||||
# # TODO? should we have a ack-msg?
|
|
||||||
# # ipc_msg=??
|
|
||||||
# # boxed_type=trio.Cancelled,
|
|
||||||
# )
|
|
||||||
|
|
||||||
if proc.poll() is None: # type: ignore
|
if proc.poll() is None: # type: ignore
|
||||||
log.warning(
|
log.warning(
|
||||||
'Subactor still alive after cancel request?\n\n'
|
'Subactor still alive after cancel request?\n\n'
|
||||||
f'uid: {peer_aid}\n'
|
f'uid: {peer_aid}\n'
|
||||||
f'|_{proc}\n'
|
f'|_{proc}\n'
|
||||||
)
|
)
|
||||||
tn.cancel_scope.cancel()
|
n.cancel_scope.cancel()
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
|
@ -446,10 +393,7 @@ async def new_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[
|
errors: dict[tuple[str, str], Exception],
|
||||||
msgtypes.Aid,
|
|
||||||
Exception,
|
|
||||||
],
|
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
@ -488,10 +432,7 @@ async def trio_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery,
|
actor_nursery: ActorNursery,
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[
|
errors: dict[tuple[str, str], Exception],
|
||||||
msgtypes.Aid,
|
|
||||||
Exception,
|
|
||||||
],
|
|
||||||
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
@ -614,9 +555,9 @@ async def trio_proc(
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await actor_nursery._join_procs.wait()
|
await actor_nursery._join_procs.wait()
|
||||||
|
|
||||||
async with trio.open_nursery() as ptl_reaper_tn:
|
async with trio.open_nursery() as nursery:
|
||||||
if portal in actor_nursery._cancel_after_result_on_exit:
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
ptl_reaper_tn.start_soon(
|
nursery.start_soon(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
|
@ -629,8 +570,7 @@ async def trio_proc(
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
trio.Process.wait, # XXX, uses `pidfd_open()` below.
|
||||||
portal,
|
portal
|
||||||
errors,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
|
@ -639,7 +579,7 @@ async def trio_proc(
|
||||||
'Cancelling portal result reaper task\n'
|
'Cancelling portal result reaper task\n'
|
||||||
f'c)> {subactor.aid.reprol()!r}\n'
|
f'c)> {subactor.aid.reprol()!r}\n'
|
||||||
)
|
)
|
||||||
ptl_reaper_tn.cancel_scope.cancel()
|
nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
# XXX NOTE XXX: The "hard" reap since no actor zombies are
|
||||||
|
@ -712,10 +652,7 @@ async def mp_proc(
|
||||||
name: str,
|
name: str,
|
||||||
actor_nursery: ActorNursery, # type: ignore # noqa
|
actor_nursery: ActorNursery, # type: ignore # noqa
|
||||||
subactor: Actor,
|
subactor: Actor,
|
||||||
errors: dict[
|
errors: dict[tuple[str, str], Exception],
|
||||||
msgtypes.Aid,
|
|
||||||
Exception,
|
|
||||||
],
|
|
||||||
# passed through to actor main
|
# passed through to actor main
|
||||||
bind_addrs: list[UnwrappedAddress],
|
bind_addrs: list[UnwrappedAddress],
|
||||||
parent_addr: UnwrappedAddress,
|
parent_addr: UnwrappedAddress,
|
||||||
|
@ -840,7 +777,7 @@ async def mp_proc(
|
||||||
cancel_on_completion,
|
cancel_on_completion,
|
||||||
portal,
|
portal,
|
||||||
subactor,
|
subactor,
|
||||||
errors,
|
errors
|
||||||
)
|
)
|
||||||
|
|
||||||
# This is a "soft" (cancellable) join/reap which
|
# This is a "soft" (cancellable) join/reap which
|
||||||
|
@ -849,8 +786,7 @@ async def mp_proc(
|
||||||
await soft_kill(
|
await soft_kill(
|
||||||
proc,
|
proc,
|
||||||
proc_waiter,
|
proc_waiter,
|
||||||
portal,
|
portal
|
||||||
errors,
|
|
||||||
)
|
)
|
||||||
|
|
||||||
# cancel result waiter that may have been spawned in
|
# cancel result waiter that may have been spawned in
|
||||||
|
|
|
@ -30,9 +30,6 @@ import warnings
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
|
||||||
from .msg import (
|
|
||||||
types as msgtypes,
|
|
||||||
)
|
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
pformat as _pformat,
|
pformat as _pformat,
|
||||||
|
@ -51,7 +48,6 @@ from .trionics import (
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
ActorCancelled,
|
|
||||||
)
|
)
|
||||||
from ._root import (
|
from ._root import (
|
||||||
open_root_actor,
|
open_root_actor,
|
||||||
|
@ -103,10 +99,7 @@ class ActorNursery:
|
||||||
actor: Actor,
|
actor: Actor,
|
||||||
ria_nursery: trio.Nursery,
|
ria_nursery: trio.Nursery,
|
||||||
da_nursery: trio.Nursery,
|
da_nursery: trio.Nursery,
|
||||||
errors: dict[
|
errors: dict[tuple[str, str], BaseException],
|
||||||
msgtypes.Aid,
|
|
||||||
BaseException,
|
|
||||||
],
|
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
# self.supervisor = supervisor # TODO
|
# self.supervisor = supervisor # TODO
|
||||||
|
@ -124,11 +117,9 @@ class ActorNursery:
|
||||||
]
|
]
|
||||||
] = {}
|
] = {}
|
||||||
|
|
||||||
# signals when it is ok to start waiting o subactor procs
|
|
||||||
# for termination.
|
|
||||||
self._join_procs = trio.Event()
|
self._join_procs = trio.Event()
|
||||||
self._at_least_one_child_in_debug: bool = False
|
self._at_least_one_child_in_debug: bool = False
|
||||||
self._errors = errors
|
self.errors = errors
|
||||||
self._scope_error: BaseException|None = None
|
self._scope_error: BaseException|None = None
|
||||||
self.exited = trio.Event()
|
self.exited = trio.Event()
|
||||||
|
|
||||||
|
@ -269,7 +260,7 @@ class ActorNursery:
|
||||||
name,
|
name,
|
||||||
self,
|
self,
|
||||||
subactor,
|
subactor,
|
||||||
self._errors,
|
self.errors,
|
||||||
bind_addrs,
|
bind_addrs,
|
||||||
parent_addr,
|
parent_addr,
|
||||||
_rtv, # run time vars
|
_rtv, # run time vars
|
||||||
|
@ -373,9 +364,7 @@ class ActorNursery:
|
||||||
# then `._children`..
|
# then `._children`..
|
||||||
children: dict = self._children
|
children: dict = self._children
|
||||||
child_count: int = len(children)
|
child_count: int = len(children)
|
||||||
msg: str = (
|
msg: str = f'Cancelling actor nursery with {child_count} children\n'
|
||||||
f'Cancelling actor-nursery with {child_count} children\n'
|
|
||||||
)
|
|
||||||
|
|
||||||
server: IPCServer = self._actor.ipc_server
|
server: IPCServer = self._actor.ipc_server
|
||||||
|
|
||||||
|
@ -402,9 +391,7 @@ class ActorNursery:
|
||||||
|
|
||||||
else:
|
else:
|
||||||
if portal is None: # actor hasn't fully spawned yet
|
if portal is None: # actor hasn't fully spawned yet
|
||||||
event: trio.Event = server._peer_connected[
|
event: trio.Event = server._peer_connected[subactor.uid]
|
||||||
subactor.uid
|
|
||||||
]
|
|
||||||
log.warning(
|
log.warning(
|
||||||
f"{subactor.uid} never 't finished spawning?"
|
f"{subactor.uid} never 't finished spawning?"
|
||||||
)
|
)
|
||||||
|
@ -429,20 +416,7 @@ class ActorNursery:
|
||||||
# spawn cancel tasks for each sub-actor
|
# spawn cancel tasks for each sub-actor
|
||||||
assert portal
|
assert portal
|
||||||
if portal.channel.connected():
|
if portal.channel.connected():
|
||||||
|
tn.start_soon(portal.cancel_actor)
|
||||||
async def canc_subactor():
|
|
||||||
await portal.cancel_actor()
|
|
||||||
# aid: msgtypes.Aid = subactor.aid
|
|
||||||
# reprol: str = aid.reprol(sin_uuid=False)
|
|
||||||
# if not self._errors.get(aid):
|
|
||||||
# self._errors[aid] = ActorCancelled(
|
|
||||||
# message=(
|
|
||||||
# f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n'
|
|
||||||
# ),
|
|
||||||
# canceller=self._actor.aid,
|
|
||||||
# )
|
|
||||||
|
|
||||||
tn.start_soon(canc_subactor)
|
|
||||||
|
|
||||||
log.cancel(msg)
|
log.cancel(msg)
|
||||||
# if we cancelled the cancel (we hung cancelling remote actors)
|
# if we cancelled the cancel (we hung cancelling remote actors)
|
||||||
|
@ -468,47 +442,6 @@ class ActorNursery:
|
||||||
# mark ourselves as having (tried to have) cancelled all subactors
|
# mark ourselves as having (tried to have) cancelled all subactors
|
||||||
self._join_procs.set()
|
self._join_procs.set()
|
||||||
|
|
||||||
@property
|
|
||||||
def maybe_error(self) -> (
|
|
||||||
BaseException|
|
|
||||||
BaseExceptionGroup|
|
|
||||||
None
|
|
||||||
):
|
|
||||||
'''
|
|
||||||
Deliver any captured scope errors including those relayed
|
|
||||||
from subactors such as `ActorCancelled` during a non-graceful
|
|
||||||
cancellation scenario.
|
|
||||||
|
|
||||||
When more then a "graceful cancel" occurrs wrap all collected
|
|
||||||
sub-exceptions in a raised `ExceptionGroup`.
|
|
||||||
|
|
||||||
'''
|
|
||||||
scope_exc: BaseException|None = self._scope_error
|
|
||||||
|
|
||||||
# XXX NOTE, only pack an eg if there i at least one
|
|
||||||
# non-actorc exception received from a subactor, OR
|
|
||||||
# return `._scope_error` verbatim.
|
|
||||||
if (errors := self._errors):
|
|
||||||
# use `BaseExceptionGroup` as needed
|
|
||||||
excs: list[BaseException] = list(errors.values())
|
|
||||||
if (
|
|
||||||
len(excs) > 1
|
|
||||||
and
|
|
||||||
any(
|
|
||||||
type(exc) not in {ActorCancelled,}
|
|
||||||
for exc in excs
|
|
||||||
)
|
|
||||||
):
|
|
||||||
return ExceptionGroup(
|
|
||||||
'ActorNursery multi-errored with',
|
|
||||||
tuple(excs),
|
|
||||||
)
|
|
||||||
|
|
||||||
# raise the lone subactor exc
|
|
||||||
return list(excs)[0]
|
|
||||||
|
|
||||||
return scope_exc
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def _open_and_supervise_one_cancels_all_nursery(
|
async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
@ -524,10 +457,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
inner_err: BaseException|None = None
|
inner_err: BaseException|None = None
|
||||||
|
|
||||||
# the collection of errors retreived from spawned sub-actors
|
# the collection of errors retreived from spawned sub-actors
|
||||||
errors: dict[
|
errors: dict[tuple[str, str], BaseException] = {}
|
||||||
msgtypes.Aid,
|
|
||||||
BaseException,
|
|
||||||
] = {}
|
|
||||||
|
|
||||||
# This is the outermost level "deamon actor" nursery. It is awaited
|
# This is the outermost level "deamon actor" nursery. It is awaited
|
||||||
# **after** the below inner "run in actor nursery". This allows for
|
# **after** the below inner "run in actor nursery". This allows for
|
||||||
|
@ -537,7 +467,6 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# `ActorNursery.start_actor()`).
|
# `ActorNursery.start_actor()`).
|
||||||
|
|
||||||
# errors from this daemon actor nursery bubble up to caller
|
# errors from this daemon actor nursery bubble up to caller
|
||||||
try:
|
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(),
|
collapse_eg(),
|
||||||
trio.open_nursery() as da_nursery,
|
trio.open_nursery() as da_nursery,
|
||||||
|
@ -577,7 +506,7 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
|
|
||||||
except BaseException as _inner_err:
|
except BaseException as _inner_err:
|
||||||
inner_err = _inner_err
|
inner_err = _inner_err
|
||||||
# errors[actor.aid] = inner_err
|
errors[actor.uid] = inner_err
|
||||||
|
|
||||||
# If we error in the root but the debugger is
|
# If we error in the root but the debugger is
|
||||||
# engaged we don't want to prematurely kill (and
|
# engaged we don't want to prematurely kill (and
|
||||||
|
@ -657,6 +586,8 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
) as _outer_err:
|
) as _outer_err:
|
||||||
outer_err = _outer_err
|
outer_err = _outer_err
|
||||||
|
|
||||||
|
an._scope_error = outer_err or inner_err
|
||||||
|
|
||||||
# XXX: yet another guard before allowing the cancel
|
# XXX: yet another guard before allowing the cancel
|
||||||
# sequence in case a (single) child is in debug.
|
# sequence in case a (single) child is in debug.
|
||||||
await debug.maybe_wait_for_debugger(
|
await debug.maybe_wait_for_debugger(
|
||||||
|
@ -674,75 +605,38 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
)
|
)
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
|
|
||||||
raise
|
raise
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
scope_exc = an._scope_error = outer_err or inner_err
|
# No errors were raised while awaiting ".run_in_actor()"
|
||||||
# await debug.pause(shield=True)
|
# actors but those actors may have returned remote errors as
|
||||||
# if scope_exc:
|
# results (meaning they errored remotely and have relayed
|
||||||
# errors[actor.aid] = scope_exc
|
# those errors back to this parent actor). The errors are
|
||||||
|
# collected in ``errors`` so cancel all actors, summarize
|
||||||
# show this frame on any internal error
|
# all errors and re-raise.
|
||||||
if (
|
if errors:
|
||||||
not an.cancelled
|
|
||||||
and
|
|
||||||
scope_exc
|
|
||||||
):
|
|
||||||
__tracebackhide__: bool = False
|
|
||||||
|
|
||||||
# NOTE, it's possible no errors were raised while
|
|
||||||
# awaiting ".run_in_actor()" actors but those
|
|
||||||
# sub-actors may have delivered remote errors as
|
|
||||||
# results, normally captured via machinery in
|
|
||||||
# `._spawn.cancel_on_completion()`.
|
|
||||||
#
|
|
||||||
# Any such remote errors are collected in `an._errors`
|
|
||||||
# which is summarized via `ActorNursery.maybe_error`
|
|
||||||
# which is maybe re-raised in an outer block (below).
|
|
||||||
#
|
|
||||||
# So here we first cancel all subactors the summarize
|
|
||||||
# all errors and then later (in that outer block)
|
|
||||||
# maybe-raise on a "non-graceful" cancellation
|
|
||||||
# outcome, normally as a summary EG.
|
|
||||||
if (
|
|
||||||
scope_exc
|
|
||||||
or
|
|
||||||
errors
|
|
||||||
):
|
|
||||||
|
|
||||||
if an._children:
|
if an._children:
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
await an.cancel()
|
await an.cancel()
|
||||||
|
|
||||||
# cancel outer tn so we unblock outside this
|
# use `BaseExceptionGroup` as needed
|
||||||
# finally!
|
if len(errors) > 1:
|
||||||
da_nursery.cance_scope.cancel()
|
raise BaseExceptionGroup(
|
||||||
#
|
'tractor.ActorNursery errored with',
|
||||||
# ^TODO? still don't get why needed?
|
tuple(errors.values()),
|
||||||
# - an.cancel() should cause all spawn-subtasks
|
)
|
||||||
# to eventually exit?
|
else:
|
||||||
# - also, could (instead) we sync to an event here before
|
raise list(errors.values())[0]
|
||||||
# (ever) calling `an.cancel()`??
|
|
||||||
|
|
||||||
# `da_nursery` scope end, thus a checkpoint.
|
# show frame on any (likely) internal error
|
||||||
finally:
|
if (
|
||||||
|
not an.cancelled
|
||||||
|
and an._scope_error
|
||||||
|
):
|
||||||
|
__tracebackhide__: bool = False
|
||||||
|
|
||||||
# raise any eg compiled from all subs
|
# da_nursery scope end - nursery checkpoint
|
||||||
# ??TODO should we also adopt strict-egs here like
|
# final exit
|
||||||
# `trio.Nursery`??
|
|
||||||
#
|
|
||||||
# XXX justification notes,
|
|
||||||
# docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups
|
|
||||||
# anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888
|
|
||||||
# gh: https://github.com/python-trio/trio/issues/611
|
|
||||||
if an_exc := an.maybe_error:
|
|
||||||
raise an_exc
|
|
||||||
|
|
||||||
if scope_exc := an._scope_error:
|
|
||||||
raise scope_exc
|
|
||||||
|
|
||||||
# @acm-fn scope exit
|
|
||||||
|
|
||||||
|
|
||||||
_shutdown_msg: str = (
|
_shutdown_msg: str = (
|
||||||
|
@ -753,7 +647,7 @@ _shutdown_msg: str = (
|
||||||
@acm
|
@acm
|
||||||
async def open_nursery(
|
async def open_nursery(
|
||||||
*, # named params only!
|
*, # named params only!
|
||||||
hide_tb: bool = False,
|
hide_tb: bool = True,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
# ^TODO, paramspec for `open_root_actor()`
|
# ^TODO, paramspec for `open_root_actor()`
|
||||||
|
|
||||||
|
@ -789,21 +683,16 @@ async def open_nursery(
|
||||||
# mark us for teardown on exit
|
# mark us for teardown on exit
|
||||||
implicit_runtime: bool = True
|
implicit_runtime: bool = True
|
||||||
|
|
||||||
async with (
|
async with open_root_actor(
|
||||||
# collapse_eg(hide_tb=hide_tb),
|
|
||||||
open_root_actor(
|
|
||||||
hide_tb=hide_tb,
|
hide_tb=hide_tb,
|
||||||
**kwargs,
|
**kwargs,
|
||||||
) as actor,
|
) as actor:
|
||||||
):
|
|
||||||
assert actor is current_actor()
|
assert actor is current_actor()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
async with (
|
async with _open_and_supervise_one_cancels_all_nursery(
|
||||||
_open_and_supervise_one_cancels_all_nursery(
|
|
||||||
actor
|
actor
|
||||||
) as an
|
) as an:
|
||||||
):
|
|
||||||
|
|
||||||
# NOTE: mark this nursery as having
|
# NOTE: mark this nursery as having
|
||||||
# implicitly started the root actor so
|
# implicitly started the root actor so
|
||||||
|
|
|
@ -78,6 +78,7 @@ def collapse_exception_group(
|
||||||
def get_collapsed_eg(
|
def get_collapsed_eg(
|
||||||
beg: BaseExceptionGroup,
|
beg: BaseExceptionGroup,
|
||||||
|
|
||||||
|
bp: bool = False,
|
||||||
) -> BaseException|None:
|
) -> BaseException|None:
|
||||||
'''
|
'''
|
||||||
If the input beg can collapse to a single sub-exception which is
|
If the input beg can collapse to a single sub-exception which is
|
||||||
|
@ -91,6 +92,7 @@ def get_collapsed_eg(
|
||||||
return maybe_exc
|
return maybe_exc
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def collapse_eg(
|
async def collapse_eg(
|
||||||
hide_tb: bool = True,
|
hide_tb: bool = True,
|
||||||
|
@ -100,8 +102,6 @@ async def collapse_eg(
|
||||||
# trio.Cancelled,
|
# trio.Cancelled,
|
||||||
},
|
},
|
||||||
add_notes: bool = True,
|
add_notes: bool = True,
|
||||||
|
|
||||||
bp: bool = False,
|
|
||||||
):
|
):
|
||||||
'''
|
'''
|
||||||
If `BaseExceptionGroup` raised in the body scope is
|
If `BaseExceptionGroup` raised in the body scope is
|
||||||
|
@ -115,11 +115,6 @@ async def collapse_eg(
|
||||||
yield
|
yield
|
||||||
except BaseExceptionGroup as _beg:
|
except BaseExceptionGroup as _beg:
|
||||||
beg = _beg
|
beg = _beg
|
||||||
|
|
||||||
if bp:
|
|
||||||
import tractor
|
|
||||||
await tractor.pause(shield=True)
|
|
||||||
|
|
||||||
if (
|
if (
|
||||||
(exc := get_collapsed_eg(beg))
|
(exc := get_collapsed_eg(beg))
|
||||||
and
|
and
|
||||||
|
|
Loading…
Reference in New Issue