Compare commits

15 Commits

Author SHA1 Message Date
Gud Boi c974ce242c Less newlines in `._rpc` log msg 2026-02-11 22:14:06 -05:00
Gud Boi 7c9be6a772 Use test-harness `loglevel` in inter-peer suite 2026-02-11 22:14:06 -05:00
Gud Boi 1bd2bf9e41 Hide private fields in `Struct.pformat()` output
Skip fields starting with `_` in pretty-printed struct output
to avoid cluttering displays with internal/private state (and/or accessing
private properties which have errors Bp).

Deats,
- add `if k[0] == '_': continue` check to skip private fields
- change nested `if isinstance(v, Struct)` to `elif` since we
  now have early-continue for private fields
- mv `else:` comment to clarify it handles top-level fields
- fix indentation of `yield` statement to only output
  non-private, non-nested fields

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:14:06 -05:00
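
The field-skipping flow described in the commit above can be sketched roughly as follows. This is a stand-alone illustration and not the actual `Struct.pformat()` code: `iter_fields()` and `pformat()` are hypothetical names, and plain `dict`s stand in for the real `Struct` type.

```python
from typing import Any, Iterator


def iter_fields(
    struct: dict[str, Any],
    prefix: str = '',
) -> Iterator[tuple[str, Any]]:
    '''
    Yield `(dotted_name, value)` pairs for every public field,
    recursing into nested structs (plain dicts stand in for `Struct`).

    '''
    for k, v in struct.items():
        # early-continue on private fields so internal state never
        # reaches the output (the commit checks `k[0] == '_'`).
        if k.startswith('_'):
            continue

        # nested struct: recurse instead of yielding it directly
        elif isinstance(v, dict):
            yield from iter_fields(v, prefix=f'{prefix}{k}.')

        # non-private, non-nested field
        else:
            yield f'{prefix}{k}', v


def pformat(struct: dict[str, Any]) -> str:
    # one `name = value` line per visible field
    return '\n'.join(
        f'{name} = {value!r}'
        for name, value in iter_fields(struct)
    )
```

Because the private-field check runs first, a private field never reaches the nested-struct branch, which is why the commit can turn the nested check into an `elif`.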
Gud Boi caba8beccd Tried out an alt approach for `.to_asyncio` crashes
This change is masked out now BUT i'm leaving it in for reference.

I was debugging a multi-actor fault where the primary source actor was
an infected-aio-subactor (`brokerd.ib`) and it seemed like the REPL was only
entering on the `trio` side (at a `.open_channel_from()`) and not
eventually breaking in the `asyncio.Task`. But (after changing
something?) it seems to be working now; it's just that the `trio` side
sometimes seems to handle the error before the (source/causing and more
child-ish) `asyncio`-task, which is a bit odd and not expected..
We could likely refine (maybe with an inter-loop-task REPL lock?) this
at some point and ensure a child-`asyncio` task which errors always
grabs the REPL **first**?

Lowlevel deats/further-todos,
- add (masked) `maybe_open_crash_handler()` block around
  `asyncio.Task` execution with notes about weird parent-addr
  delivery bug in `test_sync_pause_from_aio_task`
  * yeah dunno what that's about but made a bug; seems to be IPC
    serialization of the `TCPAddress` struct somewhere??
- add inter-loop lock TODO for avoiding aio-task clobbering
  trio-tasks when both crash in debug-mode

Also,
- change import from `tractor.devx.debug` to `tractor.devx`
- adjust `get_logger()` call to use new implicit mod-name detection
  added to `.log.get_logger()`, i.e. without `name=__name__`.
- some teensie refinements to `open_channel_from()`:
  * swap return type annotation to `tuple[LinkedTaskChannel, Any]`
    (was `Any`).
  * update doc-string to clarify started-value delivery
  * add err-log before `.pause()` in what should be an unreachable path.
  * add todo to swap the `(first, chan)` pair to match that of ctx..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])

[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:14:06 -05:00
Gud Boi 40e0cf51cd Add `uds` to `._multiaddr`, tweak typing 2026-02-11 22:14:06 -05:00
Gud Boi dcc187b57e Add `multiaddr` and bump up some deps
Since we're planning to use it for (discovery) addressing, allowing
replacement of the hacky (pretend) attempt in `tractor._multiaddr` Bp

Also pin some deps,
- make us py312+
- use `pdbp` with my frame indexing fix.
- mv to latest `xonsh` for the fancy cmd/suggestion injections.

Bump lock file to match obvi!
2026-02-11 22:14:06 -05:00
Gud Boi 421d13604f Bump `ruff.toml` to target py313 2026-02-11 22:14:06 -05:00
Tyler Goodlet d9f980388b Use `platformdirs` for `.config.get_rt_dir()`
Thanks to the `tox`-dev community for such a lovely pkg which seems to
solve all the current cross-platform user-dir problems B)

Also this,
- now passes `platformdirs.user_runtime_dir(appname='tractor')`
  and allows caller to pass an optional `subdir` under `tractor/`
  if desired.
- drops the `.config._rtdir: Path` mod var.
- bumps the lock file with the new dep.
2026-02-11 22:13:40 -05:00
Tyler Goodlet a7eee7655d Extend `.to_asyncio.LinkedTaskChannel` for aio side
With methods for comms similar to those that exist for the `trio` side,
- `.get()` which proxies verbatim to the `._to_aio: asyncio.Queue`,
- `.send_nowait()` which thin-wraps to `._to_trio: trio.MemorySendChannel`.

Obviously the more correct design is to break up the channel type into
a pair of handle types, one for each "side's" task in each event-loop,
that's hopefully coming shortly in a follow up patch B)

Also,
- fill in some missing doc strings, tweak some explanation comments and
  update todos.
- adjust the `test_aio_errors_and_channel_propagates_and_closes()` suite
  to use the new `chan` fn-sig-API with `.open_channel_from()` including
  the new methods for msg comms; ensures everything added here works e2e.
2026-02-11 22:13:40 -05:00
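
As a rough picture of the two aio-side methods named in the commit above, here is a stdlib-only sketch. It is not the real `LinkedTaskChannel`: `AioSideChan`, `ListSendChannel`, `echo_once()` and `demo()` are hypothetical, and the list-backed `ListSendChannel` is an assumed stand-in for the `trio.MemorySendChannel` so that the example runs without `trio`.

```python
import asyncio
from typing import Any, Protocol


class SendsNowait(Protocol):
    # the only part of the send-channel API this sketch relies on
    def send_nowait(self, value: Any) -> None: ...


class ListSendChannel:
    '''
    Hypothetical stand-in for the `trio.MemorySendChannel`; it just
    records what was sent.

    '''
    def __init__(self) -> None:
        self.sent: list[Any] = []

    def send_nowait(self, value: Any) -> None:
        self.sent.append(value)


class AioSideChan:
    '''
    Hypothetical aio-side handle exposing `.get()`/`.send_nowait()`.

    '''
    def __init__(
        self,
        to_aio: asyncio.Queue,
        to_trio: SendsNowait,
    ) -> None:
        self._to_aio = to_aio
        self._to_trio = to_trio

    async def get(self) -> Any:
        # proxy verbatim to the `asyncio.Queue`
        return await self._to_aio.get()

    def send_nowait(self, item: Any) -> None:
        # thin-wrap the (stand-in) send channel
        self._to_trio.send_nowait(item)


async def echo_once(chan: AioSideChan) -> None:
    # what an `asyncio.Task` body might do with its `chan` arg
    msg = await chan.get()
    chan.send_nowait(f'echo: {msg}')


def demo() -> list[Any]:
    async def main() -> list[Any]:
        to_aio: asyncio.Queue = asyncio.Queue()
        to_trio = ListSendChannel()
        to_aio.put_nowait('ping')  # pretend the `trio` side sent this
        await echo_once(AioSideChan(to_aio, to_trio))
        return to_trio.sent

    return asyncio.run(main())
```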
Tyler Goodlet 2dddedada9 Hide `._rpc._invoke()` frame, again.. 2026-02-11 22:13:40 -05:00
Tyler Goodlet 1772b8ec2c Explain the `infect_asyncio: bool` param to pass in RTE msg 2026-02-11 22:13:40 -05:00
Tyler Goodlet 5836c7e88c Toss in masked `.set_trace()` for unshielded `.pause()` debug 2026-02-11 22:13:40 -05:00
Gud Boi f03730efa2 Unmask `ClosedResourceError` handling in `._transport`
Unmask the CRE case block for peer-closed socket errors which already
had a TODO about reproducing the condition. It appears this case can
happen during inter-actor comms teardowns in `piker`, but i haven't been
able to figure out exactly what reproduces it yet..

So activate the block again for that 'socket already closed'-msg case,
and add a TODO questioning how to reproduce it.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-11 22:13:35 -05:00
Tyler Goodlet d6c405ec6b Mask tpt-closed handling of `chan.send(return_msg)`
A partial revert of commit c05d08e426 since it seems we already
suppress tpt-closed errors lower down in `.ipc.Channel.send()`; given
that i'm pretty sure this new handler code should basically never run?

Left in a todo to remove the masked content once i'm done more
thoroughly testing under `piker`.
2026-02-11 22:13:35 -05:00
Tyler Goodlet ab7b75a9d6 More `TransportClosed`-handling around IPC-IO
For IPC-disconnects-during-teardown edge cases, augment some `._rpc`
machinery,
- in `._invoke()` around the `await chan.send(return_msg)` where we
  suppress if the underlying `Channel` already disconnected.
- add a disjoint handler in `_errors_relayed_via_ipc()` which just
  reports-n-reraises the exc (same as prior behaviour).
  * originally i thought it needed to be handled specially (to avoid
    being crash handled) but turns out that isn't necessary?
  * hence the also-added-but-masked-out `debug_filter` / guard expression
    around the `await debug._maybe_enter_pm()` line.
- show the `._invoke()` frame for the moment.
2026-02-11 22:13:35 -05:00
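
The suppress-if-already-disconnected idea from the first bullet of the commit above might look something like the sketch below. It assumes a local `TransportClosed` stand-in class and a hypothetical `FakeChannel` whose `.send()` always fails, and `send_final_result()` is an illustrative helper, not a function from `._rpc`.

```python
import asyncio
import logging
from typing import Any

log = logging.getLogger(__name__)


class TransportClosed(Exception):
    '''
    Local stand-in for `tractor.TransportClosed`.

    '''


class FakeChannel:
    '''
    Hypothetical channel whose `.send()` always fails, as if the
    peer disconnected during teardown.

    '''
    def __init__(self, connected: bool) -> None:
        self._connected = connected

    def connected(self) -> bool:
        return self._connected

    async def send(self, msg: Any) -> None:
        raise TransportClosed('peer went away')


async def send_final_result(
    chan: FakeChannel,
    return_msg: Any,
) -> bool:
    '''
    Try to ship the final result; swallow the tpt-closed error only
    when the channel reports it is already disconnected.

    '''
    try:
        await chan.send(return_msg)
        return True
    except TransportClosed:
        log.warning(
            'Failed to send final result, channel already disconnected?'
        )
        # a still-connected channel failing to send is unexpected,
        # so let that case propagate.
        if chan.connected():
            raise
        return False
```

Note that a later commit in this list (d6c405ec6b) masks this handler again on the grounds that `.ipc.Channel.send()` already suppresses tpt-closed errors lower down.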
17 changed files with 142 additions and 413 deletions

View File

@ -17,7 +17,6 @@ from tractor import (
MsgStream,
_testing,
trionics,
TransportClosed,
)
import trio
import pytest
@ -209,16 +208,12 @@ async def main(
# TODO: is this needed or no?
raise
except (
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
except trio.ClosedResourceError:
# NOTE: don't send if we already broke the
# connection to avoid raising a closed-error
# such that we drop through to the ctl-c
# mashing by user.
with trio.CancelScope(shield=True):
await trio.sleep(0.01)
await trio.sleep(0.01)
# timeout: int = 1
# with trio.move_on_after(timeout) as cs:
@ -252,7 +247,6 @@ async def main(
await stream.send(i)
pytest.fail('stream not closed?')
except (
TransportClosed,
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:

View File

@ -4,7 +4,6 @@
'''
from __future__ import annotations
import time
import signal
from typing import (
Callable,
TYPE_CHECKING,
@ -70,15 +69,12 @@ def spawn(
import os
os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn(
cmd: str,
**mkcmd_kwargs,
) -> pty_spawn.spawn:
nonlocal spawned
unset_colors()
spawned = testdir.spawn(
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
@ -88,35 +84,9 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
)
return spawned
# such that test-dep can pass input script name.
yield _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
return _spawn # the `PexpectSpawner`, type alias.
@pytest.fixture(
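
The fixture-teardown loop visible in this hunk (signal with `SIGINT` until a timeout, then fall back to a hard kill) can be sketched on its own as below. This is only an illustration under stated assumptions: it uses a plain `subprocess.Popen` instead of the `pexpect` pty-process, `reap()` is a hypothetical helper name, and sending `SIGINT` this way assumes a POSIX platform.

```python
import signal
import subprocess
import time


def reap(
    proc: subprocess.Popen,
    timeout: float = 5.0,
    poll_delay: float = 0.01,
) -> int:
    '''
    Ask `proc` to exit via repeated `SIGINT`, escalating to a hard
    kill if it is still alive once `timeout` seconds have passed.

    '''
    start: float = time.time()
    while (
        proc.poll() is None  # still alive
        and
        (time.time() - start) < timeout
    ):
        proc.send_signal(signal.SIGINT)
        time.sleep(poll_delay)

    # graceful attempts ran out, so force it
    if proc.poll() is None:
        proc.kill()

    # collect the exit status so no zombie is left behind
    return proc.wait()
```

A quick manual check could spawn `[sys.executable, '-c', 'import time; time.sleep(30)']` and confirm that `reap()` returns well before the 30 seconds elapse.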

View File

@ -1138,10 +1138,7 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?',
'another task closed this fd',
'Debug lock request was CANCELLED?',
"'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
# XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -98,8 +98,7 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed
mod: ModuleType = import_path(
examples_dir()
/ 'advanced_faults'
examples_dir() / 'advanced_faults'
/ 'ipc_failure_during_stream.py',
root=examples_dir(),
consider_namespace_packages=False,
@ -114,9 +113,8 @@ def test_ipc_channel_break_during_stream(
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`.
pre_aclose_msgstream
and pre_aclose_msgstream
):
# expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -162,8 +160,7 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
>
ipc_break['break_child_ipc_after']
> ipc_break['break_child_ipc_after']
)
):
if pre_aclose_msgstream:
@ -251,15 +248,8 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):
excs: tuple[Exception] = value.exceptions
assert (
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
excs = value.exceptions
assert len(excs) == 1
final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc)

View File

@ -11,13 +11,12 @@ import trio
import tractor
from tractor import ( # typing
Actor,
Context,
ContextCancelled,
MsgStream,
Portal,
RemoteActorError,
current_actor,
open_nursery,
Portal,
Context,
ContextCancelled,
RemoteActorError,
)
from tractor._testing import (
# tractor_test,
@ -797,8 +796,8 @@ async def basic_echo_server(
) -> None:
'''
Just the simplest `MsgStream` echo server which resays what you
told it but with its uid in front ;)
Just the simplest `MsgStream` echo server which resays what
you told it but with its uid in front ;)
'''
actor: Actor = tractor.current_actor()
@ -967,14 +966,9 @@ async def tell_little_bro(
caller: str = '',
err_after: float|None = None,
rng_seed: int = 100,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
rng_seed: int = 50,
):
# contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with (
tractor.wait_for_actor(
name=actor_name
@ -989,6 +983,7 @@ async def tell_little_bro(
else None
),
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
@ -999,7 +994,6 @@ async def tell_little_bro(
i,
)
await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive()
print(
f'{caller} => {actor_name}: {msg}\n'
@ -1012,9 +1006,6 @@ async def tell_little_bro(
assert sub_uid != uid
assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize(
'raise_client_error',

View File

@ -1,13 +1,8 @@
"""
Multiple python programs invoking the runtime.
"""
from __future__ import annotations
import platform
import subprocess
import time
from typing import (
TYPE_CHECKING,
)
import pytest
import trio
@ -15,29 +10,14 @@ import tractor
from tractor._testing import (
tractor_test,
)
from tractor import (
current_actor,
_state,
Actor,
Context,
Portal,
)
from .conftest import (
sig_prog,
_INT_SIGNAL,
_INT_RETURN_CODE,
)
if TYPE_CHECKING:
from tractor.msg import Aid
from tractor._addr import (
UnwrappedAddress,
)
def test_abort_on_sigint(
daemon: subprocess.Popen,
):
def test_abort_on_sigint(daemon):
assert daemon.returncode is None
time.sleep(0.1)
sig_prog(daemon, _INT_SIGNAL)
@ -50,11 +30,8 @@ def test_abort_on_sigint(
@tractor_test
async def test_cancel_remote_arbiter(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
assert not current_actor().is_arbiter
async def test_cancel_remote_arbiter(daemon, reg_addr):
assert not tractor.current_actor().is_arbiter
async with tractor.get_registry(reg_addr) as portal:
await portal.cancel_actor()
@ -68,106 +45,24 @@ async def test_cancel_remote_arbiter(
pass
def test_register_duplicate_name(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
):
def test_register_duplicate_name(daemon, reg_addr):
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
) as an:
) as n:
assert not current_actor().is_arbiter
assert not tractor.current_actor().is_arbiter
p1 = await an.start_actor('doggy')
p2 = await an.start_actor('doggy')
p1 = await n.start_actor('doggy')
p2 = await n.start_actor('doggy')
async with tractor.wait_for_actor('doggy') as portal:
assert portal.channel.uid in (p2.channel.uid, p1.channel.uid)
await an.cancel()
await n.cancel()
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with it's own root.
trio.run(main)
@tractor.context
async def get_root_portal(
ctx: Context,
):
'''
Connect back to the root actor manually (using `._discovery` API)
and ensure it's contact info is the same as our immediate parent.
'''
sub: Actor = current_actor()
rtvs: dict = _state._runtime_vars
raddrs: list[UnwrappedAddress] = rtvs['_root_addrs']
# await tractor.pause()
# XXX, in case the sub->root discovery breaks you might need
# this (i know i did Xp)!!
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
assert (
len(raddrs) == 1
and
list(sub._parent_chan.raddr.unwrap()) in raddrs
)
# connect back to our immediate parent which should also
# be the actor-tree's root.
from tractor._discovery import get_root
ptl: Portal
async with get_root() as ptl:
root_aid: Aid = ptl.chan.aid
parent_ptl: Portal = current_actor().get_parent()
assert (
root_aid.name == 'root'
and
parent_ptl.chan.aid == root_aid
)
await ctx.started()
def test_non_registrar_spawns_child(
daemon: subprocess.Popen,
reg_addr: UnwrappedAddress,
loglevel: str,
debug_mode: bool,
):
'''
Ensure a non-regristar (serving) root actor can spawn a sub and
that sub can connect back (manually) to it's rent that is the
root without issue.
More or less this audits the global contact info in
`._state._runtime_vars`.
'''
async def main():
async with tractor.open_nursery(
registry_addrs=[reg_addr],
loglevel=loglevel,
debug_mode=debug_mode,
) as an:
actor: Actor = tractor.current_actor()
assert not actor.is_registrar
sub_ptl: Portal = await an.start_actor(
name='sub',
enable_modules=[__name__],
)
async with sub_ptl.open_context(
get_root_portal,
) as (ctx, _):
print('Waiting for `sub` to connect back to us..')
await an.cancel()
# XXX, run manually since we want to start this root **after**
# the other "daemon" program with it's own root.
# run it manually since we want to start **after**
# the other "daemon" program
trio.run(main)

View File

@ -70,7 +70,6 @@ from ._exceptions import (
MsgTypeError,
RemoteActorError,
StreamOverrun,
TransportClosed,
pack_from_raise,
unpack_error,
)
@ -2429,7 +2428,10 @@ async def open_context_from_portal(
try:
# await pause(shield=True)
await ctx.cancel()
except TransportClosed:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning(
'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n'

View File

@ -91,13 +91,10 @@ async def get_registry(
@acm
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
'''
Deliver the current actor's "root process" actor (yes in actor
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
'''
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox']
@ -196,11 +193,6 @@ async def maybe_open_portal(
addr: UnwrappedAddress,
name: str,
):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor(
name=name,
regaddr=addr,

View File

@ -329,7 +329,18 @@ class Portal:
# if we get here some weird cancellation case happened
return False
except TransportClosed as tpt_err:
except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n'
f'\n'

View File

@ -88,8 +88,7 @@ async def maybe_block_bp(
bp_blocked: bool
if (
debug_mode
and
maybe_enable_greenback
and maybe_enable_greenback
and (
maybe_mod := await debug.maybe_init_greenback(
raise_not_found=False,
@ -386,13 +385,10 @@ async def open_root_actor(
addr,
)
tpt_bind_addrs: list[
Address # `Address.get_random()` case
|UnwrappedAddress # registrar case `= uw_reg_addrs`
] = []
trans_bind_addrs: list[UnwrappedAddress] = []
# ------ NON-REGISTRAR ------
# create a new root-actor instance.
# Create a new local root-actor instance which IS NOT THE
# REGISTRAR
if ponged_addrs:
if ensure_registry:
raise RuntimeError(
@ -419,21 +415,12 @@ async def open_root_actor(
# XXX INSTEAD, bind random addrs using the same tpt
# proto.
for addr in ponged_addrs:
tpt_bind_addrs.append(
# XXX, these are `Address` NOT `UnwrappedAddress`.
#
# NOTE, in the case of posix/berkley socket
# protos we allocate port=0 such that the system
# allocates a random value at bind time; this
# happens in the `.ipc.*` stack's backend.
trans_bind_addrs.append(
addr.get_random(
bindspace=addr.bindspace,
)
)
# ------ REGISTRAR ------
# create a new "registry providing" root-actor instance.
#
# Start this local actor as the "registrar", aka a regular
# actor who manages the local registry of "mailboxes" of
# other process-tree-local sub-actors.
@ -442,7 +429,7 @@ async def open_root_actor(
# following init steps are taken:
# - the tranport layer server is bound to each addr
# pair defined in provided registry_addrs, or the default.
tpt_bind_addrs = uw_reg_addrs
trans_bind_addrs = uw_reg_addrs
# - it is normally desirable for any registrar to stay up
# indefinitely until either all registered (child/sub)
@ -462,10 +449,20 @@ async def open_root_actor(
enable_modules=enable_modules,
)
# XXX, in case the root actor runtime was actually run from
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOT
# `tractor.to_asyncio.run_as_asyncio_guest()` and NOt
# `.trio.run()`.
actor._infected_aio = _state._runtime_vars['_is_infected_aio']
# NOTE, only set the loopback addr for the
# process-tree-global "root" mailbox since all sub-actors
# should be able to speak to their root actor over that
# channel.
raddrs: list[Address] = _state._runtime_vars['_root_addrs']
raddrs.extend(trans_bind_addrs)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# Start up main task set via core actor-runtime nurseries.
try:
# assign process-local actor
@ -502,39 +499,14 @@ async def open_root_actor(
# "actor runtime" primitives are SC-compat and thus all
# transitively spawned actors/processes must be as
# well.
accept_addrs: list[UnwrappedAddress]
reg_addrs: list[UnwrappedAddress]
(
accept_addrs,
reg_addrs,
) = await root_tn.start(
await root_tn.start(
partial(
_runtime.async_main,
actor,
accept_addrs=tpt_bind_addrs,
accept_addrs=trans_bind_addrs,
parent_addr=None
)
)
# NOTE, only set a local-host addr (i.e. like
# `lo`-loopback for TCP) for the process-tree-global
# "root"-process (its tree-wide "mailbox") since all
# sub-actors should be able to speak to their root
# actor over that channel.
#
# ?TODO, per-OS non-network-proto alt options?
# -[ ] on linux we should be able to always use UDS?
#
raddrs: list[UnwrappedAddress] = _state._runtime_vars['_root_addrs']
raddrs.extend(
accept_addrs,
)
# TODO, remove once we have also removed all usage;
# eventually all (root-)registry apis should expect > 1 addr.
_state._runtime_vars['_root_mailbox'] = raddrs[0]
# if 'chart' in actor.aid.name:
# from tractor.devx import mk_pdb
# mk_pdb().set_trace()
try:
yield actor
except (
@ -616,13 +588,6 @@ async def open_root_actor(
):
_state._runtime_vars['_debug_mode'] = False
# !XXX, clear ALL prior contact info state, this is MEGA
# important if you are opening the runtime multiple times
# from the same parent process (like in our test
# harness)!
_state._runtime_vars['_root_addrs'].clear()
_state._runtime_vars['_root_mailbox'] = None
_state._current_actor = None
_state._last_actor_terminated = actor

View File

@ -284,14 +284,9 @@ async def _errors_relayed_via_ipc(
try:
yield # run RPC invoke body
# NOTE, never REPL any pseudo-expected tpt-disconnect.
except TransportClosed as err:
rpc_err = err
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
@ -328,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis
)
)
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
@ -348,6 +346,13 @@ async def _errors_relayed_via_ipc(
entered_debug = await debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
)
if not entered_debug:
# if we prolly should have entered the REPL but
@ -433,7 +438,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely crashed or cancelled before start?\n'
'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
@ -689,6 +694,22 @@ async def _invoke(
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@ -914,11 +935,6 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma..
#
# FURHTER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except (
trio.ClosedResourceError,
trio.BrokenResourceError,

View File

@ -147,8 +147,6 @@ def get_mod_nsps2fps(mod_ns_paths: list[str]) -> dict[str, str]:
return nsp2fp
_bp = False
class Actor:
'''
The fundamental "runtime" concurrency primitive.
@ -183,14 +181,6 @@ class Actor:
def is_registrar(self) -> bool:
return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`,
@ -282,9 +272,7 @@ class Actor:
stacklevel=2,
)
registry_addrs: list[Address] = [
wrap_address(arbiter_addr)
]
registry_addrs: list[Address] = [wrap_address(arbiter_addr)]
# marked by the process spawning backend at startup
# will be None for the parent most process started manually
@ -971,21 +959,6 @@ class Actor:
rvs['_is_root'] = False # obvi XD
# TODO, remove! left in just while protoing init fix!
# global _bp
# if (
# 'chart' in self.aid.name
# and
# isinstance(
# rvs['_root_addrs'][0],
# dict,
# )
# and
# not _bp
# ):
# _bp = True
# breakpoint()
_state._runtime_vars.update(rvs)
# `SpawnSpec.reg_addrs`
@ -1482,12 +1455,7 @@ async def async_main(
# be False when running as root actor and True when as
# a subactor.
parent_addr: UnwrappedAddress|None = None,
task_status: TaskStatus[
tuple[
list[UnwrappedAddress], # accept_addrs
list[UnwrappedAddress], # reg_addrs
]
] = trio.TASK_STATUS_IGNORED,
task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
) -> None:
'''
@ -1666,7 +1634,6 @@ async def async_main(
# if addresses point to the same actor..
# So we need a way to detect that? maybe iterate
# only on unique actor uids?
addr: UnwrappedAddress
for addr in actor.reg_addrs:
try:
waddr = wrap_address(addr)
@ -1675,9 +1642,7 @@ async def async_main(
await debug.pause()
# !TODO, get rid of the local-portal crap XD
reg_portal: Portal
async with get_registry(addr) as reg_portal:
accept_addr: UnwrappedAddress
for accept_addr in accept_addrs:
accept_addr = wrap_address(accept_addr)
@ -1693,12 +1658,8 @@ async def async_main(
is_registered: bool = True
# init steps complete, deliver IPC-server and
# registrar addrs back to caller.
task_status.started((
accept_addrs,
actor.reg_addrs,
))
# init steps complete
task_status.started()
# Begin handling our new connection back to our
# parent. This is done last since we don't want to

View File

@ -38,7 +38,6 @@ import trio
from ._exceptions import (
ContextCancelled,
RemoteActorError,
TransportClosed,
)
from .log import get_logger
from .trionics import (
@ -410,8 +409,10 @@ class MsgStream(trio.abc.Channel):
# it).
with trio.CancelScope(shield=True):
await self._ctx.send_stop()
except (
TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
) as re:
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
@ -592,8 +593,9 @@ class MsgStream(trio.abc.Channel):
),
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
TransportClosed,
) as _trans_err:
trans_err = _trans_err
if (

View File

@ -1260,26 +1260,3 @@ async def breakpoint(
api_frame=inspect.currentframe(),
**kwargs,
)
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -307,12 +307,7 @@ class Channel:
) -> None:
'''
Send a coded msg-blob over the underlying IPC transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
Send a coded msg-blob over the transport.
'''
__tracebackhide__: bool = hide_tb
@ -339,10 +334,9 @@ class Channel:
except KeyError:
raise err
case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport(
f'Transport stream closed due to,\n'
f'{src_exc_str}'
f'Transport stream closed due to\n'
f'{err.repr_src_exc()}\n'
)
case _:
@ -351,11 +345,6 @@ class Channel:
raise
async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport
return await self._transport.recv()
@ -429,18 +418,16 @@ class Channel:
self
) -> AsyncGenerator[Any, None]:
'''
Yield `MsgType` IPC msgs decoded and deliverd from an
underlying `MsgTransport` protocol.
Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an
async-generator func (same a `MsgTransport._iter_pkts()`)
gets allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_msgs` attr.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
'''
if not self._transport:
raise RuntimeError('No IPC transport initialized!?')
assert self._transport
while True:
try:
async for msg in self._transport:
@ -475,15 +462,7 @@ class Channel:
# continue
def connected(self) -> bool:
'''
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
return self._transport.connected() if self._transport else False
async def _do_handshake(
self,
@ -514,11 +493,8 @@ async def _connect_chan(
addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a `Channel` to the provided `addr`, disconnect
it on cm exit.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
Create and connect a channel with disconnect on context manager
teardown.
'''
chan = await Channel.from_addr(addr)

View File

@ -154,6 +154,7 @@ class MsgTransport(Protocol):
# ...
class MsgpackTransport(MsgTransport):
# TODO: better naming for this?
@ -277,18 +278,14 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre:
closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed(
message=(
f'{tpt_name} was already closed locally?'
f'{tpt_name} was already closed locally ?\n'
),
src_exc=closure_err,
loglevel='error',
raise_on_report=(
'another task closed this fd'
in
closure_err.args
'another task closed this fd' in closure_err.args
),
) from closure_err
@ -438,11 +435,6 @@ class MsgpackTransport(MsgTransport):
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err:
# XXX, specifc to UDS transport and its,
@ -454,13 +446,13 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err_msg
trans_err.args[0]
):
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
body=f'{self}',
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
@ -470,26 +462,24 @@ class MsgpackTransport(MsgTransport):
# ??TODO??, what case in piker does this and HOW
# CAN WE RE-PRODUCE IT?!?!?
case trio.ClosedResourceError() if (
by_whom
'this socket was already closed'
in
trans_err.args[0]
):
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} was already closed {by_whom!r}?\n'
f'{tpt_name} already closed by peer\n'
),
body=f'{self}',
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
# await tractor.devx._trace.maybe_pause_bp()
raise tpt_closed from trans_err
# XXX, unless the disconnect condition falls
# under "a normal/expected operating breakage"
# (per the `trans_err_msg` guards in the cases
# above) we usually console-error about it and
# raise-thru.
# unless the disconnect condition falls under "a
# normal operation breakage" we usually console warn
# about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'

16
uv.lock

@ -500,7 +500,7 @@ wheels = [
[[package]]
name = "py-cid"
version = "0.5.0"
version = "0.4.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "morphys" },
@ -508,9 +508,9 @@ dependencies = [
{ name = "py-multicodec" },
{ name = "py-multihash" },
]
sdist = { url = "https://files.pythonhosted.org/packages/96/8e/68c2bd0346247570e8e01e8c170a0237884e95cdfa43989527b71adaa978/py_cid-0.5.0.tar.gz", hash = "sha256:93c62586c672353a9862f3fce13c9848ea39a00378e0980e2f0eed91631f3d28", size = 38028, upload-time = "2026-02-13T19:03:28.603Z" }
sdist = { url = "https://files.pythonhosted.org/packages/e7/09/c0ca25eac91c62f6f22f5ac6accd0bfa957e77adfdffd0eccc0700f2ea07/py_cid-0.4.0.tar.gz", hash = "sha256:7c15d6a83f59c3a4c7fbff793f1d4cbfc831e90355fd0e2c5cfe927c21733cc3", size = 25970, upload-time = "2025-12-19T16:55:01.057Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2b/18/eaea1571ae8b4fa490793a4b78a9641c4579a884f7a26f3d1b019d7e91c2/py_cid-0.5.0-py3-none-any.whl", hash = "sha256:2fbad437384534e2a0ab0c4068aac3e510c4cb710c89c8f6bf98f4b07ed54e3e", size = 16046, upload-time = "2026-02-13T19:03:27.516Z" },
{ url = "https://files.pythonhosted.org/packages/80/39/d5c1828e79526002f1bf87b9daba01c7db445960daf341e1dd84a5ff0469/py_cid-0.4.0-py3-none-any.whl", hash = "sha256:6a3183a3088b219dbf3cb37eec7d47a644be3f3ebabdf38347c2e9312621d6cc", size = 8833, upload-time = "2025-12-19T16:54:59.233Z" },
]
[[package]]
@ -889,13 +889,13 @@ wheels = [
[[package]]
name = "xonsh"
version = "0.22.4"
version = "0.22.2"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/48/df/1fc9ed62b3d7c14612e1713e9eb7bd41d54f6ad1028a8fbb6b7cddebc345/xonsh-0.22.4.tar.gz", hash = "sha256:6be346563fec2db75778ba5d2caee155525e634e99d9cc8cc347626025c0b3fa", size = 826665, upload-time = "2026-02-17T07:53:39.424Z" }
sdist = { url = "https://files.pythonhosted.org/packages/6a/1b/0298e083542044e9c8a5cf95bfae6f2ec90574dc8442982a12224cb00096/xonsh-0.22.2.tar.gz", hash = "sha256:a3ceb8dc2111bb383e464b46b59e5a1d7811ee8d947d2227d64200d6788ff815", size = 826228, upload-time = "2026-02-03T09:25:41.692Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/2e/00/7cbc0c1fb64365a0a317c54ce3a151c9644eea5a509d9cbaae61c9fd1426/xonsh-0.22.4-py311-none-any.whl", hash = "sha256:38b29b29fa85aa756462d9d9bbcaa1d85478c2108da3de6cc590a69a4bcd1a01", size = 654375, upload-time = "2026-02-17T07:53:37.702Z" },
{ url = "https://files.pythonhosted.org/packages/2e/c2/3dd498dc28d8f89cdd52e39950c5e591499ae423f61694c0bb4d03ed1d82/xonsh-0.22.4-py312-none-any.whl", hash = "sha256:4e538fac9f4c3d866ddbdeca068f0c0515469c997ed58d3bfee963878c6df5a5", size = 654300, upload-time = "2026-02-17T07:53:35.813Z" },
{ url = "https://files.pythonhosted.org/packages/82/7d/1f9c7147518e9f03f6ce081b5bfc4f1aceb6ec5caba849024d005e41d3be/xonsh-0.22.4-py313-none-any.whl", hash = "sha256:cc5fabf0ad0c56a2a11bed1e6a43c4ec6416a5b30f24f126b8e768547c3793e2", size = 654818, upload-time = "2026-02-17T07:53:33.477Z" },
{ url = "https://files.pythonhosted.org/packages/c8/77/f6827a9a09eab5c61add38449223386b5593dbc6abc84e40d9d0cc8fb383/xonsh-0.22.2-py311-none-any.whl", hash = "sha256:4e4c982035e5109c00a4fc5966b0191cec4a8794ea4d589f99e9656796395653", size = 654278, upload-time = "2026-02-03T09:25:39.391Z" },
{ url = "https://files.pythonhosted.org/packages/f4/22/335e1b327144403457aaeadc20dfcaba8399cbf66b0b1e1c5d5d342e3008/xonsh-0.22.2-py312-none-any.whl", hash = "sha256:03f36cd4f49d4c4d3cf9927d2ae1f51690dc6faf5591013e2bec80fd9796b9ca", size = 654287, upload-time = "2026-02-03T09:26:04.663Z" },
{ url = "https://files.pythonhosted.org/packages/9e/ef/be5937405380c73f07b2f68f26092eef0f4000f49a0e30697b8feca38fb1/xonsh-0.22.2-py313-none-any.whl", hash = "sha256:1abbb33324859dd3bb69e99e8a043281c2e5e1f779c3c482756ffc7c0723a825", size = 654765, upload-time = "2026-02-03T09:25:41.852Z" },
]
[[package]]