Compare commits
23 Commits
b32121f540
...
0cbf02bf2e
Author | SHA1 | Date |
---|---|---|
|
0cbf02bf2e | |
|
4a0b78d447 | |
|
718d0887c9 | |
|
40a40f613a | |
|
c692b30e7f | |
|
c91bf6dcc2 | |
|
5afa9407c9 | |
|
9b1dd7b279 | |
|
910257bb46 | |
|
ca37d8ed91 | |
|
072cdd0c66 | |
|
30302b051d | |
|
bb04e55d5f | |
|
226d06dbfa | |
|
1b502736d6 | |
|
51992ef546 | |
|
6da470918a | |
|
93d97c3ff9 | |
|
abaea4de8e | |
|
4b3ba35cd6 | |
|
70a508e9d7 | |
|
59822ff093 | |
|
ca427aec7e |
|
@ -0,0 +1,145 @@
|
|||
from contextlib import (
|
||||
contextmanager as cm,
|
||||
# TODO, any diff in async case(s)??
|
||||
# asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
|
||||
log = tractor.log.get_logger(__name__)
|
||||
tractor.log.get_console_log('info')
|
||||
|
||||
@cm
|
||||
def teardown_on_exc(
|
||||
raise_from_handler: bool = False,
|
||||
):
|
||||
'''
|
||||
You could also have a teardown handler which catches any exc and
|
||||
does some required teardown. In this case the problem is
|
||||
compounded UNLESS you ensure the handler's scope is OUTSIDE the
|
||||
`ux.aclose()`.. that is in the caller's enclosing scope.
|
||||
|
||||
'''
|
||||
try:
|
||||
yield
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
log.exception(
|
||||
f'Handling termination teardown in child due to,\n'
|
||||
f'{berr!r}\n'
|
||||
)
|
||||
if raise_from_handler:
|
||||
# XXX teardown ops XXX
|
||||
# on termination these steps say need to be run to
|
||||
# ensure wider system consistency (like the state of
|
||||
# remote connections/services).
|
||||
#
|
||||
# HOWEVER, any bug in this teardown code is also
|
||||
# masked by the `tx.aclose()`!
|
||||
# this is also true if `_tn.cancel_scope` is
|
||||
# `.cancel_called` by the parent in a graceful
|
||||
# request case..
|
||||
|
||||
# simulate a bug in teardown handler.
|
||||
raise RuntimeError(
|
||||
'woopsie teardown bug!'
|
||||
)
|
||||
|
||||
raise # no teardown bug.
|
||||
|
||||
|
||||
async def finite_stream_to_rent(
|
||||
tx: trio.abc.SendChannel,
|
||||
child_errors_mid_stream: bool,
|
||||
|
||||
task_status: trio.TaskStatus[
|
||||
trio.CancelScope,
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
async with (
|
||||
# XXX without this unmasker the mid-streaming RTE is never
|
||||
# reported since it is masked by the `tx.aclose()`
|
||||
# call which in turn raises `Cancelled`!
|
||||
#
|
||||
# NOTE, this is WITHOUT doing any exception handling
|
||||
# inside the child task!
|
||||
#
|
||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||
# tractor.trionics.maybe_raise_from_masking_exc(),
|
||||
|
||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||
trio.open_nursery() as _tn,
|
||||
):
|
||||
# pass our scope back to parent for supervision\
|
||||
# control.
|
||||
task_status.started(_tn.cancel_scope)
|
||||
|
||||
with teardown_on_exc(
|
||||
raise_from_handler=not child_errors_mid_stream,
|
||||
):
|
||||
for i in range(100):
|
||||
log.info(
|
||||
f'Child tx {i!r}\n'
|
||||
)
|
||||
if (
|
||||
child_errors_mid_stream
|
||||
and
|
||||
i == 66
|
||||
):
|
||||
# oh wait but WOOPS there's a bug
|
||||
# in that teardown code!?
|
||||
raise RuntimeError(
|
||||
'woopsie, a mid-streaming bug!?'
|
||||
)
|
||||
|
||||
await tx.send(i)
|
||||
|
||||
|
||||
async def main(
|
||||
# TODO! toggle this for the 2 cases!
|
||||
# 1. child errors mid-stream while parent is also requesting
|
||||
# (graceful) cancel of that child streamer.
|
||||
#
|
||||
# 2. child contains a teardown handler which contains a
|
||||
# bug and raises.
|
||||
#
|
||||
child_errors_mid_stream: bool,
|
||||
):
|
||||
tx, rx = trio.open_memory_channel(1)
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
rx as rx,
|
||||
):
|
||||
|
||||
_child_cs = await tn.start(
|
||||
partial(
|
||||
finite_stream_to_rent,
|
||||
child_errors_mid_stream=child_errors_mid_stream,
|
||||
tx=tx,
|
||||
)
|
||||
)
|
||||
async for msg in rx:
|
||||
log.info(
|
||||
f'Rent rx {msg!r}\n'
|
||||
)
|
||||
|
||||
# simulate some external cancellation
|
||||
# request **JUST BEFORE** the child errors.
|
||||
if msg == 65:
|
||||
log.cancel(
|
||||
f'Cancelling parent on,\n'
|
||||
f'msg={msg}\n'
|
||||
f'\n'
|
||||
f'Simulates OOB cancel request!\n'
|
||||
)
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
for case in [True, False]:
|
||||
trio.run(main, case)
|
|
@ -317,7 +317,6 @@ def test_subactor_breakpoint(
|
|||
|
||||
assert in_prompt_msg(
|
||||
child, [
|
||||
'MessagingError:',
|
||||
'RemoteActorError:',
|
||||
"('breakpoint_forever'",
|
||||
'bdb.BdbQuit',
|
||||
|
|
|
@ -284,20 +284,32 @@ async def test_cancel_infinite_streamer(start_method):
|
|||
],
|
||||
)
|
||||
@tractor_test
|
||||
async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
|
||||
"""Verify a subset of failed subactors causes all others in
|
||||
async def test_some_cancels_all(
|
||||
num_actors_and_errs: tuple,
|
||||
start_method: str,
|
||||
loglevel: str,
|
||||
):
|
||||
'''
|
||||
Verify a subset of failed subactors causes all others in
|
||||
the nursery to be cancelled just like the strategy in trio.
|
||||
|
||||
This is the first and only supervisory strategy at the moment.
|
||||
"""
|
||||
num_actors, first_err, err_type, ria_func, da_func = num_actors_and_errs
|
||||
|
||||
'''
|
||||
(
|
||||
num_actors,
|
||||
first_err,
|
||||
err_type,
|
||||
ria_func,
|
||||
da_func,
|
||||
) = num_actors_and_errs
|
||||
try:
|
||||
async with tractor.open_nursery() as n:
|
||||
async with tractor.open_nursery() as an:
|
||||
|
||||
# spawn the same number of deamon actors which should be cancelled
|
||||
dactor_portals = []
|
||||
for i in range(num_actors):
|
||||
dactor_portals.append(await n.start_actor(
|
||||
dactor_portals.append(await an.start_actor(
|
||||
f'deamon_{i}',
|
||||
enable_modules=[__name__],
|
||||
))
|
||||
|
@ -307,7 +319,7 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
|
|||
for i in range(num_actors):
|
||||
# start actor(s) that will fail immediately
|
||||
riactor_portals.append(
|
||||
await n.run_in_actor(
|
||||
await an.run_in_actor(
|
||||
func,
|
||||
name=f'actor_{i}',
|
||||
**kwargs
|
||||
|
@ -337,7 +349,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
|
|||
|
||||
# should error here with a ``RemoteActorError`` or ``MultiError``
|
||||
|
||||
except first_err as err:
|
||||
except first_err as _err:
|
||||
err = _err
|
||||
if isinstance(err, BaseExceptionGroup):
|
||||
assert len(err.exceptions) == num_actors
|
||||
for exc in err.exceptions:
|
||||
|
@ -348,8 +361,8 @@ async def test_some_cancels_all(num_actors_and_errs, start_method, loglevel):
|
|||
elif isinstance(err, tractor.RemoteActorError):
|
||||
assert err.boxed_type == err_type
|
||||
|
||||
assert n.cancelled is True
|
||||
assert not n._children
|
||||
assert an.cancelled is True
|
||||
assert not an._children
|
||||
else:
|
||||
pytest.fail("Should have gotten a remote assertion error?")
|
||||
|
||||
|
@ -533,38 +546,123 @@ def test_cancel_via_SIGINT_other_task(
|
|||
|
||||
async def spin_for(period=3):
|
||||
"Sync sleep."
|
||||
print(f'sync sleeping in sub-sub for {period}\n')
|
||||
time.sleep(period)
|
||||
|
||||
|
||||
async def spawn():
|
||||
async with tractor.open_nursery() as tn:
|
||||
await tn.run_in_actor(
|
||||
async def spawn_sub_with_sync_blocking_task():
|
||||
async with tractor.open_nursery() as an:
|
||||
print('starting sync blocking subactor..\n')
|
||||
await an.run_in_actor(
|
||||
spin_for,
|
||||
name='sleeper',
|
||||
)
|
||||
print('exiting first subactor layer..\n')
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'man_cancel_outer',
|
||||
[
|
||||
False, # passes if delay != 2
|
||||
|
||||
# always causes an unexpected eg-w-embedded-assert-err?
|
||||
pytest.param(True,
|
||||
marks=pytest.mark.xfail(
|
||||
reason=(
|
||||
'always causes an unexpected eg-w-embedded-assert-err?'
|
||||
)
|
||||
),
|
||||
),
|
||||
],
|
||||
)
|
||||
@no_windows
|
||||
def test_cancel_while_childs_child_in_sync_sleep(
|
||||
loglevel,
|
||||
start_method,
|
||||
spawn_backend,
|
||||
loglevel: str,
|
||||
start_method: str,
|
||||
spawn_backend: str,
|
||||
debug_mode: bool,
|
||||
reg_addr: tuple,
|
||||
man_cancel_outer: bool,
|
||||
):
|
||||
"""Verify that a child cancelled while executing sync code is torn
|
||||
'''
|
||||
Verify that a child cancelled while executing sync code is torn
|
||||
down even when that cancellation is triggered by the parent
|
||||
2 nurseries "up".
|
||||
"""
|
||||
|
||||
Though the grandchild should stay blocking its actor runtime, its
|
||||
parent should issue a "zombie reaper" to hard kill it after
|
||||
sufficient timeout.
|
||||
|
||||
'''
|
||||
if start_method == 'forkserver':
|
||||
pytest.skip("Forksever sux hard at resuming from sync sleep...")
|
||||
|
||||
async def main():
|
||||
with trio.fail_after(2):
|
||||
async with tractor.open_nursery() as tn:
|
||||
await tn.run_in_actor(
|
||||
spawn,
|
||||
name='spawn',
|
||||
#
|
||||
# XXX BIG TODO NOTE XXX
|
||||
#
|
||||
# it seems there's a strange race that can happen
|
||||
# where where the fail-after will trigger outer scope
|
||||
# .cancel() which then causes the inner scope to raise,
|
||||
#
|
||||
# BaseExceptionGroup('Exceptions from Trio nursery', [
|
||||
# BaseExceptionGroup('Exceptions from Trio nursery',
|
||||
# [
|
||||
# Cancelled(),
|
||||
# Cancelled(),
|
||||
# ]
|
||||
# ),
|
||||
# AssertionError('assert 0')
|
||||
# ])
|
||||
#
|
||||
# WHY THIS DOESN'T MAKE SENSE:
|
||||
# ---------------------------
|
||||
# - it should raise too-slow-error when too slow..
|
||||
# * verified that using simple-cs and manually cancelling
|
||||
# you get same outcome -> indicates that the fail-after
|
||||
# can have its TooSlowError overriden!
|
||||
# |_ to check this it's easy, simplly decrease the timeout
|
||||
# as per the var below.
|
||||
#
|
||||
# - when using the manual simple-cs the outcome is different
|
||||
# DESPITE the `assert 0` which means regardless of the
|
||||
# inner scope effectively failing in the same way, the
|
||||
# bubbling up **is NOT the same**.
|
||||
#
|
||||
# delays trigger diff outcomes..
|
||||
# ---------------------------
|
||||
# as seen by uncommenting various lines below there is from
|
||||
# my POV an unexpected outcome due to the delay=2 case.
|
||||
#
|
||||
# delay = 1 # no AssertionError in eg, TooSlowError raised.
|
||||
# delay = 2 # is AssertionError in eg AND no TooSlowError !?
|
||||
delay = 4 # is AssertionError in eg AND no _cs cancellation.
|
||||
|
||||
with trio.fail_after(delay) as _cs:
|
||||
# with trio.CancelScope() as cs:
|
||||
# ^XXX^ can be used instead to see same outcome.
|
||||
|
||||
async with (
|
||||
# tractor.trionics.collapse_eg(), # doesn't help
|
||||
tractor.open_nursery(
|
||||
hide_tb=False,
|
||||
debug_mode=debug_mode,
|
||||
registry_addrs=[reg_addr],
|
||||
) as an,
|
||||
):
|
||||
await an.run_in_actor(
|
||||
spawn_sub_with_sync_blocking_task,
|
||||
name='sync_blocking_sub',
|
||||
)
|
||||
await trio.sleep(1)
|
||||
|
||||
if man_cancel_outer:
|
||||
print('Cancelling manually in root')
|
||||
_cs.cancel()
|
||||
|
||||
# trigger exc-srced taskc down
|
||||
# the actor tree.
|
||||
print('RAISING IN ROOT')
|
||||
assert 0
|
||||
|
||||
with pytest.raises(AssertionError):
|
||||
|
|
|
@ -13,26 +13,24 @@ MESSAGE = 'tractoring at full speed'
|
|||
def test_empty_mngrs_input_raises() -> None:
|
||||
|
||||
async def main():
|
||||
with trio.fail_after(1):
|
||||
with trio.fail_after(3):
|
||||
async with (
|
||||
open_actor_cluster(
|
||||
modules=[__name__],
|
||||
|
||||
# NOTE: ensure we can passthrough runtime opts
|
||||
loglevel='info',
|
||||
# debug_mode=True,
|
||||
loglevel='cancel',
|
||||
debug_mode=False,
|
||||
|
||||
) as portals,
|
||||
|
||||
gather_contexts(
|
||||
# NOTE: it's the use of inline-generator syntax
|
||||
# here that causes the empty input.
|
||||
mngrs=(
|
||||
p.open_context(worker) for p in portals.values()
|
||||
),
|
||||
),
|
||||
gather_contexts(mngrs=()),
|
||||
):
|
||||
assert 0
|
||||
# should fail before this?
|
||||
assert portals
|
||||
|
||||
# test should fail if we mk it here!
|
||||
assert 0, 'Should have raised val-err !?'
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
trio.run(main)
|
||||
|
|
|
@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
|
|||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
||||
def time_quad_ex(
|
||||
reg_addr: tuple,
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
):
|
||||
if spawn_backend == 'mp':
|
||||
"""no idea but the mp *nix runs are flaking out here often...
|
||||
"""
|
||||
'''
|
||||
no idea but the mp *nix runs are flaking out here often...
|
||||
|
||||
'''
|
||||
pytest.skip("Test is too flaky on mp in CI")
|
||||
|
||||
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
|
||||
|
@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
|||
return results, diff
|
||||
|
||||
|
||||
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
|
||||
"""This also serves as a kind of "we'd like to be this fast test"."""
|
||||
def test_a_quadruple_example(
|
||||
time_quad_ex: tuple,
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
):
|
||||
'''
|
||||
This also serves as a kind of "we'd like to be this fast test".
|
||||
|
||||
'''
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
|
||||
this_fast = (
|
||||
6 if platform.system() in (
|
||||
'Windows',
|
||||
'Darwin',
|
||||
)
|
||||
else 3
|
||||
)
|
||||
assert diff < this_fast
|
||||
|
||||
|
||||
|
|
|
@ -147,8 +147,7 @@ def test_trio_prestarted_task_bubbles(
|
|||
await trio.sleep_forever()
|
||||
|
||||
async def _trio_main():
|
||||
# with trio.fail_after(2):
|
||||
with trio.fail_after(999):
|
||||
with trio.fail_after(2 if not debug_mode else 999):
|
||||
first: str
|
||||
chan: to_asyncio.LinkedTaskChannel
|
||||
aio_ev = asyncio.Event()
|
||||
|
@ -217,32 +216,25 @@ def test_trio_prestarted_task_bubbles(
|
|||
):
|
||||
aio_ev.set()
|
||||
|
||||
with pytest.raises(
|
||||
expected_exception=ExceptionGroup,
|
||||
) as excinfo:
|
||||
tractor.to_asyncio.run_as_asyncio_guest(
|
||||
trio_main=_trio_main,
|
||||
)
|
||||
|
||||
eg = excinfo.value
|
||||
rte_eg, rest_eg = eg.split(RuntimeError)
|
||||
|
||||
# ensure the trio-task's error bubbled despite the aio-side
|
||||
# having (maybe) errored first.
|
||||
if aio_err_trigger in (
|
||||
'after_trio_task_starts',
|
||||
'after_start_point',
|
||||
):
|
||||
assert len(errs := rest_eg.exceptions) == 1
|
||||
typerr = errs[0]
|
||||
assert (
|
||||
type(typerr) is TypeError
|
||||
and
|
||||
'trio-side' in typerr.args
|
||||
)
|
||||
patt: str = 'trio-side'
|
||||
expect_exc = TypeError
|
||||
|
||||
# when aio errors BEFORE (last) trio task is scheduled, we should
|
||||
# never see anythinb but the aio-side.
|
||||
else:
|
||||
assert len(rtes := rte_eg.exceptions) == 1
|
||||
assert 'asyncio-side' in rtes[0].args[0]
|
||||
patt: str = 'asyncio-side'
|
||||
expect_exc = RuntimeError
|
||||
|
||||
with pytest.raises(expect_exc) as excinfo:
|
||||
tractor.to_asyncio.run_as_asyncio_guest(
|
||||
trio_main=_trio_main,
|
||||
)
|
||||
|
||||
caught_exc = excinfo.value
|
||||
assert patt in caught_exc.args
|
||||
|
|
|
@ -55,10 +55,17 @@ async def open_actor_cluster(
|
|||
raise ValueError(
|
||||
'Number of names is {len(names)} but count it {count}')
|
||||
|
||||
async with tractor.open_nursery(
|
||||
**runtime_kwargs,
|
||||
) as an:
|
||||
async with trio.open_nursery() as n:
|
||||
async with (
|
||||
# tractor.trionics.collapse_eg(),
|
||||
tractor.open_nursery(
|
||||
**runtime_kwargs,
|
||||
) as an
|
||||
):
|
||||
async with (
|
||||
# tractor.trionics.collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
tractor.trionics.maybe_raise_from_masking_exc()
|
||||
):
|
||||
uid = tractor.current_actor().uid
|
||||
|
||||
async def _start(name: str) -> None:
|
||||
|
@ -69,9 +76,8 @@ async def open_actor_cluster(
|
|||
)
|
||||
|
||||
for name in names:
|
||||
n.start_soon(_start, name)
|
||||
tn.start_soon(_start, name)
|
||||
|
||||
assert len(portals) == count
|
||||
yield portals
|
||||
|
||||
await an.cancel(hard_kill=hard_kill)
|
||||
|
|
|
@ -101,6 +101,9 @@ from ._state import (
|
|||
debug_mode,
|
||||
_ctxvar_Context,
|
||||
)
|
||||
from .trionics import (
|
||||
collapse_eg,
|
||||
)
|
||||
# ------ - ------
|
||||
if TYPE_CHECKING:
|
||||
from ._portal import Portal
|
||||
|
@ -942,7 +945,7 @@ class Context:
|
|||
self.cancel_called = True
|
||||
|
||||
header: str = (
|
||||
f'Cancelling ctx from {side.upper()}-side\n'
|
||||
f'Cancelling ctx from {side!r}-side\n'
|
||||
)
|
||||
reminfo: str = (
|
||||
# ' =>\n'
|
||||
|
@ -950,7 +953,7 @@ class Context:
|
|||
f'\n'
|
||||
f'c)=> {self.chan.uid}\n'
|
||||
f' |_[{self.dst_maddr}\n'
|
||||
f' >>{self.repr_rpc}\n'
|
||||
f' >> {self.repr_rpc}\n'
|
||||
# f' >> {self._nsf}() -> {codec}[dict]:\n\n'
|
||||
# TODO: pull msg-type from spec re #320
|
||||
)
|
||||
|
@ -2025,10 +2028,8 @@ async def open_context_from_portal(
|
|||
ctxc_from_callee: ContextCancelled|None = None
|
||||
try:
|
||||
async with (
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn,
|
||||
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
msgops.maybe_limit_plds(
|
||||
ctx=ctx,
|
||||
spec=ctx_meta.get('pld_spec'),
|
||||
|
|
|
@ -28,7 +28,10 @@ from typing import (
|
|||
from contextlib import asynccontextmanager as acm
|
||||
|
||||
from tractor.log import get_logger
|
||||
from .trionics import gather_contexts
|
||||
from .trionics import (
|
||||
gather_contexts,
|
||||
collapse_eg,
|
||||
)
|
||||
from .ipc import _connect_chan, Channel
|
||||
from ._addr import (
|
||||
UnwrappedAddress,
|
||||
|
@ -87,7 +90,6 @@ async def get_registry(
|
|||
yield regstr_ptl
|
||||
|
||||
|
||||
|
||||
@acm
|
||||
async def get_root(
|
||||
**kwargs,
|
||||
|
@ -253,9 +255,12 @@ async def find_actor(
|
|||
for addr in registry_addrs
|
||||
)
|
||||
portals: list[Portal]
|
||||
async with gather_contexts(
|
||||
mngrs=maybe_portals,
|
||||
) as portals:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
gather_contexts(
|
||||
mngrs=maybe_portals,
|
||||
) as portals,
|
||||
):
|
||||
# log.runtime(
|
||||
# 'Gathered portals:\n'
|
||||
# f'{portals}'
|
||||
|
|
|
@ -39,7 +39,10 @@ import warnings
|
|||
|
||||
import trio
|
||||
|
||||
from .trionics import maybe_open_nursery
|
||||
from .trionics import (
|
||||
maybe_open_nursery,
|
||||
collapse_eg,
|
||||
)
|
||||
from ._state import (
|
||||
current_actor,
|
||||
)
|
||||
|
@ -583,14 +586,13 @@ async def open_portal(
|
|||
assert actor
|
||||
was_connected: bool = False
|
||||
|
||||
async with maybe_open_nursery(
|
||||
tn,
|
||||
shield=shield,
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? soo roll our own then ??
|
||||
# -> since we kinda want the "if only one `.exception` then
|
||||
# just raise that" interface?
|
||||
) as tn:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
maybe_open_nursery(
|
||||
tn,
|
||||
shield=shield,
|
||||
) as tn,
|
||||
):
|
||||
|
||||
if not channel.connected():
|
||||
await channel.connect()
|
||||
|
|
|
@ -37,13 +37,7 @@ import warnings
|
|||
|
||||
import trio
|
||||
|
||||
from ._runtime import (
|
||||
Actor,
|
||||
Arbiter,
|
||||
# TODO: rename and make a non-actor subtype?
|
||||
# Arbiter as Registry,
|
||||
async_main,
|
||||
)
|
||||
from . import _runtime
|
||||
from .devx import (
|
||||
debug,
|
||||
_frame_stack,
|
||||
|
@ -64,6 +58,7 @@ from ._addr import (
|
|||
)
|
||||
from .trionics import (
|
||||
is_multi_cancelled,
|
||||
collapse_eg,
|
||||
)
|
||||
from ._exceptions import (
|
||||
RuntimeFailure,
|
||||
|
@ -102,7 +97,7 @@ async def maybe_block_bp(
|
|||
):
|
||||
logger.info(
|
||||
f'Found `greenback` installed @ {maybe_mod}\n'
|
||||
'Enabling `tractor.pause_from_sync()` support!\n'
|
||||
f'Enabling `tractor.pause_from_sync()` support!\n'
|
||||
)
|
||||
os.environ['PYTHONBREAKPOINT'] = (
|
||||
'tractor.devx.debug._sync_pause_from_builtin'
|
||||
|
@ -197,9 +192,13 @@ async def open_root_actor(
|
|||
# read-only state to sublayers?
|
||||
# extra_rt_vars: dict|None = None,
|
||||
|
||||
) -> Actor:
|
||||
) -> _runtime.Actor:
|
||||
'''
|
||||
Runtime init entry point for ``tractor``.
|
||||
Initialize the `tractor` runtime by starting a "root actor" in
|
||||
a parent-most Python process.
|
||||
|
||||
All (disjoint) actor-process-trees-as-programs are created via
|
||||
this entrypoint.
|
||||
|
||||
'''
|
||||
# XXX NEVER allow nested actor-trees!
|
||||
|
@ -397,7 +396,7 @@ async def open_root_actor(
|
|||
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
||||
)
|
||||
|
||||
actor = Actor(
|
||||
actor = _runtime.Actor(
|
||||
name=name or 'anonymous',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=ponged_addrs,
|
||||
|
@ -436,7 +435,8 @@ async def open_root_actor(
|
|||
# https://github.com/goodboy/tractor/pull/348
|
||||
# https://github.com/goodboy/tractor/issues/296
|
||||
|
||||
actor = Arbiter(
|
||||
# TODO: rename as `RootActor` or is that even necessary?
|
||||
actor = _runtime.Arbiter(
|
||||
name=name or 'registrar',
|
||||
uuid=mk_uuid(),
|
||||
registry_addrs=registry_addrs,
|
||||
|
@ -471,18 +471,21 @@ async def open_root_actor(
|
|||
'-> Opening new registry @ '
|
||||
+
|
||||
'\n'.join(
|
||||
f'@{addr}' for addr in reg_addrs
|
||||
f'{addr}' for addr in reg_addrs
|
||||
)
|
||||
)
|
||||
logger.info(f'{report}\n')
|
||||
|
||||
# start the actor runtime in a new task
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
||||
) as nursery:
|
||||
# start runtime in a bg sub-task, yield to caller.
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as root_tn,
|
||||
|
||||
# ``_runtime.async_main()`` creates an internal nursery
|
||||
# XXX, finally-footgun below?
|
||||
# -> see note on why shielding.
|
||||
# maybe_raise_from_masking_exc(),
|
||||
):
|
||||
# `_runtime.async_main()` creates an internal nursery
|
||||
# and blocks here until any underlying actor(-process)
|
||||
# tree has terminated thereby conducting so called
|
||||
# "end-to-end" structured concurrency throughout an
|
||||
|
@ -490,9 +493,9 @@ async def open_root_actor(
|
|||
# "actor runtime" primitives are SC-compat and thus all
|
||||
# transitively spawned actors/processes must be as
|
||||
# well.
|
||||
await nursery.start(
|
||||
await root_tn.start(
|
||||
partial(
|
||||
async_main,
|
||||
_runtime.async_main,
|
||||
actor,
|
||||
accept_addrs=trans_bind_addrs,
|
||||
parent_addr=None
|
||||
|
@ -540,7 +543,7 @@ async def open_root_actor(
|
|||
raise
|
||||
|
||||
finally:
|
||||
# NOTE: not sure if we'll ever need this but it's
|
||||
# NOTE/TODO?, not sure if we'll ever need this but it's
|
||||
# possibly better for even more determinism?
|
||||
# logger.cancel(
|
||||
# f'Waiting on {len(nurseries)} nurseries in root..')
|
||||
|
|
|
@ -765,7 +765,6 @@ async def _invoke(
|
|||
BaseExceptionGroup,
|
||||
BaseException,
|
||||
trio.Cancelled,
|
||||
|
||||
) as _scope_err:
|
||||
scope_err = _scope_err
|
||||
if (
|
||||
|
|
|
@ -74,6 +74,9 @@ from tractor.msg import (
|
|||
pretty_struct,
|
||||
types as msgtypes,
|
||||
)
|
||||
from .trionics import (
|
||||
collapse_eg,
|
||||
)
|
||||
from .ipc import (
|
||||
Channel,
|
||||
# IPCServer, # causes cycles atm..
|
||||
|
@ -359,7 +362,7 @@ class Actor:
|
|||
|
||||
def pformat(
|
||||
self,
|
||||
ds: str = ':',
|
||||
ds: str = ': ',
|
||||
indent: int = 0,
|
||||
privates: bool = False,
|
||||
) -> str:
|
||||
|
@ -549,6 +552,14 @@ class Actor:
|
|||
)
|
||||
raise
|
||||
|
||||
# ?TODO, factor this meth-iface into a new `.rpc` subsys primitive?
|
||||
# - _get_rpc_func(),
|
||||
# - _deliver_ctx_payload(),
|
||||
# - get_context(),
|
||||
# - start_remote_task(),
|
||||
# - cancel_rpc_tasks(),
|
||||
# - _cancel_task(),
|
||||
#
|
||||
def _get_rpc_func(self, ns, funcname):
|
||||
'''
|
||||
Try to lookup and return a target RPC func from the
|
||||
|
@ -1116,14 +1127,6 @@ class Actor:
|
|||
self._cancel_complete.set()
|
||||
return True
|
||||
|
||||
# XXX: hard kill logic if needed?
|
||||
# def _hard_mofo_kill(self):
|
||||
# # If we're the root actor or zombied kill everything
|
||||
# if self._parent_chan is None: # TODO: more robust check
|
||||
# root = trio.lowlevel.current_root_task()
|
||||
# for n in root.child_nurseries:
|
||||
# n.cancel_scope.cancel()
|
||||
|
||||
async def _cancel_task(
|
||||
self,
|
||||
cid: str,
|
||||
|
@ -1358,25 +1361,13 @@ class Actor:
|
|||
'''
|
||||
return self.accept_addrs[0]
|
||||
|
||||
def get_parent(self) -> Portal:
|
||||
'''
|
||||
Return a `Portal` to our parent.
|
||||
|
||||
'''
|
||||
assert self._parent_chan, "No parent channel for this actor?"
|
||||
return Portal(self._parent_chan)
|
||||
|
||||
def get_chans(
|
||||
self,
|
||||
uid: tuple[str, str],
|
||||
|
||||
) -> list[Channel]:
|
||||
'''
|
||||
Return all IPC channels to the actor with provided `uid`.
|
||||
|
||||
'''
|
||||
return self._ipc_server._peers[uid]
|
||||
|
||||
# TODO, this should delegate ONLY to the
|
||||
# `._spawn_spec._runtime_vars: dict` / `._state` APIs?
|
||||
#
|
||||
# XXX, AH RIGHT that's why..
|
||||
# it's bc we pass this as a CLI flag to the child.py precisely
|
||||
# bc we need the bootstrapping pre `async_main()`.. but maybe
|
||||
# keep this as an impl deat and not part of the pub iface impl?
|
||||
def is_infected_aio(self) -> bool:
|
||||
'''
|
||||
If `True`, this actor is running `trio` in guest mode on
|
||||
|
@ -1387,6 +1378,23 @@ class Actor:
|
|||
'''
|
||||
return self._infected_aio
|
||||
|
||||
# ?TODO, is this the right type for this method?
|
||||
def get_parent(self) -> Portal:
|
||||
'''
|
||||
Return a `Portal` to our parent.
|
||||
|
||||
'''
|
||||
assert self._parent_chan, "No parent channel for this actor?"
|
||||
return Portal(self._parent_chan)
|
||||
|
||||
# XXX: hard kill logic if needed?
|
||||
# def _hard_mofo_kill(self):
|
||||
# # If we're the root actor or zombied kill everything
|
||||
# if self._parent_chan is None: # TODO: more robust check
|
||||
# root = trio.lowlevel.current_root_task()
|
||||
# for n in root.child_nurseries:
|
||||
# n.cancel_scope.cancel()
|
||||
|
||||
|
||||
async def async_main(
|
||||
actor: Actor,
|
||||
|
@ -1466,17 +1474,18 @@ async def async_main(
|
|||
# parent is kept alive as a resilient service until
|
||||
# cancellation steps have (mostly) occurred in
|
||||
# a deterministic way.
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as root_nursery:
|
||||
actor._root_n = root_nursery
|
||||
root_tn: trio.Nursery
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as root_tn,
|
||||
):
|
||||
actor._root_n = root_tn
|
||||
assert actor._root_n
|
||||
|
||||
ipc_server: _server.IPCServer
|
||||
async with (
|
||||
trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as service_nursery,
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as service_nursery,
|
||||
_server.open_ipc_server(
|
||||
parent_tn=service_nursery,
|
||||
stream_handler_tn=service_nursery,
|
||||
|
@ -1600,7 +1609,7 @@ async def async_main(
|
|||
# start processing parent requests until our channel
|
||||
# server is 100% up and running.
|
||||
if actor._parent_chan:
|
||||
await root_nursery.start(
|
||||
await root_tn.start(
|
||||
partial(
|
||||
_rpc.process_messages,
|
||||
chan=actor._parent_chan,
|
||||
|
|
|
@ -44,6 +44,7 @@ from ._runtime import Actor
|
|||
from ._portal import Portal
|
||||
from .trionics import (
|
||||
is_multi_cancelled,
|
||||
collapse_eg,
|
||||
)
|
||||
from ._exceptions import (
|
||||
ContextCancelled,
|
||||
|
@ -326,9 +327,10 @@ class ActorNursery:
|
|||
server: IPCServer = self._actor.ipc_server
|
||||
|
||||
with trio.move_on_after(3) as cs:
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
) as tn:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
|
||||
subactor: Actor
|
||||
proc: trio.Process
|
||||
|
@ -421,10 +423,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
# `ActorNursery.start_actor()`).
|
||||
|
||||
# errors from this daemon actor nursery bubble up to caller
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
||||
) as da_nursery:
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as da_nursery,
|
||||
):
|
||||
try:
|
||||
# This is the inner level "run in actor" nursery. It is
|
||||
# awaited first since actors spawned in this way (using
|
||||
|
@ -434,11 +436,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
|||
# immediately raised for handling by a supervisor strategy.
|
||||
# As such if the strategy propagates any error(s) upwards
|
||||
# the above "daemon actor" nursery will be notified.
|
||||
async with trio.open_nursery(
|
||||
strict_exception_groups=False,
|
||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
||||
) as ria_nursery:
|
||||
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as ria_nursery,
|
||||
):
|
||||
an = ActorNursery(
|
||||
actor,
|
||||
ria_nursery,
|
||||
|
|
|
@ -31,7 +31,7 @@ from ._broadcast import (
|
|||
)
|
||||
from ._beg import (
|
||||
collapse_eg as collapse_eg,
|
||||
maybe_collapse_eg as maybe_collapse_eg,
|
||||
get_collapsed_eg as get_collapsed_eg,
|
||||
is_multi_cancelled as is_multi_cancelled,
|
||||
)
|
||||
from ._taskc import (
|
||||
|
|
|
@ -15,8 +15,9 @@
|
|||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
`BaseExceptionGroup` related utils and helpers pertaining to
|
||||
first-class-`trio` from a historical perspective B)
|
||||
`BaseExceptionGroup` utils and helpers pertaining to
|
||||
first-class-`trio` from a "historical" perspective, like "loose
|
||||
exception group" task-nurseries.
|
||||
|
||||
'''
|
||||
from contextlib import (
|
||||
|
@ -24,27 +25,84 @@ from contextlib import (
|
|||
)
|
||||
from typing import (
|
||||
Literal,
|
||||
Type,
|
||||
)
|
||||
|
||||
import trio
|
||||
# from trio._core._concat_tb import (
|
||||
# concat_tb,
|
||||
# )
|
||||
|
||||
|
||||
def maybe_collapse_eg(
|
||||
beg: BaseExceptionGroup,
|
||||
# XXX NOTE
|
||||
# taken verbatim from `trio._core._run` except,
|
||||
# - remove the NONSTRICT_EXCEPTIONGROUP_NOTE deprecation-note
|
||||
# guard-check; we know we want an explicit collapse.
|
||||
# - mask out tb rewriting in collapse case, i don't think it really
|
||||
# matters?
|
||||
#
|
||||
def collapse_exception_group(
|
||||
excgroup: BaseExceptionGroup[BaseException],
|
||||
) -> BaseException:
|
||||
"""Recursively collapse any single-exception groups into that single contained
|
||||
exception.
|
||||
|
||||
"""
|
||||
exceptions = list(excgroup.exceptions)
|
||||
modified = False
|
||||
for i, exc in enumerate(exceptions):
|
||||
if isinstance(exc, BaseExceptionGroup):
|
||||
new_exc = collapse_exception_group(exc)
|
||||
if new_exc is not exc:
|
||||
modified = True
|
||||
exceptions[i] = new_exc
|
||||
|
||||
if (
|
||||
len(exceptions) == 1
|
||||
and isinstance(excgroup, BaseExceptionGroup)
|
||||
|
||||
# XXX trio's loose-setting condition..
|
||||
# and NONSTRICT_EXCEPTIONGROUP_NOTE in getattr(excgroup, "__notes__", ())
|
||||
):
|
||||
# exceptions[0].__traceback__ = concat_tb(
|
||||
# excgroup.__traceback__,
|
||||
# exceptions[0].__traceback__,
|
||||
# )
|
||||
return exceptions[0]
|
||||
elif modified:
|
||||
return excgroup.derive(exceptions)
|
||||
else:
|
||||
return excgroup
|
||||
|
||||
|
||||
def get_collapsed_eg(
|
||||
beg: BaseExceptionGroup,
|
||||
|
||||
) -> BaseException|None:
|
||||
'''
|
||||
If the input beg can collapse to a single non-eg sub-exception,
|
||||
return it instead.
|
||||
If the input beg can collapse to a single sub-exception which is
|
||||
itself **not** an eg, return it.
|
||||
|
||||
'''
|
||||
if len(excs := beg.exceptions) == 1:
|
||||
return excs[0]
|
||||
maybe_exc = collapse_exception_group(beg)
|
||||
if maybe_exc is beg:
|
||||
return None
|
||||
|
||||
return beg
|
||||
return maybe_exc
|
||||
|
||||
|
||||
@acm
|
||||
async def collapse_eg():
|
||||
async def collapse_eg(
|
||||
hide_tb: bool = True,
|
||||
|
||||
# XXX, for ex. will always show begs containing single taskc
|
||||
ignore: set[Type[BaseException]] = {
|
||||
# trio.Cancelled,
|
||||
},
|
||||
add_notes: bool = True,
|
||||
|
||||
bp: bool = False,
|
||||
):
|
||||
'''
|
||||
If `BaseExceptionGroup` raised in the body scope is
|
||||
"collapse-able" (in the same way that
|
||||
|
@ -52,15 +110,58 @@ async def collapse_eg():
|
|||
only raise the lone emedded non-eg in in place.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
try:
|
||||
yield
|
||||
except* BaseException as beg:
|
||||
if (
|
||||
exc := maybe_collapse_eg(beg)
|
||||
) is not beg:
|
||||
raise exc
|
||||
except BaseExceptionGroup as _beg:
|
||||
beg = _beg
|
||||
|
||||
raise beg
|
||||
if (
|
||||
bp
|
||||
and
|
||||
len(beg.exceptions) > 1
|
||||
):
|
||||
import tractor
|
||||
if tractor.current_actor(
|
||||
err_on_no_runtime=False,
|
||||
):
|
||||
await tractor.pause(shield=True)
|
||||
else:
|
||||
breakpoint()
|
||||
|
||||
if (
|
||||
(exc := get_collapsed_eg(beg))
|
||||
and
|
||||
type(exc) not in ignore
|
||||
):
|
||||
|
||||
# TODO? report number of nested groups it was collapsed
|
||||
# *from*?
|
||||
if add_notes:
|
||||
from_group_note: str = (
|
||||
'( ^^^ this exc was collapsed from a group ^^^ )\n'
|
||||
)
|
||||
if (
|
||||
from_group_note
|
||||
not in
|
||||
getattr(exc, "__notes__", ())
|
||||
):
|
||||
exc.add_note(from_group_note)
|
||||
|
||||
# raise exc
|
||||
# ^^ this will leave the orig beg tb above with the
|
||||
# "during the handling of <beg> the following.."
|
||||
# So, instead do..
|
||||
#
|
||||
if cause := exc.__cause__:
|
||||
raise exc from cause
|
||||
else:
|
||||
# suppress "during handling of <the beg>"
|
||||
# output in tb/console.
|
||||
raise exc from None
|
||||
|
||||
# keep original
|
||||
raise # beg
|
||||
|
||||
|
||||
def is_multi_cancelled(
|
||||
|
|
Loading…
Reference in New Issue