Compare commits
10 Commits
4bc443ccae
...
8c90521562
Author | SHA1 | Date |
---|---|---|
|
8c90521562 | |
|
f23ee3cd22 | |
|
9295af929c | |
|
83f53fd0c5 | |
|
9b3af1fa16 | |
|
f8e4d12494 | |
|
af3c14b250 | |
|
7de7fd0afd | |
|
79888a31a4 | |
|
de16a9ac6f |
|
@ -310,7 +310,6 @@ def test_subactor_breakpoint(
|
||||||
|
|
||||||
assert in_prompt_msg(
|
assert in_prompt_msg(
|
||||||
child, [
|
child, [
|
||||||
'MessagingError:',
|
|
||||||
'RemoteActorError:',
|
'RemoteActorError:',
|
||||||
"('breakpoint_forever'",
|
"('breakpoint_forever'",
|
||||||
'bdb.BdbQuit',
|
'bdb.BdbQuit',
|
||||||
|
|
|
@ -410,7 +410,6 @@ def test_peer_canceller(
|
||||||
'''
|
'''
|
||||||
async def main():
|
async def main():
|
||||||
async with tractor.open_nursery(
|
async with tractor.open_nursery(
|
||||||
# NOTE: to halt the peer tasks on ctxc, uncomment this.
|
|
||||||
debug_mode=debug_mode,
|
debug_mode=debug_mode,
|
||||||
) as an:
|
) as an:
|
||||||
canceller: Portal = await an.start_actor(
|
canceller: Portal = await an.start_actor(
|
||||||
|
|
|
@ -101,6 +101,9 @@ from ._state import (
|
||||||
debug_mode,
|
debug_mode,
|
||||||
_ctxvar_Context,
|
_ctxvar_Context,
|
||||||
)
|
)
|
||||||
|
from .trionics import (
|
||||||
|
collapse_eg,
|
||||||
|
)
|
||||||
# ------ - ------
|
# ------ - ------
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
|
@ -940,7 +943,7 @@ class Context:
|
||||||
self.cancel_called = True
|
self.cancel_called = True
|
||||||
|
|
||||||
header: str = (
|
header: str = (
|
||||||
f'Cancelling ctx from {side.upper()}-side\n'
|
f'Cancelling ctx from {side!r}-side\n'
|
||||||
)
|
)
|
||||||
reminfo: str = (
|
reminfo: str = (
|
||||||
# ' =>\n'
|
# ' =>\n'
|
||||||
|
@ -2023,10 +2026,8 @@ async def open_context_from_portal(
|
||||||
ctxc_from_callee: ContextCancelled|None = None
|
ctxc_from_callee: ContextCancelled|None = None
|
||||||
try:
|
try:
|
||||||
async with (
|
async with (
|
||||||
trio.open_nursery(
|
collapse_eg(),
|
||||||
strict_exception_groups=False,
|
trio.open_nursery() as tn,
|
||||||
) as tn,
|
|
||||||
|
|
||||||
msgops.maybe_limit_plds(
|
msgops.maybe_limit_plds(
|
||||||
ctx=ctx,
|
ctx=ctx,
|
||||||
spec=ctx_meta.get('pld_spec'),
|
spec=ctx_meta.get('pld_spec'),
|
||||||
|
|
|
@ -28,7 +28,10 @@ from typing import (
|
||||||
from contextlib import asynccontextmanager as acm
|
from contextlib import asynccontextmanager as acm
|
||||||
|
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
from .trionics import gather_contexts
|
from .trionics import (
|
||||||
|
gather_contexts,
|
||||||
|
collapse_eg,
|
||||||
|
)
|
||||||
from .ipc import _connect_chan, Channel
|
from .ipc import _connect_chan, Channel
|
||||||
from ._addr import (
|
from ._addr import (
|
||||||
UnwrappedAddress,
|
UnwrappedAddress,
|
||||||
|
@ -88,7 +91,6 @@ async def get_registry(
|
||||||
yield regstr_ptl
|
yield regstr_ptl
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
async def get_root(
|
async def get_root(
|
||||||
**kwargs,
|
**kwargs,
|
||||||
|
@ -249,9 +251,12 @@ async def find_actor(
|
||||||
for addr in registry_addrs
|
for addr in registry_addrs
|
||||||
)
|
)
|
||||||
portals: list[Portal]
|
portals: list[Portal]
|
||||||
async with gather_contexts(
|
async with (
|
||||||
|
collapse_eg(),
|
||||||
|
gather_contexts(
|
||||||
mngrs=maybe_portals,
|
mngrs=maybe_portals,
|
||||||
) as portals:
|
) as portals,
|
||||||
|
):
|
||||||
# log.runtime(
|
# log.runtime(
|
||||||
# 'Gathered portals:\n'
|
# 'Gathered portals:\n'
|
||||||
# f'{portals}'
|
# f'{portals}'
|
||||||
|
|
|
@ -39,7 +39,10 @@ import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from .trionics import maybe_open_nursery
|
from .trionics import (
|
||||||
|
maybe_open_nursery,
|
||||||
|
collapse_eg,
|
||||||
|
)
|
||||||
from ._state import (
|
from ._state import (
|
||||||
current_actor,
|
current_actor,
|
||||||
)
|
)
|
||||||
|
@ -558,14 +561,13 @@ async def open_portal(
|
||||||
assert actor
|
assert actor
|
||||||
was_connected: bool = False
|
was_connected: bool = False
|
||||||
|
|
||||||
async with maybe_open_nursery(
|
async with (
|
||||||
|
collapse_eg(),
|
||||||
|
maybe_open_nursery(
|
||||||
tn,
|
tn,
|
||||||
shield=shield,
|
shield=shield,
|
||||||
strict_exception_groups=False,
|
) as tn,
|
||||||
# ^XXX^ TODO? soo roll our own then ??
|
):
|
||||||
# -> since we kinda want the "if only one `.exception` then
|
|
||||||
# just raise that" interface?
|
|
||||||
) as tn:
|
|
||||||
|
|
||||||
if not channel.connected():
|
if not channel.connected():
|
||||||
await channel.connect()
|
await channel.connect()
|
||||||
|
|
|
@ -37,13 +37,7 @@ import warnings
|
||||||
|
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
from ._runtime import (
|
from . import _runtime
|
||||||
Actor,
|
|
||||||
Arbiter,
|
|
||||||
# TODO: rename and make a non-actor subtype?
|
|
||||||
# Arbiter as Registry,
|
|
||||||
async_main,
|
|
||||||
)
|
|
||||||
from .devx import (
|
from .devx import (
|
||||||
debug,
|
debug,
|
||||||
_frame_stack,
|
_frame_stack,
|
||||||
|
@ -64,6 +58,7 @@ from ._addr import (
|
||||||
)
|
)
|
||||||
from .trionics import (
|
from .trionics import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
|
collapse_eg,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
RuntimeFailure,
|
RuntimeFailure,
|
||||||
|
@ -197,9 +192,13 @@ async def open_root_actor(
|
||||||
# read-only state to sublayers?
|
# read-only state to sublayers?
|
||||||
# extra_rt_vars: dict|None = None,
|
# extra_rt_vars: dict|None = None,
|
||||||
|
|
||||||
) -> Actor:
|
) -> _runtime.Actor:
|
||||||
'''
|
'''
|
||||||
Runtime init entry point for ``tractor``.
|
Initialize the `tractor` runtime by starting a "root actor" in
|
||||||
|
a parent-most Python process.
|
||||||
|
|
||||||
|
All (disjoint) actor-process-trees-as-programs are created via
|
||||||
|
this entrypoint.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
# XXX NEVER allow nested actor-trees!
|
# XXX NEVER allow nested actor-trees!
|
||||||
|
@ -379,7 +378,7 @@ async def open_root_actor(
|
||||||
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
f'Registry(s) seem(s) to exist @ {ponged_addrs}'
|
||||||
)
|
)
|
||||||
|
|
||||||
actor = Actor(
|
actor = _runtime.Actor(
|
||||||
name=name or 'anonymous',
|
name=name or 'anonymous',
|
||||||
uuid=mk_uuid(),
|
uuid=mk_uuid(),
|
||||||
registry_addrs=ponged_addrs,
|
registry_addrs=ponged_addrs,
|
||||||
|
@ -414,7 +413,8 @@ async def open_root_actor(
|
||||||
# https://github.com/goodboy/tractor/pull/348
|
# https://github.com/goodboy/tractor/pull/348
|
||||||
# https://github.com/goodboy/tractor/issues/296
|
# https://github.com/goodboy/tractor/issues/296
|
||||||
|
|
||||||
actor = Arbiter(
|
# TODO: rename as `RootActor` or is that even necessary?
|
||||||
|
actor = _runtime.Arbiter(
|
||||||
name=name or 'registrar',
|
name=name or 'registrar',
|
||||||
uuid=mk_uuid(),
|
uuid=mk_uuid(),
|
||||||
registry_addrs=registry_addrs,
|
registry_addrs=registry_addrs,
|
||||||
|
@ -441,13 +441,13 @@ async def open_root_actor(
|
||||||
f'{ml_addrs_str}'
|
f'{ml_addrs_str}'
|
||||||
)
|
)
|
||||||
|
|
||||||
# start the actor runtime in a new task
|
# start runtime in a bg sub-task, yield to caller.
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
collapse_eg(),
|
||||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
trio.open_nursery() as root_tn,
|
||||||
) as nursery:
|
):
|
||||||
|
|
||||||
# ``_runtime.async_main()`` creates an internal nursery
|
# `_runtime.async_main()` creates an internal nursery
|
||||||
# and blocks here until any underlying actor(-process)
|
# and blocks here until any underlying actor(-process)
|
||||||
# tree has terminated thereby conducting so called
|
# tree has terminated thereby conducting so called
|
||||||
# "end-to-end" structured concurrency throughout an
|
# "end-to-end" structured concurrency throughout an
|
||||||
|
@ -455,9 +455,9 @@ async def open_root_actor(
|
||||||
# "actor runtime" primitives are SC-compat and thus all
|
# "actor runtime" primitives are SC-compat and thus all
|
||||||
# transitively spawned actors/processes must be as
|
# transitively spawned actors/processes must be as
|
||||||
# well.
|
# well.
|
||||||
await nursery.start(
|
await root_tn.start(
|
||||||
partial(
|
partial(
|
||||||
async_main,
|
_runtime.async_main,
|
||||||
actor,
|
actor,
|
||||||
accept_addrs=trans_bind_addrs,
|
accept_addrs=trans_bind_addrs,
|
||||||
parent_addr=None
|
parent_addr=None
|
||||||
|
|
|
@ -756,7 +756,6 @@ async def _invoke(
|
||||||
BaseExceptionGroup,
|
BaseExceptionGroup,
|
||||||
BaseException,
|
BaseException,
|
||||||
trio.Cancelled,
|
trio.Cancelled,
|
||||||
|
|
||||||
) as _scope_err:
|
) as _scope_err:
|
||||||
scope_err = _scope_err
|
scope_err = _scope_err
|
||||||
if (
|
if (
|
||||||
|
|
|
@ -74,6 +74,9 @@ from tractor.msg import (
|
||||||
pretty_struct,
|
pretty_struct,
|
||||||
types as msgtypes,
|
types as msgtypes,
|
||||||
)
|
)
|
||||||
|
from .trionics import (
|
||||||
|
collapse_eg,
|
||||||
|
)
|
||||||
from .ipc import (
|
from .ipc import (
|
||||||
Channel,
|
Channel,
|
||||||
# IPCServer, # causes cycles atm..
|
# IPCServer, # causes cycles atm..
|
||||||
|
@ -1054,6 +1057,7 @@ class Actor:
|
||||||
cid: str,
|
cid: str,
|
||||||
parent_chan: Channel,
|
parent_chan: Channel,
|
||||||
requesting_uid: tuple[str, str]|None,
|
requesting_uid: tuple[str, str]|None,
|
||||||
|
# ^^TODO! use the `Aid` directly here!
|
||||||
|
|
||||||
ipc_msg: dict|None|bool = False,
|
ipc_msg: dict|None|bool = False,
|
||||||
|
|
||||||
|
@ -1099,9 +1103,12 @@ class Actor:
|
||||||
|
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Rxed cancel request for RPC task\n'
|
'Rxed cancel request for RPC task\n'
|
||||||
f'<=c) {requesting_uid}\n'
|
f'{ctx._task!r} <=c) {requesting_uid}\n'
|
||||||
f' |_{ctx._task}\n'
|
f'|_>> {ctx.repr_rpc}\n'
|
||||||
f' >> {ctx.repr_rpc}\n'
|
|
||||||
|
# f'|_{ctx._task}\n'
|
||||||
|
# f' >> {ctx.repr_rpc}\n'
|
||||||
|
|
||||||
# f'=> {ctx._task}\n'
|
# f'=> {ctx._task}\n'
|
||||||
# f' >> Actor._cancel_task() => {ctx._task}\n'
|
# f' >> Actor._cancel_task() => {ctx._task}\n'
|
||||||
# f' |_ {ctx._task}\n\n'
|
# f' |_ {ctx._task}\n\n'
|
||||||
|
@ -1386,10 +1393,12 @@ async def async_main(
|
||||||
# parent is kept alive as a resilient service until
|
# parent is kept alive as a resilient service until
|
||||||
# cancellation steps have (mostly) occurred in
|
# cancellation steps have (mostly) occurred in
|
||||||
# a deterministic way.
|
# a deterministic way.
|
||||||
async with trio.open_nursery(
|
root_tn: trio.Nursery
|
||||||
strict_exception_groups=False,
|
async with (
|
||||||
) as root_nursery:
|
collapse_eg(),
|
||||||
actor._root_n = root_nursery
|
trio.open_nursery() as root_tn,
|
||||||
|
):
|
||||||
|
actor._root_n = root_tn
|
||||||
assert actor._root_n
|
assert actor._root_n
|
||||||
|
|
||||||
ipc_server: _server.IPCServer
|
ipc_server: _server.IPCServer
|
||||||
|
@ -1488,7 +1497,7 @@ async def async_main(
|
||||||
# their root actor over that channel.
|
# their root actor over that channel.
|
||||||
if _state._runtime_vars['_is_root']:
|
if _state._runtime_vars['_is_root']:
|
||||||
for addr in accept_addrs:
|
for addr in accept_addrs:
|
||||||
waddr = wrap_address(addr)
|
waddr: Address = wrap_address(addr)
|
||||||
if waddr == waddr.get_root():
|
if waddr == waddr.get_root():
|
||||||
_state._runtime_vars['_root_mailbox'] = addr
|
_state._runtime_vars['_root_mailbox'] = addr
|
||||||
|
|
||||||
|
@ -1533,7 +1542,7 @@ async def async_main(
|
||||||
# start processing parent requests until our channel
|
# start processing parent requests until our channel
|
||||||
# server is 100% up and running.
|
# server is 100% up and running.
|
||||||
if actor._parent_chan:
|
if actor._parent_chan:
|
||||||
await root_nursery.start(
|
await root_tn.start(
|
||||||
partial(
|
partial(
|
||||||
_rpc.process_messages,
|
_rpc.process_messages,
|
||||||
chan=actor._parent_chan,
|
chan=actor._parent_chan,
|
||||||
|
|
|
@ -42,6 +42,7 @@ from ._runtime import Actor
|
||||||
from ._portal import Portal
|
from ._portal import Portal
|
||||||
from .trionics import (
|
from .trionics import (
|
||||||
is_multi_cancelled,
|
is_multi_cancelled,
|
||||||
|
collapse_eg,
|
||||||
)
|
)
|
||||||
from ._exceptions import (
|
from ._exceptions import (
|
||||||
ContextCancelled,
|
ContextCancelled,
|
||||||
|
@ -324,9 +325,10 @@ class ActorNursery:
|
||||||
server: IPCServer = self._actor.ipc_server
|
server: IPCServer = self._actor.ipc_server
|
||||||
|
|
||||||
with trio.move_on_after(3) as cs:
|
with trio.move_on_after(3) as cs:
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
collapse_eg(),
|
||||||
) as tn:
|
trio.open_nursery() as tn,
|
||||||
|
):
|
||||||
|
|
||||||
subactor: Actor
|
subactor: Actor
|
||||||
proc: trio.Process
|
proc: trio.Process
|
||||||
|
@ -419,10 +421,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# `ActorNursery.start_actor()`).
|
# `ActorNursery.start_actor()`).
|
||||||
|
|
||||||
# errors from this daemon actor nursery bubble up to caller
|
# errors from this daemon actor nursery bubble up to caller
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
collapse_eg(),
|
||||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
trio.open_nursery() as da_nursery,
|
||||||
) as da_nursery:
|
):
|
||||||
try:
|
try:
|
||||||
# This is the inner level "run in actor" nursery. It is
|
# This is the inner level "run in actor" nursery. It is
|
||||||
# awaited first since actors spawned in this way (using
|
# awaited first since actors spawned in this way (using
|
||||||
|
@ -432,11 +434,10 @@ async def _open_and_supervise_one_cancels_all_nursery(
|
||||||
# immediately raised for handling by a supervisor strategy.
|
# immediately raised for handling by a supervisor strategy.
|
||||||
# As such if the strategy propagates any error(s) upwards
|
# As such if the strategy propagates any error(s) upwards
|
||||||
# the above "daemon actor" nursery will be notified.
|
# the above "daemon actor" nursery will be notified.
|
||||||
async with trio.open_nursery(
|
async with (
|
||||||
strict_exception_groups=False,
|
collapse_eg(),
|
||||||
# ^XXX^ TODO? instead unpack any RAE as per "loose" style?
|
trio.open_nursery() as ria_nursery,
|
||||||
) as ria_nursery:
|
):
|
||||||
|
|
||||||
an = ActorNursery(
|
an = ActorNursery(
|
||||||
actor,
|
actor,
|
||||||
ria_nursery,
|
ria_nursery,
|
||||||
|
|
|
@ -238,7 +238,8 @@ def enable_stack_on_sig(
|
||||||
import stackscope
|
import stackscope
|
||||||
except ImportError:
|
except ImportError:
|
||||||
log.error(
|
log.error(
|
||||||
'`stackscope` not installed for use in debug mode!'
|
'`stackscope` not installed for use in debug mode!\n'
|
||||||
|
'`Ignoring {enable_stack_on_sig!r} call!\n'
|
||||||
)
|
)
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
|
@ -143,7 +143,7 @@ async def maybe_wait_on_canced_subs(
|
||||||
log.cancel(
|
log.cancel(
|
||||||
'Waiting on cancel request to peer..\n'
|
'Waiting on cancel request to peer..\n'
|
||||||
f'c)=>\n'
|
f'c)=>\n'
|
||||||
f' |_{chan.uid}\n'
|
f' |_{chan.aid}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# XXX: this is a soft wait on the channel (and its
|
# XXX: this is a soft wait on the channel (and its
|
||||||
|
@ -156,7 +156,7 @@ async def maybe_wait_on_canced_subs(
|
||||||
# local runtime here is now cancelled while
|
# local runtime here is now cancelled while
|
||||||
# (presumably) in the middle of msg loop processing.
|
# (presumably) in the middle of msg loop processing.
|
||||||
chan_info: str = (
|
chan_info: str = (
|
||||||
f'{chan.uid}\n'
|
f'{chan.aid}\n'
|
||||||
f'|_{chan}\n'
|
f'|_{chan}\n'
|
||||||
f' |_{chan.transport}\n\n'
|
f' |_{chan.transport}\n\n'
|
||||||
)
|
)
|
||||||
|
@ -279,7 +279,7 @@ async def maybe_wait_on_canced_subs(
|
||||||
log.runtime(
|
log.runtime(
|
||||||
f'Peer IPC broke but subproc is alive?\n\n'
|
f'Peer IPC broke but subproc is alive?\n\n'
|
||||||
|
|
||||||
f'<=x {chan.uid}@{chan.raddr}\n'
|
f'<=x {chan.aid}@{chan.raddr}\n'
|
||||||
f' |_{proc}\n'
|
f' |_{proc}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -460,7 +460,7 @@ async def handle_stream_from_peer(
|
||||||
# drop ref to channel so it can be gc-ed and disconnected
|
# drop ref to channel so it can be gc-ed and disconnected
|
||||||
con_teardown_status: str = (
|
con_teardown_status: str = (
|
||||||
f'IPC channel disconnected:\n'
|
f'IPC channel disconnected:\n'
|
||||||
f'<=x uid: {chan.uid}\n'
|
f'<=x uid: {chan.aid}\n'
|
||||||
f' |_{pformat(chan)}\n\n'
|
f' |_{pformat(chan)}\n\n'
|
||||||
)
|
)
|
||||||
chans.remove(chan)
|
chans.remove(chan)
|
||||||
|
@ -468,7 +468,7 @@ async def handle_stream_from_peer(
|
||||||
# TODO: do we need to be this pedantic?
|
# TODO: do we need to be this pedantic?
|
||||||
if not chans:
|
if not chans:
|
||||||
con_teardown_status += (
|
con_teardown_status += (
|
||||||
f'-> No more channels with {chan.uid}'
|
f'-> No more channels with {chan.aid}'
|
||||||
)
|
)
|
||||||
server._peers.pop(uid, None)
|
server._peers.pop(uid, None)
|
||||||
|
|
||||||
|
@ -519,7 +519,7 @@ async def handle_stream_from_peer(
|
||||||
and
|
and
|
||||||
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
(ctx_in_debug := pdb_lock.ctx_in_debug)
|
||||||
and
|
and
|
||||||
(pdb_user_uid := ctx_in_debug.chan.uid)
|
(pdb_user_uid := ctx_in_debug.chan.aid)
|
||||||
):
|
):
|
||||||
entry: tuple|None = local_nursery._children.get(
|
entry: tuple|None = local_nursery._children.get(
|
||||||
tuple(pdb_user_uid)
|
tuple(pdb_user_uid)
|
||||||
|
|
|
@ -40,7 +40,7 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from tractor._state import current_actor
|
from tractor._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
# from ._beg import collapse_eg
|
from ._beg import collapse_eg
|
||||||
|
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
|
@ -151,13 +151,8 @@ async def gather_contexts(
|
||||||
)
|
)
|
||||||
|
|
||||||
async with (
|
async with (
|
||||||
# collapse_eg(),
|
collapse_eg(),
|
||||||
trio.open_nursery(
|
trio.open_nursery() as tn,
|
||||||
# strict_exception_groups=False,
|
|
||||||
# ^XXX^ TODO? soo roll our own then ??
|
|
||||||
# -> since we kinda want the "if only one `.exception` then
|
|
||||||
# just raise that" interface?
|
|
||||||
) as tn,
|
|
||||||
):
|
):
|
||||||
for mngr in mngrs:
|
for mngr in mngrs:
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
|
|
Loading…
Reference in New Issue