Compare commits

...

10 Commits

Author SHA1 Message Date
Tyler Goodlet 249e99a2d9 Pass `hide_tb` to embedded collapser in `open_root_actor()` 2025-08-08 16:35:07 -04:00
Tyler Goodlet b464c9805c Add timeout around inf-streamer suite
Since with the new actorc injection it seems to be hanging?
Not sure what exactly the issue is but likely races again
during teardown between the `.run_in_actor()` remote-exc capture
and any actorc after the `portal.cancel()`..

Also tossed in a bp to figure out why actorcs aren't actually showing
outside the `trio.run()`..?
2025-08-08 16:33:46 -04:00
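
A rough sketch of the kind of guard this adds (hypothetical test body and an arbitrary 5s bound): wrap the suite in `trio.fail_after()` so a teardown race hangs for a bounded period instead of wedging the run,

import trio
import tractor


def test_inf_streamer_with_timeout_guard():
    async def main():
        # bound the entire run; a hang during the
        # `.run_in_actor()`-vs-`portal.cancel()` teardown race now
        # fails loudly instead of blocking forever.
        with trio.fail_after(5):
            async with tractor.open_nursery() as an:
                portal = await an.start_actor(
                    'streamer',  # hypothetical sub-actor name
                    enable_modules=[__name__],
                )
                await portal.cancel_actor()

    trio.run(main)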
Tyler Goodlet 0aba4dde28 Adjust nested-subs debug test for tbs output
Such that we don't require every single src/relay_uid in the final
output, but instead only that each appears at some point in the
pre-output of some prompt.
Added some comments to match each actor sub-layer.
2025-08-06 12:57:40 -04:00
Tyler Goodlet 49aac86167 Add temp breakpoint support to `collapse_eg()` 2025-08-05 16:44:58 -04:00
Tyler Goodlet 39d2a3ee3d WIP, actor-nursery non-graceful-cancel raises EG
Attempting a rework of the post-cancellation "raising semantics" such
that subactors which are `ActorCancelled` as a result of a non-graceful
in-scope error, are acked via a re-raised
`ExceptionGroup[ActorCancelled*N, Exception]`
*outside the an-block*. Eventually, the idea is to have `ActorCancelled`
be relayed from each subactor in response to any
`Actor.cancel()/Portal.cancel_actor()` request much like
`Context.cancel()/ContextCancelled`.

This is a WIP bc it does break a few tests and requires related
`_spawn`-mod-machinery changes to match, some of which I'm not yet sure
are required; need to dig into the details of the currently failing
suites first.

`._supervise` patch deats,
- add `ActorNursery.maybe_error` which delivers the maybe-EG or
  `._scope_error` depending on whether `.errors` (now `._errors`, a mapping
  from `Aid`-keys) has entries set for subs.
- raise ^ if non-null in a new outer-`finally` in
  `_open_and_supervise_one_cancels_all_nursery()`; an "outer" block is
  added to ensure all sub-actor-excs are emitted/captured as part of
  `ActorNursery.cancel()` being called (as prior) as well as the
  `da_nursery` being explicitly cancelled alongside it (to unblock the
  tn-block, but still not sure why this is necessary yet?..).
- (now masked) tried injecting actorcs from `.cancel()` loop, but (again
  per more explanation in section below) seems to be suffering a race
  issue with RAE relay?
- left in buncha notes obvi for all this..

`._spawn` patch deats,
- as above, expect `errors: dict` to map from `Aid`-keys.
- pass `errors: dict` into `soft_kill()` since it seemed like we'd want
  to (for now) inject `ActorCancelled` in some cases (but now i'm not
  sure XD).
- tried out a couple spots (which are now masked) to inject
  `ActorCancelled` after calling `Portal.cancel()` in various
  subactor-supervision routines whenev an RAE is not set..
  - oddly seems to be overwriting actual errors (likely due to racing
    with RAE receive and/or actorc-request timeout?) despite the guard
    logic..which clearly doesn't resolve the issue..
- buncha `tn`-style renaming.
2025-08-05 16:34:57 -04:00
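
A sketch of the targeted end-state semantics (mirroring the new `test_one_cancels_all` suite added below; the `ActorCancelled` relay is still WIP per this commit): a non-graceful error inside the `an`-block should surface *outside* it as an EG mixing the in-scope error with one actorc per cancelled subactor,

import trio
import tractor
from tractor._exceptions import ActorCancelled


async def main():
    try:
        async with tractor.open_nursery() as an:
            await an.start_actor(
                'sub',  # hypothetical daemon sub-actor
                enable_modules=[__name__],
            )
            # simulate a non-graceful, in-scope error
            raise RuntimeError('uh oh')

    except BaseExceptionGroup as beg:
        # expected: the in-scope error plus an `ActorCancelled`
        # acked for each cancelled subactor.
        assert any(isinstance(e, RuntimeError) for e in beg.exceptions)
        assert any(isinstance(e, ActorCancelled) for e in beg.exceptions)


if __name__ == '__main__':
    trio.run(main)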
Tyler Goodlet 1f269d8c32 Add todo for `tn` to `gather_contexts()` from `find_actor()`? 2025-08-05 11:59:17 -04:00
Tyler Goodlet 0304cac2e8 Use `an` var name in nested subactor debugging ex. 2025-08-05 11:55:45 -04:00
Tyler Goodlet 30d3ccf826 TOSQUASH 313ad93: yeah dun use `._message` as tb-str.. 2025-08-05 01:05:46 -04:00
Tyler Goodlet f614856673 Add an `actorc` test-driven-dev suite
Defining how an actor-nursery should emit an eg based on non-graceful
cancellation in a new `test_actor_nursery` module. Obviously fails atm
until the implementation is completed.
2025-08-04 17:19:35 -04:00
Tyler Goodlet 313ad93e06 Add `ActorCancelled` as a runtime-wide signal
As in a layer "above" a KBI/SIGINT but "below" a `ContextCancelled` and
generally signalling an interrupt which requests cancellation of the
actor's `trio.run()`.

Impl deats,
- mk the new exc type inherit from our ctxc (for now) but overriding the
  `.canceller` impl to,
  * pull from the `RemoteActorError._extra_msgdata: dict` when no
    `._ipc_msg` is set (which is always to start, until we incorporate
    a new `CancelActor` msg type).
  * not allow a `None` value since we should key-error if not set per
    prev bullet.
- Mk adjustments (related) to parent `RemoteActorError.pformat()` to
  accommodate showing the `.canceller` field in repr output,
  * change `.relay_uid` to not crash when `._ipc_msg` is unset.
  * support `.msg.types.Aid` and use its `.reprol()` from `._mk_fields_str()`.
  * always call `._mk_fields_str()`, not just when `tb_str` is provided,
    and for now use any `._message` in-place of a `tb_str` when
    undefined.
2025-08-04 16:21:24 -04:00
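
A simplified, standalone-helper sketch of the `.canceller` lookup order described in the bullets above (the real impl is a property on the exc type; see the `_exceptions.py` hunks below): prefer the boxed IPC msg's field, else fall back to `._extra_msgdata`, key-erroring when never set,

from tractor.msg import Aid


def resolve_canceller(exc) -> Aid:
    # prefer the boxed IPC msg field (once a dedicated `CancelActor`
    # msg type exists), else the extra-msgdata mapping; a missing
    # entry key-errors since a `None` canceller is not allowed.
    if msg := exc._ipc_msg:
        return msg.canceller
    return exc._extra_msgdata['canceller']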
10 changed files with 699 additions and 291 deletions

View File

@ -21,12 +21,12 @@ async def breakpoint_forever():
async def spawn_until(depth=0):
""""A nested nursery that triggers another ``NameError``.
"""
async with tractor.open_nursery() as n:
async with tractor.open_nursery() as an:
if depth < 1:
await n.run_in_actor(breakpoint_forever)
await an.run_in_actor(breakpoint_forever)
p = await n.run_in_actor(
p = await an.run_in_actor(
name_error,
name='name_error'
)
@ -38,7 +38,7 @@ async def spawn_until(depth=0):
# recursive call to spawn another process branching layer of
# the tree
depth -= 1
await n.run_in_actor(
await an.run_in_actor(
spawn_until,
depth=depth,
name=f'spawn_until_{depth}',

View File

@ -709,10 +709,41 @@ def test_multi_nested_subactors_error_through_nurseries(
child = spawn('multi_nested_subactors_error_up_through_nurseries')
# timed_out_early: bool = False
at_least_one: list[str] = [
"bdb.BdbQuit",
for send_char in itertools.cycle(['c', 'q']):
# leaf subs, which actually raise in "user code"
"src_uid=('breakpoint_forever'",
"src_uid=('name_error'",
# 2nd layer subs
"src_uid=('spawn_until_1'",
"src_uid=('spawn_until_2'",
"src_uid=('spawn_until_3'",
"relay_uid=('spawn_until_0'",
# 1st layer subs
"src_uid=('spawner0'",
"src_uid=('spawner1'",
]
for i, send_char in enumerate(
itertools.cycle(['c', 'q'])
):
try:
child.expect(PROMPT)
for patt in at_least_one.copy():
if in_prompt_msg(
child,
[patt],
):
print(
f'Found patt in prompt {i}\n'
f'patt: {patt!r}\n'
)
at_least_one.remove(patt)
child.sendline(send_char)
time.sleep(0.01)
@ -721,27 +752,15 @@ def test_multi_nested_subactors_error_through_nurseries(
assert_before(
child,
[ # boxed source errors
"NameError: name 'doggypants' is not defined",
[
# boxed source errors should show in final
# post-prompt tb to console.
"tractor._exceptions.RemoteActorError:",
"('name_error'",
"bdb.BdbQuit",
"NameError: name 'doggypants' is not defined",
# first level subtrees
# "tractor._exceptions.RemoteActorError: ('spawner0'",
"src_uid=('spawner0'",
# "tractor._exceptions.RemoteActorError: ('spawner1'",
# propagation of errors up through nested subtrees
# "tractor._exceptions.RemoteActorError: ('spawn_until_0'",
# "tractor._exceptions.RemoteActorError: ('spawn_until_1'",
# "tractor._exceptions.RemoteActorError: ('spawn_until_2'",
# ^-NOTE-^ old RAE repr, new one is below with a field
# showing the src actor's uid.
"src_uid=('spawn_until_0'",
"relay_uid=('spawn_until_1'",
"src_uid=('spawn_until_2'",
# TODO? once we get more pedantic with `relay_uid` should
# prolly include all actor-IDs we expect to see in final
# tb?
]
)

View File

@ -0,0 +1,98 @@
'''
Basic `ActorNursery` operations and closure semantics,
- basic remote error collection,
- basic multi-subactor cancellation.
'''
# import os
# import signal
# import platform
# import time
# from itertools import repeat
import pytest
import trio
import tractor
from tractor._exceptions import ActorCancelled
# from tractor._testing import (
# tractor_test,
# )
# from .conftest import no_windows
@pytest.mark.parametrize(
'num_subs',
[
1,
3,
]
)
def test_one_cancels_all(
start_method: str,
loglevel: str,
debug_mode: bool,
num_subs: int,
):
'''
Verify that if a single error bubbles to the an-scope the
nursery will be cancelled (just like in `trio`); this is a
one-cancels-all style strategy and is our only supervision policy
at the moment.
'''
async def main():
try:
rte = RuntimeError('Uh oh something bad in parent')
async with tractor.open_nursery(
start_method=start_method,
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
# spawn the same number of daemon actors which should be cancelled
dactor_portals = []
for i in range(num_subs):
name: str = f'sub_{i}'
ptl: tractor.Portal = await an.start_actor(
name=name,
enable_modules=[__name__],
)
dactor_portals.append(ptl)
# wait for booted
async with tractor.wait_for_actor(name):
print(f'{name!r} is up.')
# simulate uncaught exc
raise rte
# should error here with a ``RemoteActorError`` or ``MultiError``
except BaseExceptionGroup as _beg:
beg = _beg
# ?TODO? why can't we do `is` on beg?
assert (
beg.exceptions
==
an.maybe_error.exceptions
)
assert len(beg.exceptions) == (
num_subs
+
1 # rte from root
)
# all subactors should have been implicitly
# `Portal.cancel_actor()`ed.
excs = list(beg.exceptions)
excs.remove(rte)
for exc in excs:
assert isinstance(exc, ActorCancelled)
assert an._scope_error is rte
assert not an._children
assert an.cancelled is True
trio.run(main)

View File

@ -11,6 +11,9 @@ from itertools import repeat
import pytest
import trio
import tractor
from tractor._exceptions import (
ActorCancelled,
)
from tractor._testing import (
tractor_test,
)
@ -124,7 +127,10 @@ def test_multierror(
) as nursery:
await nursery.run_in_actor(assert_err, name='errorer1')
portal2 = await nursery.run_in_actor(assert_err, name='errorer2')
portal2 = await nursery.run_in_actor(
assert_err,
name='errorer2',
)
# get result(s) from main task
try:
@ -137,7 +143,15 @@ def test_multierror(
# here we should get a ``BaseExceptionGroup`` containing exceptions
# from both subactors
with pytest.raises(BaseExceptionGroup):
with pytest.raises(
expected_exception=(
tractor.RemoteActorError,
# ?TODO, should it be this??
# like `trio`'s strict egs?
BaseExceptionGroup,
),
):
trio.run(main)
@ -233,8 +247,9 @@ async def stream_forever():
@tractor_test
async def test_cancel_infinite_streamer(start_method):
async def test_cancel_infinite_streamer(
start_method: str,
):
# stream for at most 1 seconds
with trio.move_on_after(1) as cancel_scope:
async with tractor.open_nursery() as n:
@ -288,6 +303,7 @@ async def test_some_cancels_all(
num_actors_and_errs: tuple,
start_method: str,
loglevel: str,
debug_mode: bool,
):
'''
Verify a subset of failed subactors causes all others in
@ -303,68 +319,81 @@ async def test_some_cancels_all(
ria_func,
da_func,
) = num_actors_and_errs
try:
async with tractor.open_nursery() as an:
with trio.fail_after(
3
if not debug_mode
else 999
):
try:
async with tractor.open_nursery() as an:
# spawn the same number of daemon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
# spawn the same number of daemon actors which should be cancelled
dactor_portals = []
for i in range(num_actors):
dactor_portals.append(await an.start_actor(
f'deamon_{i}',
enable_modules=[__name__],
))
func, kwargs = ria_func
riactor_portals = []
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
func, kwargs = ria_func
riactor_portals = []
for i in range(num_actors):
# start actor(s) that will fail immediately
riactor_portals.append(
await an.run_in_actor(
func,
name=f'actor_{i}',
**kwargs
)
)
)
if da_func:
func, kwargs, expect_error = da_func
for portal in dactor_portals:
# if this function fails then we should error here
# and the nursery should teardown all other actors
try:
await portal.run(func, **kwargs)
if da_func:
func, kwargs, expect_error = da_func
for portal in dactor_portals:
# if this function fails then we should error here
# and the nursery should teardown all other actors
try:
await portal.run(func, **kwargs)
except tractor.RemoteActorError as err:
assert err.boxed_type == err_type
# we only expect this first error to propagate
# (all other daemons are cancelled before they
# can be scheduled)
num_actors = 1
# reraise so nursery teardown is triggered
raise
except tractor.RemoteActorError as err:
assert err.boxed_type == err_type
# we only expect this first error to propagate
# (all other daemons are cancelled before they
# can be scheduled)
num_actors = 1
# reraise so nursery teardown is triggered
raise
else:
if expect_error:
pytest.fail(
"Deamon call should fail at checkpoint?")
# should error here with a ``RemoteActorError`` or ``MultiError``
except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
# TODO, figure out why these aren't being set?
if isinstance(exc, ActorCancelled):
breakpoint()
if isinstance(exc, tractor.RemoteActorError):
assert exc.boxed_type == err_type
else:
if expect_error:
pytest.fail(
"Deamon call should fail at checkpoint?")
assert isinstance(exc, trio.Cancelled)
# should error here with a ``RemoteActorError`` or ``MultiError``
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
except first_err as _err:
err = _err
if isinstance(err, BaseExceptionGroup):
assert len(err.exceptions) == num_actors
for exc in err.exceptions:
if isinstance(exc, tractor.RemoteActorError):
assert exc.boxed_type == err_type
else:
assert isinstance(exc, trio.Cancelled)
elif isinstance(err, tractor.RemoteActorError):
assert err.boxed_type == err_type
assert an.cancelled is True
assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
assert an.cancelled is True
assert not an._children
else:
pytest.fail("Should have gotten a remote assertion error?")
async def spawn_and_error(breadth, depth) -> None:

View File

@ -27,7 +27,7 @@ from typing import (
)
from contextlib import asynccontextmanager as acm
from tractor.log import get_logger
from .log import get_logger
from .trionics import (
gather_contexts,
collapse_eg,
@ -217,7 +217,7 @@ async def find_actor(
raise_on_none: bool = False,
) -> AsyncGenerator[
Portal | list[Portal] | None,
Portal|list[Portal]|None,
None,
]:
'''
@ -259,6 +259,7 @@ async def find_actor(
collapse_eg(),
gather_contexts(
mngrs=maybe_portals,
# tn=tn, # ?TODO, helps to pass rent tn here?
) as portals,
):
# log.runtime(

View File

@ -46,6 +46,7 @@ from msgspec import (
from tractor._state import current_actor
from tractor.log import get_logger
from tractor.msg import (
Aid,
Error,
PayloadMsg,
MsgType,
@ -479,9 +480,10 @@ class RemoteActorError(Exception):
@property
def relay_uid(self) -> tuple[str, str]|None:
return tuple(
self._ipc_msg.relay_path[-1]
)
if msg := self._ipc_msg:
return tuple(
msg.relay_path[-1]
)
@property
def src_uid(self) -> tuple[str, str]|None:
@ -521,7 +523,8 @@ class RemoteActorError(Exception):
for key in fields:
if (
key == 'relay_uid'
and not self.is_inception()
and
not self.is_inception()
):
continue
@ -534,6 +537,13 @@ class RemoteActorError(Exception):
None,
)
)
if (
key == 'canceller'
and
isinstance(val, Aid)
):
val: str = val.reprol(sin_uuid=False)
# TODO: for `.relay_path` on multiline?
# if not isinstance(val, str):
# val_str = pformat(val)
@ -623,12 +633,22 @@ class RemoteActorError(Exception):
# IFF there is an embedded traceback-str we always
# draw the ascii-box around it.
body: str = ''
if tb_str := self.tb_str:
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
fields: str = self._mk_fields_str(
_body_fields
+
self.extra_body_fields,
)
tb_str: str = (
self.tb_str
#
# ^TODO? what to use instead? if anything?
# -[ ] ensure the `.message` doesn't show up 2x in output ya?
# -[ ] ._message isn't really right?
# or
# self._message
)
if tb_str:
from tractor.devx import (
pformat_boxed_tb,
)
@ -640,7 +660,7 @@ class RemoteActorError(Exception):
# just after <Type(
# |___ ..
tb_body_indent=1,
boxer_header=self.relay_uid,
boxer_header=self.relay_uid or '-',
)
# !TODO, it'd be nice to import these top level without
@ -713,6 +733,10 @@ class RemoteActorError(Exception):
class ContextCancelled(RemoteActorError):
'''
IPC context cancellation signal/msg.
Often reffed with the short-hand: "ctxc".
Inter-actor task context was cancelled by either a call to
``Portal.cancel_actor()`` or ``Context.cancel()``.
@ -737,8 +761,8 @@ class ContextCancelled(RemoteActorError):
- (simulating) an IPC transport network outage
- a (malicious) pkt sent specifically to cancel an actor's
runtime non-gracefully without ensuring ongoing RPC tasks are
incrementally cancelled as is done with:
runtime non-gracefully without ensuring ongoing RPC tasks
are incrementally cancelled as is done with:
`Actor`
|_`.cancel()`
|_`.cancel_soon()`
@ -759,6 +783,59 @@ class ContextCancelled(RemoteActorError):
# src_actor_uid = canceller
class ActorCancelled(ContextCancelled):
'''
Runtime-layer cancellation signal/msg.
Indicates a "graceful interrupt" of the machinery scheduled by
the py-proc's `trio.run()`.
Often reffed with the short-hand: "actorc".
Raised from within `an: ActorNursery` (via an `ExceptionGroup`)
when an actor has been "process wide" cancel-called using any of,
- `ActorNursery.cancel()`
- `Portal.cancel_actor()`
**and** that cancel request was part of a "non graceful" cancel
condition.
That is, whenever an exception is to be raised outside an `an`
scope-block due to some error raised-in/relayed-to that scope. In
such cases this is normally raised once per subactor portal, for
every subactor which was subsequently cancelled (according to the
`an`'s supervision strat).
'''
@property
def canceller(self) -> Aid:
'''
Return the (maybe) `Actor.aid: Aid` for the requesting-author
of this actorc.
Emit a warning msg when `.canceller` has not been set.
See additional relevant notes in
`ContextCancelled.canceller`.
'''
value: tuple[str, str]|None
if msg := self._ipc_msg:
value = msg.canceller
else:
value = self._extra_msgdata['canceller']
if value:
return value
log.warning(
'IPC Context cancelled without a requesting actor?\n'
'Maybe the IPC transport ended abruptly?\n\n'
f'{self}'
)
class MsgTypeError(
RemoteActorError,
):

View File

@ -88,7 +88,8 @@ async def maybe_block_bp(
bp_blocked: bool
if (
debug_mode
and maybe_enable_greenback
and
maybe_enable_greenback
and (
maybe_mod := await debug.maybe_init_greenback(
raise_not_found=False,
@ -478,7 +479,10 @@ async def open_root_actor(
# start runtime in a bg sub-task, yield to caller.
async with (
collapse_eg(),
collapse_eg(
hide_tb=hide_tb,
# bp=True,
),
trio.open_nursery() as root_tn,
# XXX, finally-footgun below?

View File

@ -50,7 +50,11 @@ from tractor._addr import UnwrappedAddress
from tractor._portal import Portal
from tractor._runtime import Actor
from tractor._entry import _mp_main
from tractor._exceptions import ActorFailure
from tractor._exceptions import (
ActorCancelled,
ActorFailure,
# NoResult,
)
from tractor.msg import (
types as msgtypes,
pretty_struct,
@ -137,7 +141,6 @@ def try_set_start_method(
async def exhaust_portal(
portal: Portal,
actor: Actor
@ -185,10 +188,12 @@ async def exhaust_portal(
async def cancel_on_completion(
portal: Portal,
actor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
) -> None:
'''
@ -209,24 +214,57 @@ async def cancel_on_completion(
portal,
actor,
)
aid: msgtypes.Aid = actor.aid
repr_aid: str = aid.reprol(sin_uuid=False)
if isinstance(result, Exception):
errors[actor.uid]: Exception = result
errors[aid]: Exception = result
log.cancel(
'Cancelling subactor runtime due to error:\n\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
f'error: {result}\n'
f'Cancelling subactor {repr_aid!r} runtime due to error\n'
f'\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n'
f'\n'
f'{result!r}\n'
)
else:
log.runtime(
'Cancelling subactor gracefully:\n\n'
f'Portal.cancel_actor() => {portal.channel.uid}\n\n'
f'result: {result}\n'
report: str = (
f'Cancelling subactor {repr_aid!r} gracefully..\n'
f'\n'
)
canc_info: str = (
f'Portal.cancel_actor() => {portal.chan.uid}\n'
f'\n'
f'final-result => {result!r}\n'
)
log.cancel(
report
+
canc_info
)
# cancel the process now that we have a final result
await portal.cancel_actor()
if (
not errors.get(aid)
# and
# result is NoResult
):
pass
# await debug.pause(shield=True)
# errors[aid] = ActorCancelled(
# message=(
# f'Cancelled subactor {repr_aid!r}\n'
# f'{canc_info}\n'
# ),
# canceller=current_actor().aid,
# # TODO? should we have a ack-msg?
# # ipc_msg=??
# # boxed_type=trio.Cancelled,
# )
async def hard_kill(
proc: trio.Process,
@ -314,6 +352,10 @@ async def soft_kill(
Awaitable,
],
portal: Portal,
errors: dict[
msgtypes.Aid,
Exception,
],
) -> None:
'''
@ -357,8 +399,8 @@ async def soft_kill(
# below. This means we try to do a graceful teardown
# via sending a cancel message before getting out
# zombie killing tools.
async with trio.open_nursery() as n:
n.cancel_scope.shield = True
async with trio.open_nursery() as tn:
tn.cancel_scope.shield = True
async def cancel_on_proc_deth():
'''
@ -368,24 +410,35 @@ async def soft_kill(
'''
await wait_func(proc)
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
# start a task to wait on the termination of the
# process by itself waiting on a (caller provided) wait
# function which should unblock when the target process
# has terminated.
n.start_soon(cancel_on_proc_deth)
tn.start_soon(cancel_on_proc_deth)
# send the actor-runtime a cancel request.
await portal.cancel_actor()
# if not errors.get(peer_aid):
# errors[peer_aid] = ActorCancelled(
# message=(
# 'Sub-actor cancelled gracefully by parent\n'
# ),
# canceller=current_actor().aid,
# # TODO? should we have a ack-msg?
# # ipc_msg=??
# # boxed_type=trio.Cancelled,
# )
if proc.poll() is None: # type: ignore
log.warning(
'Subactor still alive after cancel request?\n\n'
f'uid: {peer_aid}\n'
f'|_{proc}\n'
)
n.cancel_scope.cancel()
tn.cancel_scope.cancel()
raise
@ -393,7 +446,10 @@ async def new_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
@ -432,7 +488,10 @@ async def trio_proc(
name: str,
actor_nursery: ActorNursery,
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
@ -555,9 +614,9 @@ async def trio_proc(
with trio.CancelScope(shield=True):
await actor_nursery._join_procs.wait()
async with trio.open_nursery() as nursery:
async with trio.open_nursery() as ptl_reaper_tn:
if portal in actor_nursery._cancel_after_result_on_exit:
nursery.start_soon(
ptl_reaper_tn.start_soon(
cancel_on_completion,
portal,
subactor,
@ -570,7 +629,8 @@ async def trio_proc(
await soft_kill(
proc,
trio.Process.wait, # XXX, uses `pidfd_open()` below.
portal
portal,
errors,
)
# cancel result waiter that may have been spawned in
@ -579,7 +639,7 @@ async def trio_proc(
'Cancelling portal result reaper task\n'
f'c)> {subactor.aid.reprol()!r}\n'
)
nursery.cancel_scope.cancel()
ptl_reaper_tn.cancel_scope.cancel()
finally:
# XXX NOTE XXX: The "hard" reap since no actor zombies are
@ -652,7 +712,10 @@ async def mp_proc(
name: str,
actor_nursery: ActorNursery, # type: ignore # noqa
subactor: Actor,
errors: dict[tuple[str, str], Exception],
errors: dict[
msgtypes.Aid,
Exception,
],
# passed through to actor main
bind_addrs: list[UnwrappedAddress],
parent_addr: UnwrappedAddress,
@ -777,7 +840,7 @@ async def mp_proc(
cancel_on_completion,
portal,
subactor,
errors
errors,
)
# This is a "soft" (cancellable) join/reap which
@ -786,7 +849,8 @@ async def mp_proc(
await soft_kill(
proc,
proc_waiter,
portal
portal,
errors,
)
# cancel result waiter that may have been spawned in

View File

@ -30,6 +30,9 @@ import warnings
import trio
from .msg import (
types as msgtypes,
)
from .devx import (
debug,
pformat as _pformat,
@ -48,6 +51,7 @@ from .trionics import (
)
from ._exceptions import (
ContextCancelled,
ActorCancelled,
)
from ._root import (
open_root_actor,
@ -99,7 +103,10 @@ class ActorNursery:
actor: Actor,
ria_nursery: trio.Nursery,
da_nursery: trio.Nursery,
errors: dict[tuple[str, str], BaseException],
errors: dict[
msgtypes.Aid,
BaseException,
],
) -> None:
# self.supervisor = supervisor # TODO
@ -117,9 +124,11 @@ class ActorNursery:
]
] = {}
# signals when it is ok to start waiting on subactor procs
# for termination.
self._join_procs = trio.Event()
self._at_least_one_child_in_debug: bool = False
self.errors = errors
self._errors = errors
self._scope_error: BaseException|None = None
self.exited = trio.Event()
@ -260,7 +269,7 @@ class ActorNursery:
name,
self,
subactor,
self.errors,
self._errors,
bind_addrs,
parent_addr,
_rtv, # run time vars
@ -364,7 +373,9 @@ class ActorNursery:
# then `._children`..
children: dict = self._children
child_count: int = len(children)
msg: str = f'Cancelling actor nursery with {child_count} children\n'
msg: str = (
f'Cancelling actor-nursery with {child_count} children\n'
)
server: IPCServer = self._actor.ipc_server
@ -391,7 +402,9 @@ class ActorNursery:
else:
if portal is None: # actor hasn't fully spawned yet
event: trio.Event = server._peer_connected[subactor.uid]
event: trio.Event = server._peer_connected[
subactor.uid
]
log.warning(
f"{subactor.uid} never 't finished spawning?"
)
@ -416,7 +429,20 @@ class ActorNursery:
# spawn cancel tasks for each sub-actor
assert portal
if portal.channel.connected():
tn.start_soon(portal.cancel_actor)
async def canc_subactor():
await portal.cancel_actor()
# aid: msgtypes.Aid = subactor.aid
# reprol: str = aid.reprol(sin_uuid=False)
# if not self._errors.get(aid):
# self._errors[aid] = ActorCancelled(
# message=(
# f'Sub-actor {reprol!r} cancelled gracefully by parent nursery\n'
# ),
# canceller=self._actor.aid,
# )
tn.start_soon(canc_subactor)
log.cancel(msg)
# if we cancelled the cancel (we hung cancelling remote actors)
@ -442,6 +468,47 @@ class ActorNursery:
# mark ourselves as having (tried to have) cancelled all subactors
self._join_procs.set()
@property
def maybe_error(self) -> (
BaseException|
BaseExceptionGroup|
None
):
'''
Deliver any captured scope errors including those relayed
from subactors such as `ActorCancelled` during a non-graceful
cancellation scenario.
When more than a "graceful cancel" occurs, wrap all collected
sub-exceptions in a raised `ExceptionGroup`.
'''
scope_exc: BaseException|None = self._scope_error
# XXX NOTE, only pack an eg if there is at least one
# non-actorc exception received from a subactor, OR
# return `._scope_error` verbatim.
if (errors := self._errors):
# use `BaseExceptionGroup` as needed
excs: list[BaseException] = list(errors.values())
if (
len(excs) > 1
and
any(
type(exc) not in {ActorCancelled,}
for exc in excs
)
):
return ExceptionGroup(
'ActorNursery multi-errored with',
tuple(excs),
)
# raise the lone subactor exc
return list(excs)[0]
return scope_exc
@acm
async def _open_and_supervise_one_cancels_all_nursery(
@ -457,7 +524,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
inner_err: BaseException|None = None
# the collection of errors retrieved from spawned sub-actors
errors: dict[tuple[str, str], BaseException] = {}
errors: dict[
msgtypes.Aid,
BaseException,
] = {}
# This is the outermost level "daemon actor" nursery. It is awaited
# **after** the below inner "run in actor nursery". This allows for
@ -467,176 +537,212 @@ async def _open_and_supervise_one_cancels_all_nursery(
# `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller
async with (
collapse_eg(),
trio.open_nursery() as da_nursery,
):
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# `ActorNursery.run_in_actor()`) are expected to only
# return a single result and then complete (i.e. be cancelled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with (
collapse_eg(),
trio.open_nursery() as ria_nursery,
):
an = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield an
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime(
'Waiting on subactors to complete:\n'
f'>}} {len(an._children)}\n'
try:
async with (
collapse_eg(),
trio.open_nursery() as da_nursery,
):
try:
# This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using
# `ActorNursery.run_in_actor()`) are expected to only
# return a single result and then complete (i.e. be cancelled
# gracefully). Errors collected from these actors are
# immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified.
async with (
collapse_eg(),
trio.open_nursery() as ria_nursery,
):
an = ActorNursery(
actor,
ria_nursery,
da_nursery,
errors
)
an._join_procs.set()
try:
# spawning of actors happens in the caller's scope
# after we yield upwards
yield an
except BaseException as _inner_err:
inner_err = _inner_err
errors[actor.uid] = inner_err
# When we didn't error in the caller's scope,
# signal all process-monitor-tasks to conduct
# the "hard join phase".
log.runtime(
'Waiting on subactors to complete:\n'
f'>}} {len(an._children)}\n'
)
an._join_procs.set()
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
except BaseException as _inner_err:
inner_err = _inner_err
# errors[actor.aid] = inner_err
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
an._join_procs.set()
# If we error in the root but the debugger is
# engaged we don't want to prematurely kill (and
# thus clobber access to) the local tty since it
# will make the pdb repl unusable.
# Instead try to wait for pdb to be released before
# tearing down.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
# XXX NOTE XXX: hypothetically an error could
# be raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype: type = type(inner_err)
if etype in (
trio.Cancelled,
KeyboardInterrupt,
) or (
is_multi_cancelled(inner_err)
):
log.cancel(
f'Actor-nursery cancelled by {etype}\n\n'
# if the caller's scope errored then we activate our
# one-cancels-all supervisor strategy (don't
# worry more are coming).
an._join_procs.set()
f'{current_actor().uid}\n'
f' |_{an}\n\n'
# XXX NOTE XXX: hypothetically an error could
# be raised and then a cancel signal shows up
# slightly after in which case the `else:`
# block here might not complete? For now,
# shield both.
with trio.CancelScope(shield=True):
etype: type = type(inner_err)
if etype in (
trio.Cancelled,
KeyboardInterrupt,
) or (
is_multi_cancelled(inner_err)
):
log.cancel(
f'Actor-nursery cancelled by {etype}\n\n'
# TODO: show tb str?
# f'{tb_str}'
)
elif etype in {
ContextCancelled,
}:
log.cancel(
'Actor-nursery caught remote cancellation\n'
'\n'
f'{inner_err.tb_str}'
)
else:
log.exception(
'Nursery errored with:\n'
f'{current_actor().uid}\n'
f' |_{an}\n\n'
# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)
# TODO: show tb str?
# f'{tb_str}'
)
elif etype in {
ContextCancelled,
}:
log.cancel(
'Actor-nursery caught remote cancellation\n'
'\n'
f'{inner_err.tb_str}'
)
else:
log.exception(
'Nursery errored with:\n'
# cancel all subactors
await an.cancel()
# TODO: same thing as in
# `._invoke()` to compute how to
# place this div-line in the
# middle of the above msg
# content..
# -[ ] prolly helper-func it too
# in our `.log` module..
# '------ - ------'
)
# ria_nursery scope end
# cancel all subactors
await an.cancel()
# TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# the whole ``.run_in_actor()`` API should be built "on top of"
# this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await Portal.run()` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
except (
Exception,
BaseExceptionGroup,
trio.Cancelled
) as _outer_err:
outer_err = _outer_err
# ria_nursery scope end
an._scope_error = outer_err or inner_err
# TODO: this is the handler around the ``.run_in_actor()``
# nursery. Ideally we can drop this entirely in the future as
# the whole ``.run_in_actor()`` API should be built "on top of"
# this lower level spawn-request-cancel "daemon actor" API where
# a local in-actor task nursery is used with one-to-one task
# + `await Portal.run()` calls and the results/errors are
# handled directly (inline) and errors by the local nursery.
except (
Exception,
BaseExceptionGroup,
trio.Cancelled
) as _outer_err:
outer_err = _outer_err
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
if an._children:
log.cancel(
'Actor-nursery cancelling due error type:\n'
f'{outer_err}\n'
# XXX: yet another guard before allowing the cancel
# sequence in case a (single) child is in debug.
await debug.maybe_wait_for_debugger(
child_in_debug=an._at_least_one_child_in_debug
)
with trio.CancelScope(shield=True):
await an.cancel()
raise
finally:
# No errors were raised while awaiting ".run_in_actor()"
# actors but those actors may have returned remote errors as
# results (meaning they errored remotely and have relayed
# those errors back to this parent actor). The errors are
# collected in ``errors`` so cancel all actors, summarize
# all errors and re-raise.
if errors:
# If actor-local error was raised while waiting on
# ".run_in_actor()" actors then we also want to cancel all
# remaining sub-actors (due to our lone strategy:
# one-cancels-all).
if an._children:
log.cancel(
'Actor-nursery cancelling due error type:\n'
f'{outer_err}\n'
)
with trio.CancelScope(shield=True):
await an.cancel()
# use `BaseExceptionGroup` as needed
if len(errors) > 1:
raise BaseExceptionGroup(
'tractor.ActorNursery errored with',
tuple(errors.values()),
)
else:
raise list(errors.values())[0]
raise
# show frame on any (likely) internal error
if (
not an.cancelled
and an._scope_error
):
__tracebackhide__: bool = False
finally:
scope_exc = an._scope_error = outer_err or inner_err
# await debug.pause(shield=True)
# if scope_exc:
# errors[actor.aid] = scope_exc
# da_nursery scope end - nursery checkpoint
# final exit
# show this frame on any internal error
if (
not an.cancelled
and
scope_exc
):
__tracebackhide__: bool = False
# NOTE, it's possible no errors were raised while
# awaiting ".run_in_actor()" actors but those
# sub-actors may have delivered remote errors as
# results, normally captured via machinery in
# `._spawn.cancel_on_completion()`.
#
# Any such remote errors are collected in `an._errors`
# which is summarized via `ActorNursery.maybe_error`
# which is maybe re-raised in an outer block (below).
#
# So here we first cancel all subactors, then summarize
# all errors and then later (in that outer block)
# maybe-raise on a "non-graceful" cancellation
# outcome, normally as a summary EG.
if (
scope_exc
or
errors
):
if an._children:
with trio.CancelScope(shield=True):
await an.cancel()
# cancel outer tn so we unblock outside this
# finally!
da_nursery.cancel_scope.cancel()
#
# ^TODO? still don't get why needed?
# - an.cancel() should cause all spawn-subtasks
# to eventually exit?
# - also, could (instead) we sync to an event here before
# (ever) calling `an.cancel()`??
# `da_nursery` scope end, thus a checkpoint.
finally:
# raise any eg compiled from all subs
# ??TODO should we also adopt strict-egs here like
# `trio.Nursery`??
#
# XXX justification notes,
# docs: https://trio.readthedocs.io/en/stable/reference-core.html#historical-note-non-strict-exceptiongroups
# anthropic: https://discuss.python.org/t/using-exceptiongroup-at-anthropic-experience-report/20888
# gh: https://github.com/python-trio/trio/issues/611
if an_exc := an.maybe_error:
raise an_exc
if scope_exc := an._scope_error:
raise scope_exc
# @acm-fn scope exit
_shutdown_msg: str = (
@ -647,7 +753,7 @@ _shutdown_msg: str = (
@acm
async def open_nursery(
*, # named params only!
hide_tb: bool = True,
hide_tb: bool = False,
**kwargs,
# ^TODO, paramspec for `open_root_actor()`
@ -683,16 +789,21 @@ async def open_nursery(
# mark us for teardown on exit
implicit_runtime: bool = True
async with open_root_actor(
hide_tb=hide_tb,
**kwargs,
) as actor:
async with (
# collapse_eg(hide_tb=hide_tb),
open_root_actor(
hide_tb=hide_tb,
**kwargs,
) as actor,
):
assert actor is current_actor()
try:
async with _open_and_supervise_one_cancels_all_nursery(
actor
) as an:
async with (
_open_and_supervise_one_cancels_all_nursery(
actor
) as an
):
# NOTE: mark this nursery as having
# implicitly started the root actor so

View File

@ -78,7 +78,6 @@ def collapse_exception_group(
def get_collapsed_eg(
beg: BaseExceptionGroup,
bp: bool = False,
) -> BaseException|None:
'''
If the input beg can collapse to a single sub-exception which is
@ -92,7 +91,6 @@ def get_collapsed_eg(
return maybe_exc
@acm
async def collapse_eg(
hide_tb: bool = True,
@ -102,6 +100,8 @@ async def collapse_eg(
# trio.Cancelled,
},
add_notes: bool = True,
bp: bool = False,
):
'''
If `BaseExceptionGroup` raised in the body scope is
@ -115,6 +115,11 @@ async def collapse_eg(
yield
except BaseExceptionGroup as _beg:
beg = _beg
if bp:
import tractor
await tractor.pause(shield=True)
if (
(exc := get_collapsed_eg(beg))
and