Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 8c90521562 Just import `._runtime` ns in `._root`; be a bit more explicit 2025-06-16 15:37:21 -04:00
Tyler Goodlet f23ee3cd22 Use collapse in `._root.open_root_actor()` too
Seems to add one more cancellation suite failure as well as now cause
the discovery test to error instead of fail?
2025-06-16 15:34:06 -04:00
Tyler Goodlet 9295af929c Use collapser around root tn in `.async_main()`
Seems to cause the following test suites to fail however..

- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_advanced_faults.py::test_ipc_channel_break_during_stream'
- 'test_clustering.py::test_empty_mngrs_input_raises'

Also tweak some ctxc request logging content.
2025-06-16 14:27:07 -04:00
Tyler Goodlet 83f53fd0c5 Drop msging-err patt from `subactor_breakpoint` ex
Since the `bdb` module was added to the namespace lookup set in
`._exceptions.get_err_type()` we can now relay a RAE-boxed
`bdb.BdbQuit`.
2025-06-16 13:50:39 -04:00
Tyler Goodlet 9b3af1fa16 Switch to strict-eg nurseries almost everywhere
That is just throughout the core library, not the tests yet. Again, we
simply change over to using our (nearly equivalent?)
`.trionics.collapse_eg()` in place of the already deprecated
`strict_exception_groups=False` flag in the following internals,
- the conc-fan-out tn use in `._discovery.find_actor()`.
- `._portal.open_portal()`'s internal tn used to spawn a bg rpc-msg-loop
  task.
- the daemon and "run-in-actor" layered tn pair allocated in
  `._supervise._open_and_supervise_one_cancels_all_nursery()`.

The remaining loose-eg usage in `._root` and `._runtime` seem to be
necessary to keep the test suite green?? For the moment these are left
out.
2025-06-16 13:23:54 -04:00
Tyler Goodlet f8e4d12494 Use collapser in rent side of `Context` 2025-06-16 11:22:50 -04:00
Tyler Goodlet af3c14b250 Flip to `collapse_eg()` use in `.trionics.gather_contexts()` 2025-06-15 23:29:13 -04:00
Tyler Goodlet 7de7fd0afd Use `Channel.aid: Aid` throughout `.ipc._server` 2025-06-15 22:05:51 -04:00
Tyler Goodlet 79888a31a4 Report `enable_stack_on_sig` on `stackscope` import failure 2025-06-15 22:05:17 -04:00
Tyler Goodlet de16a9ac6f Drop stale comment from inter-peer suite 2025-06-15 22:04:01 -04:00
12 changed files with 90 additions and 79 deletions

View File

