Compare commits
34 Commits
c974ce242c
...
edf0af51c2
| Author | SHA1 | Date |
|---|---|---|
|
|
edf0af51c2 | |
|
|
fcc253f6ce | |
|
|
d704f99223 | |
|
|
a120f378d0 | |
|
|
36307c5917 | |
|
|
30c2c3cb30 | |
|
|
0f6a0676eb | |
|
|
2616f4b976 | |
|
|
b5fd2a40b1 | |
|
|
277ddc1625 | |
|
|
70bb77280e | |
|
|
916f88a070 | |
|
|
91f2f3ec10 | |
|
|
3e5124e184 | |
|
|
fa86269e30 | |
|
|
d0b92bbeba | |
|
|
9470815f5a | |
|
|
592d918394 | |
|
|
0cddc67bdb | |
|
|
052fe2435f | |
|
|
28819bf5d3 | |
|
|
07c2ba5c0d | |
|
|
50f40f427b | |
|
|
bf6de55865 | |
|
|
5ded99a886 | |
|
|
7145fa364f | |
|
|
f8e25688c7 | |
|
|
c3f455a8ec | |
|
|
f78e842fba | |
|
|
3638b80c9d | |
|
|
2ed9e65530 | |
|
|
6cab363c51 | |
|
|
8aee24e83f | |
|
|
cdcc1b42fc |
|
|
@ -17,6 +17,7 @@ from tractor import (
|
|||
MsgStream,
|
||||
_testing,
|
||||
trionics,
|
||||
TransportClosed,
|
||||
)
|
||||
import trio
|
||||
import pytest
|
||||
|
|
@ -208,12 +209,16 @@ async def main(
|
|||
# TODO: is this needed or no?
|
||||
raise
|
||||
|
||||
except trio.ClosedResourceError:
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
TransportClosed,
|
||||
) as _tpt_err:
|
||||
# NOTE: don't send if we already broke the
|
||||
# connection to avoid raising a closed-error
|
||||
# such that we drop through to the ctl-c
|
||||
# mashing by user.
|
||||
await trio.sleep(0.01)
|
||||
with trio.CancelScope(shield=True):
|
||||
await trio.sleep(0.01)
|
||||
|
||||
# timeout: int = 1
|
||||
# with trio.move_on_after(timeout) as cs:
|
||||
|
|
@ -247,6 +252,7 @@ async def main(
|
|||
await stream.send(i)
|
||||
pytest.fail('stream not closed?')
|
||||
except (
|
||||
TransportClosed,
|
||||
trio.ClosedResourceError,
|
||||
trio.EndOfChannel,
|
||||
) as send_err:
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@
|
|||
'''
|
||||
from __future__ import annotations
|
||||
import time
|
||||
import signal
|
||||
from typing import (
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
|
|
@ -69,12 +70,15 @@ def spawn(
|
|||
import os
|
||||
os.environ['PYTHON_COLORS'] = '0'
|
||||
|
||||
spawned: PexpectSpawner|None = None
|
||||
|
||||
def _spawn(
|
||||
cmd: str,
|
||||
**mkcmd_kwargs,
|
||||
) -> pty_spawn.spawn:
|
||||
nonlocal spawned
|
||||
unset_colors()
|
||||
return testdir.spawn(
|
||||
spawned = testdir.spawn(
|
||||
cmd=mk_cmd(
|
||||
cmd,
|
||||
**mkcmd_kwargs,
|
||||
|
|
@ -84,9 +88,35 @@ def spawn(
|
|||
# ^TODO? get `pytest` core to expose underlying
|
||||
# `pexpect.spawn()` stuff?
|
||||
)
|
||||
return spawned
|
||||
|
||||
# such that test-dep can pass input script name.
|
||||
return _spawn # the `PexpectSpawner`, type alias.
|
||||
yield _spawn # the `PexpectSpawner`, type alias.
|
||||
|
||||
if (
|
||||
spawned
|
||||
and
|
||||
(ptyproc := spawned.ptyproc)
|
||||
):
|
||||
start: float = time.time()
|
||||
timeout: float = 5
|
||||
while (
|
||||
ptyproc.isalive()
|
||||
and
|
||||
(
|
||||
(_time_took := (time.time() - start))
|
||||
<
|
||||
timeout
|
||||
)
|
||||
):
|
||||
ptyproc.kill(signal.SIGINT)
|
||||
time.sleep(0.01)
|
||||
|
||||
if ptyproc.isalive():
|
||||
ptyproc.kill(signal.SIGKILL)
|
||||
|
||||
# TODO? ensure we've cleaned up any UDS-paths?
|
||||
# breakpoint()
|
||||
|
||||
|
||||
@pytest.fixture(
|
||||
|
|
|
|||
|
|
@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
|
|||
['peer IPC channel closed abruptly?',
|
||||
'another task closed this fd',
|
||||
'Debug lock request was CANCELLED?',
|
||||
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
|
||||
"'MsgpackUDSStream' was already closed locally?",
|
||||
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
|
||||
# ?TODO^? match depending on `tpt_proto(s)`?
|
||||
]
|
||||
|
||||
# XXX races on whether these show/hit?
|
||||
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',
|
||||
|
|
|
|||
|
|
@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
|
|||
expect_final_exc = TransportClosed
|
||||
|
||||
mod: ModuleType = import_path(
|
||||
examples_dir() / 'advanced_faults'
|
||||
examples_dir()
|
||||
/ 'advanced_faults'
|
||||
/ 'ipc_failure_during_stream.py',
|
||||
root=examples_dir(),
|
||||
consider_namespace_packages=False,
|
||||
|
|
@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
|
|||
if (
|
||||
# only expect EoC if trans is broken on the child side,
|
||||
ipc_break['break_child_ipc_after'] is not False
|
||||
and
|
||||
# AND we tell the child to call `MsgStream.aclose()`.
|
||||
and pre_aclose_msgstream
|
||||
pre_aclose_msgstream
|
||||
):
|
||||
# expect_final_exc = trio.EndOfChannel
|
||||
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
|
||||
|
|
@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
|
|||
ipc_break['break_child_ipc_after'] is not False
|
||||
and (
|
||||
ipc_break['break_parent_ipc_after']
|
||||
> ipc_break['break_child_ipc_after']
|
||||
>
|
||||
ipc_break['break_child_ipc_after']
|
||||
)
|
||||
):
|
||||
if pre_aclose_msgstream:
|
||||
|
|
@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
|
|||
# get raw instance from pytest wrapper
|
||||
value = excinfo.value
|
||||
if isinstance(value, ExceptionGroup):
|
||||
excs = value.exceptions
|
||||
assert len(excs) == 1
|
||||
excs: tuple[Exception] = value.exceptions
|
||||
assert (
|
||||
len(excs) <= 2
|
||||
and
|
||||
all(
|
||||
isinstance(exc, TransportClosed)
|
||||
for exc in excs
|
||||
)
|
||||
)
|
||||
final_exc = excs[0]
|
||||
assert isinstance(final_exc, expect_final_exc)
|
||||
|
||||
|
|
|
|||
|
|
@ -11,12 +11,13 @@ import trio
|
|||
import tractor
|
||||
from tractor import ( # typing
|
||||
Actor,
|
||||
current_actor,
|
||||
open_nursery,
|
||||
Portal,
|
||||
Context,
|
||||
ContextCancelled,
|
||||
MsgStream,
|
||||
Portal,
|
||||
RemoteActorError,
|
||||
current_actor,
|
||||
open_nursery,
|
||||
)
|
||||
from tractor._testing import (
|
||||
# tractor_test,
|
||||
|
|
@ -796,8 +797,8 @@ async def basic_echo_server(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Just the simplest `MsgStream` echo server which resays what
|
||||
you told it but with its uid in front ;)
|
||||
Just the simplest `MsgStream` echo server which resays what you
|
||||
told it but with its uid in front ;)
|
||||
|
||||
'''
|
||||
actor: Actor = tractor.current_actor()
|
||||
|
|
@ -966,9 +967,14 @@ async def tell_little_bro(
|
|||
|
||||
caller: str = '',
|
||||
err_after: float|None = None,
|
||||
rng_seed: int = 50,
|
||||
rng_seed: int = 100,
|
||||
# NOTE, ensure ^ is large enough (on fast hw anyway)
|
||||
# to ensure the peer cancel req arrives before the
|
||||
# echoing dialog does itself Bp
|
||||
):
|
||||
# contact target actor, do a stream dialog.
|
||||
lb: Portal
|
||||
echo_ipc: MsgStream
|
||||
async with (
|
||||
tractor.wait_for_actor(
|
||||
name=actor_name
|
||||
|
|
@ -983,7 +989,6 @@ async def tell_little_bro(
|
|||
else None
|
||||
),
|
||||
) as (sub_ctx, first),
|
||||
|
||||
sub_ctx.open_stream() as echo_ipc,
|
||||
):
|
||||
actor: Actor = current_actor()
|
||||
|
|
@ -994,6 +999,7 @@ async def tell_little_bro(
|
|||
i,
|
||||
)
|
||||
await echo_ipc.send(msg)
|
||||
await trio.sleep(0.001)
|
||||
resp = await echo_ipc.receive()
|
||||
print(
|
||||
f'{caller} => {actor_name}: {msg}\n'
|
||||
|
|
@ -1006,6 +1012,9 @@ async def tell_little_bro(
|
|||
assert sub_uid != uid
|
||||
assert _i == i
|
||||
|
||||
# XXX, usually should never get here!
|
||||
# await tractor.pause()
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
'raise_client_error',
|
||||
|
|
|
|||
|
|
@ -1,8 +1,13 @@
|
|||
"""
|
||||
Multiple python programs invoking the runtime.
|
||||
"""
|
||||
from __future__ import annotations
|
||||
import platform
|
||||
import subprocess
|
||||
import time
|
||||
from typing import (
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
|
@ -10,14 +15,29 @@ import tractor
|
|||
from tractor._testing import (
|
||||
tractor_test,
|
||||
)
|
||||
from tractor import (
|
||||
current_actor,
|
||||
_state,
|
||||
Actor,
|
||||
Context,
|
||||
Portal,
|
||||
)
|
||||
from .conftest import (
|
||||
sig_prog,
|
||||
_INT_SIGNAL,
|
||||
_INT_RETURN_CODE,
|
||||
)
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tractor.msg import Aid
|
||||
from tractor._addr import (
|
||||
UnwrappedAddress,
|
||||
)
|
||||
|
||||
def test_abort_on_sigint(daemon):
|
||||
|
||||
def test_abort_on_sigint(
|
||||
daemon: subprocess.Popen,
|
||||
):
|
||||
assert daemon.returncode is None
|
||||
time.sleep(0.1)
|
||||
sig_prog(daemon, _INT_SIGNAL)
|
||||
|
|
@ -30,8 +50,11 @@ def test_abort_on_sigint(daemon):
|
|||
|
||||
|
||||
@tractor_test
|
||||
async def test_cancel_remote_arbiter(daemon, reg_addr):
|
||||
assert not tractor.current_actor().is_arbiter
|
||||
async def test_cancel_remote_arbiter(
|
||||
daemon: subprocess.Popen,
|
||||
reg_addr: UnwrappedAddress,
|
||||
):
|
||||
assert not current_actor().is_arbiter
|
||||
async with tractor.get_registry(reg_addr) as portal:
|
||||
await portal.cancel_actor()
|
||||
|
||||
|
|
@ -45,24 +68,106 @@ async def test_cancel_remote_arbiter(daemon, reg_addr):
|
|||
pass
|
||||
|
||||
|
||||
def test_register_duplicate_name(daemon, reg_addr):
|
||||
|
||||
def test_register_duplicate_name(
|
||||
daemon: subprocess.Popen,
|
||||
reg_addr: UnwrappedAddress,
|
||||
):
|
||||
async def main():
|
||||
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
) as n:
|
||||
) as an:
|
||||
|
||||
assert not tractor.current_actor().is_arbiter
|
||||
assert not current_actor().is_arbiter
|
||||
|
||||
p1 = await n.start_actor('doggy')
|
||||
p2 = await n.start_actor('doggy')
|
||||
p1 = await an.start_actor('doggy')
|
||||
p2 = await an.start_actor('doggy')
|
||||
|
||||
async with tractor.wait_for_actor('doggy') as portal:
|
||||
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
|
||||
|
||||
await n.cancel()
|
||||
await an.cancel()
|
||||
|
||||
# run it manually since we want to start **after**
|
||||
# the other "daemon" program
|
||||
# XXX, run manually since we want to start this root **after**
|
||||
# the other "daemon" program with it's own root.
|
||||
trio.run(main)
|
||||
|
||||
|
||||
@tractor.context
|
||||
async def get_root_portal(
|
||||
ctx: Context,
|
||||
):
|
||||
'''
|
||||
Connect back to the root actor manually (using `._discovery` API)
|
||||
and ensure it's contact info is the same as our immediate parent.
|
||||
|
||||
'''
|
||||
sub: Actor = current_actor()
|
||||
rtvs: dict = _state._runtime_vars
|
||||
raddrs: list[UnwrappedAddress] = rtvs['_root_addrs']
|
||||
|
||||
# await tractor.pause()
|
||||
# XXX, in case the sub->root discovery breaks you might need
|
||||
# this (i know i did Xp)!!
|
||||
# from tractor.devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
|
||||
assert (
|
||||
len(raddrs) == 1
|
||||
and
|
||||
list(sub._parent_chan.raddr.unwrap()) in raddrs
|
||||
)
|
||||
|
||||
# connect back to our immediate parent which should also
|
||||
# be the actor-tree's root.
|
||||
from tractor._discovery import get_root
|
||||
ptl: Portal
|
||||
async with get_root() as ptl:
|
||||
root_aid: Aid = ptl.chan.aid
|
||||
parent_ptl: Portal = current_actor().get_parent()
|
||||
assert (
|
||||
root_aid.name == 'root'
|
||||
and
|
||||
parent_ptl.chan.aid == root_aid
|
||||
)
|
||||
await ctx.started()
|
||||
|
||||
|
||||
def test_non_registrar_spawns_child(
|
||||
daemon: subprocess.Popen,
|
||||
reg_addr: UnwrappedAddress,
|
||||
loglevel: str,
|
||||
debug_mode: bool,
|
||||
):
|
||||
'''
|
||||
Ensure a non-regristar (serving) root actor can spawn a sub and
|
||||
that sub can connect back (manually) to it's rent that is the
|
||||
root without issue.
|
||||
|
||||
More or less this audits the global contact info in
|
||||
`._state._runtime_vars`.
|
||||
|
||||
'''
|
||||
async def main():
|
||||
async with tractor.open_nursery(
|
||||
registry_addrs=[reg_addr],
|
||||
loglevel=loglevel,
|
||||
debug_mode=debug_mode,
|
||||
) as an:
|
||||
|
||||
actor: Actor = tractor.current_actor()
|
||||
assert not actor.is_registrar
|
||||
sub_ptl: Portal = await an.start_actor(
|
||||
name='sub',
|
||||
enable_modules=[__name__],
|
||||
)
|
||||
|
||||
async with sub_ptl.open_context(
|
||||
get_root_portal,
|
||||
) as (ctx, _):
|
||||
print('Waiting for `sub` to connect back to us..')
|
||||
|
||||
await an.cancel()
|
||||
|
||||
# XXX, run manually since we want to start this root **after**
|
||||
# the other "daemon" program with it's own root.
|
||||
trio.run(main)
|
||||
|
|
|
|||
|
|
@ -70,6 +70,7 @@ from ._exceptions import (
|
|||
MsgTypeError,
|
||||
RemoteActorError,
|
||||
StreamOverrun,
|
||||
TransportClosed,
|
||||
pack_from_raise,
|
||||
unpack_error,
|
||||
)
|
||||
|
|
@ -2428,10 +2429,7 @@ async def open_context_from_portal(
|
|||
try:
|
||||
# await pause(shield=True)
|
||||
await ctx.cancel()
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
):
|
||||
except TransportClosed:
|
||||
log.warning(
|
||||
'IPC connection for context is broken?\n'
|
||||
f'task: {ctx.cid}\n'
|
||||
|
|
|
|||
|
|
@ -91,10 +91,13 @@ async def get_registry(
|
|||
|
||||
|
||||
@acm
|
||||
async def get_root(
|
||||
**kwargs,
|
||||
) -> AsyncGenerator[Portal, None]:
|
||||
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
|
||||
'''
|
||||
Deliver the current actor's "root process" actor (yes in actor
|
||||
and proc tree terms) by delivering a `Portal` from the spawn-time
|
||||
provided contact address.
|
||||
|
||||
'''
|
||||
# TODO: rename mailbox to `_root_maddr` when we finally
|
||||
# add and impl libp2p multi-addrs?
|
||||
addr = _runtime_vars['_root_mailbox']
|
||||
|
|
@ -193,6 +196,11 @@ async def maybe_open_portal(
|
|||
addr: UnwrappedAddress,
|
||||
name: str,
|
||||
):
|
||||
'''
|
||||
Open a `Portal` to the actor serving @ `addr` or `None` if no
|
||||
peer can be contacted or found.
|
||||
|
||||
'''
|
||||
async with query_actor(
|
||||
name=name,
|
||||
regaddr=addr,
|
||||
|
|
|
|||
|
|
@ -329,18 +329,7 @@ class Portal:
|
|||
# if we get here some weird cancellation case happened
|
||||
return False
|
||||
|
||||
except (
|
||||
# XXX, should never really get raised unless we aren't
|
||||
# wrapping them in the below type by mistake?
|
||||
#
|
||||
# Leaving the catch here for now until we're very sure
|
||||
# all the cases (for various tpt protos) have indeed been
|
||||
# re-wrapped ;p
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
|
||||
TransportClosed,
|
||||
) as tpt_err:
|
||||
except TransportClosed as tpt_err:
|
||||
ipc_borked_report: str = (
|
||||
f'IPC for actor already closed/broken?\n\n'
|
||||
f'\n'
|
||||
|
|
|
|||
|
|
@ -88,7 +88,8 @@ async def maybe_block_bp(
|
|||
bp_blocked: bool
|
||||
if (
|
||||
debug_mode
|
||||
and maybe_enable_greenback
|
||||
and
|
||||
maybe_enable_greenback
|
||||
and (
|
||||
maybe_mod := await debug.maybe_init_greenback(
|
||||
raise_not_found=False,
|
||||
|
|
@ -385,10 +386,13 @@ async def open_root_actor(
|
|||
addr,
|
||||
)
|
||||
|
||||
trans_bind_addrs: list[UnwrappedAddress] = []
|
||||
tpt_bind_addrs: list[
|
||||
Address # `Address.get_random()` case
|
||||
|UnwrappedAddress # registrar case `= uw_reg_addrs`
|
||||
] = []
|
||||
|
||||
# Create a new local root-actor instance which IS NOT THE
|
||||
# REGISTRAR
|
||||
# ------ NON-REGISTRAR ------
|
||||
# create a new root-actor instance.
|
||||
if ponged_addrs:
|
||||
if ensure_registry:
|
||||
raise RuntimeError(
|
||||
|
|
@ -415,12 +419,21 @@ async def open_root_actor(
|
|||
# XXX INSTEAD, bind random addrs using the same tpt
|
||||
# proto.
|
||||
for addr in ponged_addrs:
|
||||
trans_bind_addrs.append(
|
||||
tpt_bind_addrs.append(
|
||||
# XXX, these are `Address` NOT `UnwrappedAddress`.
|
||||
#
|
||||
# NOTE, in the case of posix/berkley socket
|
||||
# protos we allocate port=0 such that the system
|
||||
# allocates a random value at bind time; this
|
||||
# happens in the `.ipc.*` stack's backend.
|
||||
addr.get_random(
|
||||
bindspace=addr.bindspace,
|
||||
)
|
||||
)
|
||||
|
||||
# ------ REGISTRAR ------
|
||||
# create a new "registry providing" root-actor instance.
|
||||
#
|
||||
# Start this local actor as the "registrar", aka a regular
|
||||
# actor who manages the local registry of "mailboxes" of
|
||||
# other process-tree-local sub-actors.
|
||||
|
|
@ -429,7 +442,7 @@ async def open_root_actor(
|
|||
# following init steps are taken:
|
||||
# - the tranport layer server is bound to each addr
|
||||
# pair defined in provided registry_addrs, or the default.
|
||||
trans_bind_addrs = uw_reg_addrs
|
||||
tpt_bind_addrs = uw_reg_addrs
|
||||
|
||||
# - it is normally desirable for any registrar to stay up
|
||||
# indefinitely until either all registered (child/sub)
|
||||
|
|
@ -449,20 +462,10 @@ async def open_root_actor(
|
|||
enable_modules=enable_modules,
|
||||
)
|
||||
# XXX, in case the root actor runtime was actually run from
|
||||
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
|
||||
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
|
||||
# `.trio.run()`.
|
||||
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
|
||||
|
||||
# NOTE, only set the loopback addr for the
|
||||
# process-tree-global "root" mailbox since all sub-actors
|
||||
# should be able to speak to their root actor over that
|
||||
# channel.
|
||||
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
|
||||
raddrs.extend(trans_bind_addrs)
|
||||
# TODO, remove once we have also removed all usage;
|
||||
# eventually all (root-)registry apis should expect > 1 addr.
|
||||
_state._runtime_vars['_root_mailbox'] = raddrs[0]
|
||||
|
||||
# Start up main task set via core actor-runtime nurseries.
|
||||
try:
|
||||
# assign process-local actor
|
||||
|
|
@ -499,14 +502,39 @@ async def open_root_actor(
|
|||
# "actor runtime" primitives are SC-compat and thus all
|
||||
# transitively spawned actors/processes must be as
|
||||
# well.
|
||||
await root_tn.start(
|
||||
accept_addrs: list[UnwrappedAddress]
|
||||
reg_addrs: list[UnwrappedAddress]
|
||||
(
|
||||
accept_addrs,
|
||||
reg_addrs,
|
||||
) = await root_tn.start(
|
||||
partial(
|
||||
_runtime.async_main,
|
||||
actor,
|
||||
accept_addrs=trans_bind_addrs,
|
||||
accept_addrs=tpt_bind_addrs,
|
||||
parent_addr=None
|
||||
)
|
||||
)
|
||||
# NOTE, only set a local-host addr (i.e. like
|
||||
# `lo`-loopback for TCP) for the process-tree-global
|
||||
# "root"-process (its tree-wide "mailbox") since all
|
||||
# sub-actors should be able to speak to their root
|
||||
# actor over that channel.
|
||||
#
|
||||
# ?TODO, per-OS non-network-proto alt options?
|
||||
# -[ ] on linux we should be able to always use UDS?
|
||||
#
|
||||
raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
|
||||
raddrs.extend(
|
||||
accept_addrs,
|
||||
)
|
||||
# TODO, remove once we have also removed all usage;
|
||||
# eventually all (root-)registry apis should expect > 1 addr.
|
||||
_state._runtime_vars['_root_mailbox'] = raddrs[0]
|
||||
# if 'chart' in actor.aid.name:
|
||||
# from tractor.devx import mk_pdb
|
||||
# mk_pdb().set_trace()
|
||||
|
||||
try:
|
||||
yield actor
|
||||
except (
|
||||
|
|
@ -588,6 +616,13 @@ async def open_root_actor(
|
|||
):
|
||||
_state._runtime_vars['_debug_mode'] = False
|
||||
|
||||
# !XXX, clear ALL prior contact info state, this is MEGA
|
||||
# important if you are opening the runtime multiple times
|
||||
# from the same parent process (like in our test
|
||||
# harness)!
|
||||
_state._runtime_vars['_root_addrs'].clear()
|
||||
_state._runtime_vars['_root_mailbox'] = None
|
||||
|
||||
_state._current_actor = None
|
||||
_state._last_actor_terminated = actor
|
||||
|
||||
|
|
|
|||
|
|
@ -284,9 +284,14 @@ async def _errors_relayed_via_ipc(
|
|||
try:
|
||||
yield # run RPC invoke body
|
||||
|
||||
except TransportClosed:
|
||||
log.exception('Tpt disconnect during remote-exc relay?')
|
||||
raise
|
||||
# NOTE, never REPL any pseudo-expected tpt-disconnect.
|
||||
except TransportClosed as err:
|
||||
rpc_err = err
|
||||
log.warning(
|
||||
f'Tpt disconnect during remote-exc relay due to,\n'
|
||||
f'{err!r}\n'
|
||||
)
|
||||
raise err
|
||||
|
||||
# box and ship RPC errors for wire-transit via
|
||||
# the task's requesting parent IPC-channel.
|
||||
|
|
@ -323,9 +328,6 @@ async def _errors_relayed_via_ipc(
|
|||
and debug_kbis
|
||||
)
|
||||
)
|
||||
# TODO? better then `debug_filter` below?
|
||||
and
|
||||
not isinstance(err, TransportClosed)
|
||||
):
|
||||
# XXX QUESTION XXX: is there any case where we'll
|
||||
# want to debug IPC disconnects as a default?
|
||||
|
|
@ -346,13 +348,6 @@ async def _errors_relayed_via_ipc(
|
|||
entered_debug = await debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
|
||||
# don't REPL any psuedo-expected tpt-disconnect
|
||||
# debug_filter=lambda exc: (
|
||||
# type (exc) not in {
|
||||
# TransportClosed,
|
||||
# }
|
||||
# ),
|
||||
)
|
||||
if not entered_debug:
|
||||
# if we prolly should have entered the REPL but
|
||||
|
|
@ -438,7 +433,7 @@ async def _errors_relayed_via_ipc(
|
|||
# cancel scope will not have been inserted yet
|
||||
if is_rpc:
|
||||
log.warning(
|
||||
'RPC task likely errored or cancelled before start?\n'
|
||||
'RPC task likely crashed or cancelled before start?\n'
|
||||
f'|_{ctx._task}\n'
|
||||
f' >> {ctx.repr_rpc}\n'
|
||||
)
|
||||
|
|
@ -694,22 +689,6 @@ async def _invoke(
|
|||
f'{pretty_struct.pformat(return_msg)}\n'
|
||||
)
|
||||
await chan.send(return_msg)
|
||||
# ?TODO, remove the below since .send() already
|
||||
# doesn't raise on tpt-closed?
|
||||
# try:
|
||||
# await chan.send(return_msg)
|
||||
# except TransportClosed:
|
||||
# log.exception(
|
||||
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
||||
# f'\n'
|
||||
# f'{chan}\n'
|
||||
# f'Channel already disconnected ??\n'
|
||||
# f'\n'
|
||||
# f'{pretty_struct.pformat(return_msg)}'
|
||||
# )
|
||||
# # ?TODO? will this ever be true though?
|
||||
# if chan.connected():
|
||||
# raise
|
||||
|
||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||
# called by any of,
|
||||
|
|
@ -935,6 +914,11 @@ async def try_ship_error_to_remote(
|
|||
|
||||
# XXX NOTE XXX in SC terms this is one of the worst things
|
||||
# that can happen and provides for a 2-general's dilemma..
|
||||
#
|
||||
# FURHTER, we should never really have to handle these
|
||||
# lowlevel excs from `trio` since the `Channel.send()` layers
|
||||
# downward should be mostly wrapping such cases in a
|
||||
# tpt-closed; the `.critical()` usage is warranted.
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
|
|
|
|||
|
|
@ -147,6 +147,8 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]:
|
|||
return nsp2fp
|
||||
|
||||
|
||||
_bp = False
|
||||
|
||||
class Actor:
|
||||
'''
|
||||
The fundamental "runtime" concurrency primitive.
|
||||
|
|
@ -181,6 +183,14 @@ class Actor:
|
|||
def is_registrar(self) -> bool:
|
||||
return self.is_arbiter
|
||||
|
||||
@property
|
||||
def is_root(self) -> bool:
|
||||
'''
|
||||
This actor is the parent most in the tree?
|
||||
|
||||
'''
|
||||
return _state.is_root_process()
|
||||
|
||||
msg_buffer_size: int = 2**6
|
||||
|
||||
# nursery placeholders filled in by `async_main()`,
|
||||
|
|
@ -272,7 +282,9 @@ class Actor:
|
|||
stacklevel=2,
|
||||
)
|
||||
|
||||
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
|
||||
registry_addrs: list[Address] = [
|
||||
wrap_address(arbiter_addr)
|
||||
]
|
||||
|
||||
# marked by the process spawning backend at startup
|
||||
# will be None for the parent most process started manually
|
||||
|
|
@ -959,6 +971,21 @@ class Actor:
|
|||
|
||||
rvs['_is_root'] = False # obvi XD
|
||||
|
||||
# TODO, remove! left in just while protoing init fix!
|
||||
# global _bp
|
||||
# if (
|
||||
# 'chart' in self.aid.name
|
||||
# and
|
||||
# isinstance(
|
||||
# rvs['_root_addrs'][0],
|
||||
# dict,
|
||||
# )
|
||||
# and
|
||||
# not _bp
|
||||
# ):
|
||||
# _bp = True
|
||||
# breakpoint()
|
||||
|
||||
_state._runtime_vars.update(rvs)
|
||||
|
||||
# `SpawnSpec.reg_addrs`
|
||||
|
|
@ -1455,7 +1482,12 @@ async def async_main(
|
|||
# be False when running as root actor and True when as
|
||||
# a subactor.
|
||||
parent_addr: UnwrappedAddress|None = None,
|
||||
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
|
||||
task_status: TaskStatus[
|
||||
tuple[
|
||||
list[UnwrappedAddress], # accept_addrs
|
||||
list[UnwrappedAddress], # reg_addrs
|
||||
]
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
|
|
@ -1634,6 +1666,7 @@ async def async_main(
|
|||
# if addresses point to the same actor..
|
||||
# So we need a way to detect that? maybe iterate
|
||||
# only on unique actor uids?
|
||||
addr: UnwrappedAddress
|
||||
for addr in actor.reg_addrs:
|
||||
try:
|
||||
waddr = wrap_address(addr)
|
||||
|
|
@ -1642,7 +1675,9 @@ async def async_main(
|
|||
await debug.pause()
|
||||
|
||||
# !TODO, get rid of the local-portal crap XD
|
||||
reg_portal: Portal
|
||||
async with get_registry(addr) as reg_portal:
|
||||
accept_addr: UnwrappedAddress
|
||||
for accept_addr in accept_addrs:
|
||||
accept_addr = wrap_address(accept_addr)
|
||||
|
||||
|
|
@ -1658,8 +1693,12 @@ async def async_main(
|
|||
|
||||
is_registered: bool = True
|
||||
|
||||
# init steps complete
|
||||
task_status.started()
|
||||
# init steps complete, deliver IPC-server and
|
||||
# registrar addrs back to caller.
|
||||
task_status.started((
|
||||
accept_addrs,
|
||||
actor.reg_addrs,
|
||||
))
|
||||
|
||||
# Begin handling our new connection back to our
|
||||
# parent. This is done last since we don't want to
|
||||
|
|
|
|||
|
|
@ -38,6 +38,7 @@ import trio
|
|||
from ._exceptions import (
|
||||
ContextCancelled,
|
||||
RemoteActorError,
|
||||
TransportClosed,
|
||||
)
|
||||
from .log import get_logger
|
||||
from .trionics import (
|
||||
|
|
@ -409,10 +410,8 @@ class MsgStream(trio.abc.Channel):
|
|||
# it).
|
||||
with trio.CancelScope(shield=True):
|
||||
await self._ctx.send_stop()
|
||||
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError
|
||||
TransportClosed,
|
||||
) as re:
|
||||
# the underlying channel may already have been pulled
|
||||
# in which case our stop message is meaningless since
|
||||
|
|
@ -593,9 +592,8 @@ class MsgStream(trio.abc.Channel):
|
|||
),
|
||||
)
|
||||
except (
|
||||
trio.ClosedResourceError,
|
||||
trio.BrokenResourceError,
|
||||
BrokenPipeError,
|
||||
TransportClosed,
|
||||
) as _trans_err:
|
||||
trans_err = _trans_err
|
||||
if (
|
||||
|
|
|
|||
|
|
@ -1260,3 +1260,26 @@ async def breakpoint(
|
|||
api_frame=inspect.currentframe(),
|
||||
**kwargs,
|
||||
)
|
||||
|
||||
|
||||
async def maybe_pause_bp():
|
||||
'''
|
||||
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
|
||||
use the multi-actor `.pause()` API when the current actor is the
|
||||
root.
|
||||
|
||||
?! BUT WHY !?
|
||||
-------
|
||||
|
||||
This is useful when debugging cases where the tpt layer breaks
|
||||
(or is intentionally broken, say during resiliency testing) in
|
||||
the case where a child can no longer contact the root process to
|
||||
acquire the process-tree-singleton TTY lock.
|
||||
|
||||
'''
|
||||
import tractor
|
||||
actor = tractor.current_actor()
|
||||
if actor.aid.name == 'root':
|
||||
await tractor.pause(shield=True)
|
||||
else:
|
||||
tractor.devx.mk_pdb().set_trace()
|
||||
|
|
|
|||
|
|
@ -307,7 +307,12 @@ class Channel:
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Send a coded msg-blob over the transport.
|
||||
Send a coded msg-blob over the underlying IPC transport.
|
||||
|
||||
This fn raises `TransportClosed` on comms failures and is
|
||||
normally handled by higher level runtime machinery for the
|
||||
expected-graceful cases, normally ephemercal
|
||||
(re/dis)connects.
|
||||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
|
|
@ -334,9 +339,10 @@ class Channel:
|
|||
except KeyError:
|
||||
raise err
|
||||
case TransportClosed():
|
||||
src_exc_str: str = err.repr_src_exc()
|
||||
log.transport(
|
||||
f'Transport stream closed due to\n'
|
||||
f'{err.repr_src_exc()}\n'
|
||||
f'Transport stream closed due to,\n'
|
||||
f'{src_exc_str}'
|
||||
)
|
||||
|
||||
case _:
|
||||
|
|
@ -345,6 +351,11 @@ class Channel:
|
|||
raise
|
||||
|
||||
async def recv(self) -> Any:
|
||||
'''
|
||||
Receive the latest (queued) msg-blob from the underlying IPC
|
||||
transport.
|
||||
|
||||
'''
|
||||
assert self._transport
|
||||
return await self._transport.recv()
|
||||
|
||||
|
|
@ -418,16 +429,18 @@ class Channel:
|
|||
self
|
||||
) -> AsyncGenerator[Any, None]:
|
||||
'''
|
||||
Yield `MsgType` IPC msgs decoded and deliverd from
|
||||
an underlying `MsgTransport` protocol.
|
||||
Yield `MsgType` IPC msgs decoded and deliverd from an
|
||||
underlying `MsgTransport` protocol.
|
||||
|
||||
This is a streaming routine alo implemented as an async-gen
|
||||
func (same a `MsgTransport._iter_pkts()`) gets allocated by
|
||||
a `.__call__()` inside `.__init__()` where it is assigned to
|
||||
the `._aiter_msgs` attr.
|
||||
This is a streaming routine alo implemented as an
|
||||
async-generator func (same a `MsgTransport._iter_pkts()`)
|
||||
gets allocated by a `.__call__()` inside `.__init__()` where
|
||||
it is assigned to the `._aiter_msgs` attr.
|
||||
|
||||
'''
|
||||
assert self._transport
|
||||
if not self._transport:
|
||||
raise RuntimeError('No IPC transport initialized!?')
|
||||
|
||||
while True:
|
||||
try:
|
||||
async for msg in self._transport:
|
||||
|
|
@ -462,7 +475,15 @@ class Channel:
|
|||
# continue
|
||||
|
||||
def connected(self) -> bool:
|
||||
return self._transport.connected() if self._transport else False
|
||||
'''
|
||||
Predicate whether underlying IPC tpt is connected.
|
||||
|
||||
'''
|
||||
return (
|
||||
self._transport.connected()
|
||||
if self._transport
|
||||
else False
|
||||
)
|
||||
|
||||
async def _do_handshake(
|
||||
self,
|
||||
|
|
@ -493,8 +514,11 @@ async def _connect_chan(
|
|||
addr: UnwrappedAddress
|
||||
) -> typing.AsyncGenerator[Channel, None]:
|
||||
'''
|
||||
Create and connect a channel with disconnect on context manager
|
||||
teardown.
|
||||
Create and connect a `Channel` to the provided `addr`, disconnect
|
||||
it on cm exit.
|
||||
|
||||
NOTE, this is a lowlevel, normally internal-only iface. You
|
||||
should likely use `.open_portal()` instead.
|
||||
|
||||
'''
|
||||
chan = await Channel.from_addr(addr)
|
||||
|
|
|
|||
|
|
@ -154,7 +154,6 @@ class MsgTransport(Protocol):
|
|||
# ...
|
||||
|
||||
|
||||
|
||||
class MsgpackTransport(MsgTransport):
|
||||
|
||||
# TODO: better naming for this?
|
||||
|
|
@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
|
|||
except trio.ClosedResourceError as cre:
|
||||
closure_err = cre
|
||||
|
||||
# await tractor.devx._trace.maybe_pause_bp()
|
||||
|
||||
raise TransportClosed(
|
||||
message=(
|
||||
f'{tpt_name} was already closed locally ?\n'
|
||||
f'{tpt_name} was already closed locally?'
|
||||
),
|
||||
src_exc=closure_err,
|
||||
loglevel='error',
|
||||
raise_on_report=(
|
||||
'another task closed this fd' in closure_err.args
|
||||
'another task closed this fd'
|
||||
in
|
||||
closure_err.args
|
||||
),
|
||||
) from closure_err
|
||||
|
||||
|
|
@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
|
|||
trans_err = _re
|
||||
tpt_name: str = f'{type(self).__name__!r}'
|
||||
|
||||
trans_err_msg: str = trans_err.args[0]
|
||||
by_whom: str = {
|
||||
'another task closed this fd': 'locally',
|
||||
'this socket was already closed': 'by peer',
|
||||
}.get(trans_err_msg)
|
||||
match trans_err:
|
||||
|
||||
# XXX, specifc to UDS transport and its,
|
||||
|
|
@ -446,13 +454,13 @@ class MsgpackTransport(MsgTransport):
|
|||
case trio.BrokenResourceError() if (
|
||||
'[Errno 32] Broken pipe'
|
||||
in
|
||||
trans_err.args[0]
|
||||
trans_err_msg
|
||||
):
|
||||
tpt_closed = TransportClosed.from_src_exc(
|
||||
message=(
|
||||
f'{tpt_name} already closed by peer\n'
|
||||
),
|
||||
body=f'{self}\n',
|
||||
body=f'{self}',
|
||||
src_exc=trans_err,
|
||||
raise_on_report=True,
|
||||
loglevel='transport',
|
||||
|
|
@ -462,24 +470,26 @@ class MsgpackTransport(MsgTransport):
|
|||
# ??TODO??, what case in piker does this and HOW
|
||||
# CAN WE RE-PRODUCE IT?!?!?
|
||||
case trio.ClosedResourceError() if (
|
||||
'this socket was already closed'
|
||||
in
|
||||
trans_err.args[0]
|
||||
by_whom
|
||||
):
|
||||
tpt_closed = TransportClosed.from_src_exc(
|
||||
message=(
|
||||
f'{tpt_name} already closed by peer\n'
|
||||
f'{tpt_name} was already closed {by_whom!r}?\n'
|
||||
),
|
||||
body=f'{self}\n',
|
||||
body=f'{self}',
|
||||
src_exc=trans_err,
|
||||
raise_on_report=True,
|
||||
loglevel='transport',
|
||||
)
|
||||
|
||||
# await tractor.devx._trace.maybe_pause_bp()
|
||||
raise tpt_closed from trans_err
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
# about it.
|
||||
# XXX, unless the disconnect condition falls
|
||||
# under "a normal/expected operating breakage"
|
||||
# (per the `trans_err_msg` guards in the cases
|
||||
# above) we usualy console-error about it and
|
||||
# raise-thru. about it.
|
||||
case _:
|
||||
log.exception(
|
||||
f'{tpt_name} layer failed pre-send ??\n'
|
||||
|
|
|
|||
16
uv.lock
16
uv.lock
|
|
@ -500,7 +500,7 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "py-cid"
|
||||
version = "0.4.0"
|
||||
version = "0.5.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "morphys" },
|
||||
|
|
@ -508,9 +508,9 @@ dependencies = [
|
|||
{ name = "py-multicodec" },
|
||||
{ name = "py-multihash" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/e7/09/c0ca25eac91c62f6f22f5ac6accd0bfa957e77adfdffd0eccc0700f2ea07/py_cid-0.4.0.tar.gz", hash = "sha256:7c15d6a83f59c3a4c7fbff793f1d4cbfc831e90355fd0e2c5cfe927c21733cc3", size = 25970, upload-time = "2025-12-19T16:55:01.057Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/96/8e/68c2bd0346247570e8e01e8c170a0237884e95cdfa43989527b71adaa978/py_cid-0.5.0.tar.gz", hash = "sha256:93c62586c672353a9862f3fce13c9848ea39a00378e0980e2f0eed91631f3d28", size = 38028, upload-time = "2026-02-13T19:03:28.603Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/80/39/d5c1828e79526002f1bf87b9daba01c7db445960daf341e1dd84a5ff0469/py_cid-0.4.0-py3-none-any.whl", hash = "sha256:6a3183a3088b219dbf3cb37eec7d47a644be3f3ebabdf38347c2e9312621d6cc", size = 8833, upload-time = "2025-12-19T16:54:59.233Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/2b/18/eaea1571ae8b4fa490793a4b78a9641c4579a884f7a26f3d1b019d7e91c2/py_cid-0.5.0-py3-none-any.whl", hash = "sha256:2fbad437384534e2a0ab0c4068aac3e510c4cb710c89c8f6bf98f4b07ed54e3e", size = 16046, upload-time = "2026-02-13T19:03:27.516Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -889,13 +889,13 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "xonsh"
|
||||
version = "0.22.2"
|
||||
version = "0.22.4"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/6a/1b/0298e083542044e9c8a5cf95bfae6f2ec90574dc8442982a12224cb00096/xonsh-0.22.2.tar.gz", hash = "sha256:a3ceb8dc2111bb383e464b46b59e5a1d7811ee8d947d2227d64200d6788ff815", size = 826228, upload-time = "2026-02-03T09:25:41.692Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/48/df/1fc9ed62b3d7c14612e1713e9eb7bd41d54f6ad1028a8fbb6b7cddebc345/xonsh-0.22.4.tar.gz", hash = "sha256:6be346563fec2db75778ba5d2caee155525e634e99d9cc8cc347626025c0b3fa", size = 826665, upload-time = "2026-02-17T07:53:39.424Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/c8/77/f6827a9a09eab5c61add38449223386b5593dbc6abc84e40d9d0cc8fb383/xonsh-0.22.2-py311-none-any.whl", hash = "sha256:4e4c982035e5109c00a4fc5966b0191cec4a8794ea4d589f99e9656796395653", size = 654278, upload-time = "2026-02-03T09:25:39.391Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/f4/22/335e1b327144403457aaeadc20dfcaba8399cbf66b0b1e1c5d5d342e3008/xonsh-0.22.2-py312-none-any.whl", hash = "sha256:03f36cd4f49d4c4d3cf9927d2ae1f51690dc6faf5591013e2bec80fd9796b9ca", size = 654287, upload-time = "2026-02-03T09:26:04.663Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/9e/ef/be5937405380c73f07b2f68f26092eef0f4000f49a0e30697b8feca38fb1/xonsh-0.22.2-py313-none-any.whl", hash = "sha256:1abbb33324859dd3bb69e99e8a043281c2e5e1f779c3c482756ffc7c0723a825", size = 654765, upload-time = "2026-02-03T09:25:41.852Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/2e/00/7cbc0c1fb64365a0a317c54ce3a151c9644eea5a509d9cbaae61c9fd1426/xonsh-0.22.4-py311-none-any.whl", hash = "sha256:38b29b29fa85aa756462d9d9bbcaa1d85478c2108da3de6cc590a69a4bcd1a01", size = 654375, upload-time = "2026-02-17T07:53:37.702Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/2e/c2/3dd498dc28d8f89cdd52e39950c5e591499ae423f61694c0bb4d03ed1d82/xonsh-0.22.4-py312-none-any.whl", hash = "sha256:4e538fac9f4c3d866ddbdeca068f0c0515469c997ed58d3bfee963878c6df5a5", size = 654300, upload-time = "2026-02-17T07:53:35.813Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/82/7d/1f9c7147518e9f03f6ce081b5bfc4f1aceb6ec5caba849024d005e41d3be/xonsh-0.22.4-py313-none-any.whl", hash = "sha256:cc5fabf0ad0c56a2a11bed1e6a43c4ec6416a5b30f24f126b8e768547c3793e2", size = 654818, upload-time = "2026-02-17T07:53:33.477Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
Loading…
Reference in New Issue