@ -310,7 +310,6 @@ def test_subactor_breakpoint(
assert in_prompt_msg( assert in_prompt_msg(
child, [ child, [
'MessagingError:',
'RemoteActorError:', 'RemoteActorError:',
"('breakpoint_forever'", "('breakpoint_forever'",
'bdb.BdbQuit', 'bdb.BdbQuit',

View File

@ -410,7 +410,6 @@ def test_peer_canceller(
''' '''
async def main(): async def main():
async with tractor.open_nursery( async with tractor.open_nursery(
# NOTE: to halt the peer tasks on ctxc, uncomment this.
debug_mode=debug_mode, debug_mode=debug_mode,
) as an: ) as an:
canceller: Portal = await an.start_actor( canceller: Portal = await an.start_actor(

View File

@ -101,6 +101,9 @@ from ._state import (
debug_mode, debug_mode,
_ctxvar_Context, _ctxvar_Context,
) )
from .trionics import (
collapse_eg,
)
# ------ - ------ # ------ - ------
if TYPE_CHECKING: if TYPE_CHECKING:
from ._portal import Portal from ._portal import Portal
@ -940,7 +943,7 @@ class Context:
self.cancel_called = True self.cancel_called = True
header: str = ( header: str = (
f'Cancelling ctx from {side.upper()}-side\n' f'Cancelling ctx from {side!r}-side\n'
) )
reminfo: str = ( reminfo: str = (
# ' =>\n' # ' =>\n'
@ -948,7 +951,7 @@ class Context:
f'\n' f'\n'
f'c)=> {self.chan.uid}\n' f'c)=> {self.chan.uid}\n'
f' |_[{self.dst_maddr}\n' f' |_[{self.dst_maddr}\n'
f' >>{self.repr_rpc}\n' f' >> {self.repr_rpc}\n'
# f' >> {self._nsf}() -> {codec}[dict]:\n\n' # f' >> {self._nsf}() -> {codec}[dict]:\n\n'
# TODO: pull msg-type from spec re #320 # TODO: pull msg-type from spec re #320
) )
@ -2023,10 +2026,8 @@ async def open_context_from_portal(
ctxc_from_callee: ContextCancelled|None = None ctxc_from_callee: ContextCancelled|None = None
try: try:
async with ( async with (
trio.open_nursery( collapse_eg(),
strict_exception_groups=False, trio.open_nursery() as tn,
) as tn,
msgops.maybe_limit_plds( msgops.maybe_limit_plds(
ctx=ctx, ctx=ctx,
spec=ctx_meta.get('pld_spec'), spec=ctx_meta.get('pld_spec'),

View File

@ -28,7 +28,10 @@ from typing import (
from contextlib import asynccontextmanager as acm from contextlib import asynccontextmanager as acm
from tractor.log import get_logger from tractor.log import get_logger
from .trionics import gather_contexts from .trionics import (
gather_contexts,
collapse_eg,
)
from .ipc import _connect_chan, Channel from .ipc import _connect_chan, Channel
from ._addr import ( from ._addr import (
UnwrappedAddress, UnwrappedAddress,
@ -88,7 +91,6 @@ async def get_registry(
yield regstr_ptl yield regstr_ptl
@acm @acm
async def get_root( async def get_root(
**kwargs, **kwargs,
@ -249,9 +251,12 @@ async def find_actor(
for addr in registry_addrs for addr in registry_addrs
) )
portals: list[Portal] portals: list[Portal]
async with gather_contexts( async with (
collapse_eg(),
gather_contexts(
mngrs=maybe_portals, mngrs=maybe_portals,
) as portals: ) as portals,
):
# log.runtime( # log.runtime(
# 'Gathered portals:\n' # 'Gathered portals:\n'
# f'{portals}' # f'{portals}'

View File

@ -39,7 +39,10 @@ import warnings
import trio import trio
from .trionics import maybe_open_nursery from .trionics import (
maybe_open_nursery,
collapse_eg,
)
from ._state import ( from ._state import (
current_actor, current_actor,
) )
@ -558,14 +561,13 @@ async def open_portal(
assert actor assert actor
was_connected: bool = False was_connected: bool = False
async with maybe_open_nursery( async with (
collapse_eg(),
maybe_open_nursery(
tn, tn,
shield=shield, shield=shield,
strict_exception_groups=False, ) as tn,
# ^XXX^ TODO? soo roll our own then ?? ):
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn:
if not channel.connected(): if not channel.connected():
await channel.connect() await channel.connect()

View File

@ -37,13 +37,7 @@ import warnings
import trio import trio
from ._runtime import ( from . import _runtime
Actor,
Arbiter,
# TODO: rename and make a non-actor subtype?
# Arbiter as Registry,
async_main,
)
from .devx import ( from .devx import (
debug, debug,
_frame_stack, _frame_stack,
@ -64,6 +58,7 @@ from ._addr import (
) )
from .trionics import ( from .trionics import (
is_multi_cancelled, is_multi_cancelled,
collapse_eg,
) )
from ._exceptions import ( from ._exceptions import (
RuntimeFailure, RuntimeFailure,
@ -197,9 +192,13 @@ async def open_root_actor(
# read-only state to sublayers? # read-only state to sublayers?
# extra_rt_vars: dict|None = None, # extra_rt_vars: dict|None = None,
) -> Actor: ) -> _runtime.Actor:
''' '''
Runtime init entry point for ``tractor``. Initialize the `tractor` runtime by starting a "root actor" in
a parent-most Python process.
All (disjoint) actor-process-trees-as-programs are created via
this entrypoint.
''' '''
# XXX NEVER allow nested actor-trees! # XXX NEVER allow nested actor-trees!
@ -379,7 +378,7 @@ async def open_root_actor(
f'Registry(s) seem(s) to exist @ {ponged_addrs}' f'Registry(s) seem(s) to exist @ {ponged_addrs}'
) )
actor = Actor( actor = _runtime.Actor(
name=name or 'anonymous', name=name or 'anonymous',
uuid=mk_uuid(), uuid=mk_uuid(),
registry_addrs=ponged_addrs, registry_addrs=ponged_addrs,
@ -414,7 +413,8 @@ async def open_root_actor(
# https://github.com/goodboy/tractor/pull/348 # https://github.com/goodboy/tractor/pull/348
# https://github.com/goodboy/tractor/issues/296 # https://github.com/goodboy/tractor/issues/296
actor = Arbiter( # TODO: rename as `RootActor` or is that even necessary?
actor = _runtime.Arbiter(
name=name or 'registrar', name=name or 'registrar',
uuid=mk_uuid(), uuid=mk_uuid(),
registry_addrs=registry_addrs, registry_addrs=registry_addrs,
@ -441,13 +441,13 @@ async def open_root_actor(
f'{ml_addrs_str}' f'{ml_addrs_str}'
) )
# start the actor runtime in a new task # start runtime in a bg sub-task, yield to caller.
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as root_tn,
) as nursery: ):
# ``_runtime.async_main()`` creates an internal nursery # `_runtime.async_main()` creates an internal nursery
# and blocks here until any underlying actor(-process) # and blocks here until any underlying actor(-process)
# tree has terminated thereby conducting so called # tree has terminated thereby conducting so called
# "end-to-end" structured concurrency throughout an # "end-to-end" structured concurrency throughout an
@ -455,9 +455,9 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all # "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as # transitively spawned actors/processes must be as
# well. # well.
await nursery.start( await root_tn.start(
partial( partial(
async_main, _runtime.async_main,
actor, actor,
accept_addrs=trans_bind_addrs, accept_addrs=trans_bind_addrs,
parent_addr=None parent_addr=None

View File

@ -756,7 +756,6 @@ async def _invoke(
BaseExceptionGroup, BaseExceptionGroup,
BaseException, BaseException,
trio.Cancelled, trio.Cancelled,
) as _scope_err: ) as _scope_err:
scope_err = _scope_err scope_err = _scope_err
if ( if (

View File

@ -74,6 +74,9 @@ from tractor.msg import (
pretty_struct, pretty_struct,
types as msgtypes, types as msgtypes,
) )
from .trionics import (
collapse_eg,
)
from .ipc import ( from .ipc import (
Channel, Channel,
# IPCServer, # causes cycles atm.. # IPCServer, # causes cycles atm..
@ -345,7 +348,7 @@ class Actor:
def pformat( def pformat(
self, self,
ds: str = ':', ds: str = ': ',
indent: int = 0, indent: int = 0,
) -> str: ) -> str:
fields_sect_prefix: str = ' |_' fields_sect_prefix: str = ' |_'
@ -1054,6 +1057,7 @@ class Actor:
cid: str, cid: str,
parent_chan: Channel, parent_chan: Channel,
requesting_uid: tuple[str, str]|None, requesting_uid: tuple[str, str]|None,
# ^^TODO! use the `Aid` directly here!
ipc_msg: dict|None|bool = False, ipc_msg: dict|None|bool = False,
@ -1099,9 +1103,12 @@ class Actor:
log.cancel( log.cancel(
'Rxed cancel request for RPC task\n' 'Rxed cancel request for RPC task\n'
f'<=c) {requesting_uid}\n' f'{ctx._task!r} <=c) {requesting_uid}\n'
f' |_{ctx._task}\n' f'|_>> {ctx.repr_rpc}\n'
f' >> {ctx.repr_rpc}\n'
# f'|_{ctx._task}\n'
# f' >> {ctx.repr_rpc}\n'
# f'=> {ctx._task}\n' # f'=> {ctx._task}\n'
# f' >> Actor._cancel_task() => {ctx._task}\n' # f' >> Actor._cancel_task() => {ctx._task}\n'
# f' |_ {ctx._task}\n\n' # f' |_ {ctx._task}\n\n'
@ -1386,10 +1393,12 @@ async def async_main(
# parent is kept alive as a resilient service until # parent is kept alive as a resilient service until
# cancellation steps have (mostly) occurred in # cancellation steps have (mostly) occurred in
# a deterministic way. # a deterministic way.
async with trio.open_nursery( root_tn: trio.Nursery
strict_exception_groups=False, async with (
) as root_nursery: collapse_eg(),
actor._root_n = root_nursery trio.open_nursery() as root_tn,
):
actor._root_n = root_tn
assert actor._root_n assert actor._root_n
ipc_server: _server.IPCServer ipc_server: _server.IPCServer
@ -1488,7 +1497,7 @@ async def async_main(
# their root actor over that channel. # their root actor over that channel.
if _state._runtime_vars['_is_root']: if _state._runtime_vars['_is_root']:
for addr in accept_addrs: for addr in accept_addrs:
waddr = wrap_address(addr) waddr: Address = wrap_address(addr)
if waddr == waddr.get_root(): if waddr == waddr.get_root():
_state._runtime_vars['_root_mailbox'] = addr _state._runtime_vars['_root_mailbox'] = addr
@ -1533,7 +1542,7 @@ async def async_main(
# start processing parent requests until our channel # start processing parent requests until our channel
# server is 100% up and running. # server is 100% up and running.
if actor._parent_chan: if actor._parent_chan:
await root_nursery.start( await root_tn.start(
partial( partial(
_rpc.process_messages, _rpc.process_messages,
chan=actor._parent_chan, chan=actor._parent_chan,

View File

@ -42,6 +42,7 @@ from ._runtime import Actor
from ._portal import Portal from ._portal import Portal
from .trionics import ( from .trionics import (
is_multi_cancelled, is_multi_cancelled,
collapse_eg,
) )
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
@ -324,9 +325,10 @@ class ActorNursery:
server: IPCServer = self._actor.ipc_server server: IPCServer = self._actor.ipc_server
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
) as tn: trio.open_nursery() as tn,
):
subactor: Actor subactor: Actor
proc: trio.Process proc: trio.Process
@ -419,10 +421,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# `ActorNursery.start_actor()`). # `ActorNursery.start_actor()`).
# errors from this daemon actor nursery bubble up to caller # errors from this daemon actor nursery bubble up to caller
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as da_nursery,
) as da_nursery: ):
try: try:
# This is the inner level "run in actor" nursery. It is # This is the inner level "run in actor" nursery. It is
# awaited first since actors spawned in this way (using # awaited first since actors spawned in this way (using
@ -432,11 +434,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
# immediately raised for handling by a supervisor strategy. # immediately raised for handling by a supervisor strategy.
# As such if the strategy propagates any error(s) upwards # As such if the strategy propagates any error(s) upwards
# the above "daemon actor" nursery will be notified. # the above "daemon actor" nursery will be notified.
async with trio.open_nursery( async with (
strict_exception_groups=False, collapse_eg(),
# ^XXX^ TODO? instead unpack any RAE as per "loose" style? trio.open_nursery() as ria_nursery,
) as ria_nursery: ):
an = ActorNursery( an = ActorNursery(
actor, actor,
ria_nursery, ria_nursery,

View File

@ -238,7 +238,8 @@ def enable_stack_on_sig(
import stackscope import stackscope
except ImportError: except ImportError:
log.error( log.error(
'`stackscope` not installed for use in debug mode!' '`stackscope` not installed for use in debug mode!\n'
'`Ignoring {enable_stack_on_sig!r} call!\n'
) )
return None return None

View File

@ -143,7 +143,7 @@ async def maybe_wait_on_canced_subs(
log.cancel( log.cancel(
'Waiting on cancel request to peer..\n' 'Waiting on cancel request to peer..\n'
f'c)=>\n' f'c)=>\n'
f' |_{chan.uid}\n' f' |_{chan.aid}\n'
) )
# XXX: this is a soft wait on the channel (and its # XXX: this is a soft wait on the channel (and its
@ -156,7 +156,7 @@ async def maybe_wait_on_canced_subs(
# local runtime here is now cancelled while # local runtime here is now cancelled while
# (presumably) in the middle of msg loop processing. # (presumably) in the middle of msg loop processing.
chan_info: str = ( chan_info: str = (
f'{chan.uid}\n' f'{chan.aid}\n'
f'|_{chan}\n' f'|_{chan}\n'
f' |_{chan.transport}\n\n' f' |_{chan.transport}\n\n'
) )
@ -279,7 +279,7 @@ async def maybe_wait_on_canced_subs(
log.runtime( log.runtime(
f'Peer IPC broke but subproc is alive?\n\n' f'Peer IPC broke but subproc is alive?\n\n'
f'<=x {chan.uid}@{chan.raddr}\n' f'<=x {chan.aid}@{chan.raddr}\n'
f' |_{proc}\n' f' |_{proc}\n'
) )
@ -460,7 +460,7 @@ async def handle_stream_from_peer(
# drop ref to channel so it can be gc-ed and disconnected # drop ref to channel so it can be gc-ed and disconnected
con_teardown_status: str = ( con_teardown_status: str = (
f'IPC channel disconnected:\n' f'IPC channel disconnected:\n'
f'<=x uid: {chan.uid}\n' f'<=x uid: {chan.aid}\n'
f' |_{pformat(chan)}\n\n' f' |_{pformat(chan)}\n\n'
) )
chans.remove(chan) chans.remove(chan)
@ -468,7 +468,7 @@ async def handle_stream_from_peer(
# TODO: do we need to be this pedantic? # TODO: do we need to be this pedantic?
if not chans: if not chans:
con_teardown_status += ( con_teardown_status += (
f'-> No more channels with {chan.uid}' f'-> No more channels with {chan.aid}'
) )
server._peers.pop(uid, None) server._peers.pop(uid, None)
@ -519,7 +519,7 @@ async def handle_stream_from_peer(
and and
(ctx_in_debug := pdb_lock.ctx_in_debug) (ctx_in_debug := pdb_lock.ctx_in_debug)
and and
(pdb_user_uid := ctx_in_debug.chan.uid) (pdb_user_uid := ctx_in_debug.chan.aid)
): ):
entry: tuple|None = local_nursery._children.get( entry: tuple|None = local_nursery._children.get(
tuple(pdb_user_uid) tuple(pdb_user_uid)

View File

@ -40,7 +40,7 @@ from typing import (
import trio import trio
from tractor._state import current_actor from tractor._state import current_actor
from tractor.log import get_logger from tractor.log import get_logger
# from ._beg import collapse_eg from ._beg import collapse_eg
if TYPE_CHECKING: if TYPE_CHECKING:
@ -151,13 +151,8 @@ async def gather_contexts(
) )
async with ( async with (
# collapse_eg(), collapse_eg(),
trio.open_nursery( trio.open_nursery() as tn,
# strict_exception_groups=False,
# ^XXX^ TODO? soo roll our own then ??
# -> since we kinda want the "if only one `.exception` then
# just raise that" interface?
) as tn,
): ):
for mngr in mngrs: for mngr in mngrs:
tn.start_soon( tn.start_soon(