Compare commits

..

11 Commits

Author SHA1 Message Date
Gud Boi d0b92bbeba Clean up `._transport` error-case comment
Expand and clarify the comment for the default `case _`
block in the `.send()` error matcher, noting that we
console-error and raise-thru for unexpected disconnect
conditions.

(this patch was suggested by copilot in,
 https://github.com/goodboy/tractor/pull/411)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 16:18:39 -05:00
Gud Boi 9470815f5a Fix `spawn` fixture cleanup + test assertions
Improve the `spawn` fixture teardown logic in
`tests/devx/conftest.py` fixing the while-else bug, and fix
`test_advanced_faults` genexp for `TransportClosed` exc type
checking.

Deats,
- replace broken `while-else` pattern with direct
  `if ptyproc.isalive()` check after the SIGINT loop.
- fix undefined `spawned` ref -> `ptyproc.isalive()` in
  while condition.
- improve walrus expr formatting in timeout check (multiline
  style).

Also fix `test_ipc_channel_break_during_stream()` assertion,
- wrap genexp in `all()` call so it actually checks all excs
  are `TransportClosed` instead of just creating an unused
  generator.

(this patch was suggested by copilot in,
 https://github.com/goodboy/tractor/pull/411)

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 16:14:11 -05:00
Gud Boi 592d918394 Tweak `test_inter_peer_cancellation` for races
Adjust `basic_echo_server()` default sequence len to avoid the race
where the 'tell_little_bro()` finished streaming **before** the
echo-server sub is cancelled by its peer subactor (which is the whole
thing we're testing!).

Deats,
- bump `rng_seed` default from 50 -> 100 to ensure peer
  cancel req arrives before echo dialog completes on fast hw.
- add `trio.sleep(0.001)` between send/receive in msg loop on the
  "client" streamer side to give cancel request transit more time to
  arrive.

Also,
- add more native `tractor`-type hints.
- reflow `basic_echo_server()` doc-string for 67 char limit
- add masked `pause()` call with comment about unreachable
  code path
- alphabetize imports: mv `current_actor` and `open_nursery`
  below typed imports

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 15:24:42 -05:00
Gud Boi 0cddc67bdb Add doc-strs to `get_root()` + `maybe_open_portal()`
Brief descriptions for both fns in `._discovery` clarifying
what each delivers and under what conditions.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 052fe2435f Improve `Channel` doc-strs + minor cleanups
Flesh out missing method doc-strings, improve log msg formatting and
assert -> `RuntimeError` for un-inited tpt layer.

Deats,
- add doc-string to `.send()` noting `TransportClosed` raise
  on comms failures.
- add doc-string to `.recv()`.
- expand `._aiter_msgs()` doc-string, line-len reflow.
- add doc-string to `.connected()`.
- convert `assert self._transport` -> `RuntimeError` raise
  in `._aiter_msgs()` for more explicit crashing.
- expand `_connect_chan()` doc-string, note it's lowlevel
  and suggest `.open_portal()` to user instead.
- factor out `src_exc_str` in `TransportClosed` log handler
  to avoid double-call
- use multiline style for `.connected()` return expr.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 28819bf5d3 Add `Actor.is_root()` convenience predicate meth 2026-02-19 13:55:02 -05:00
Gud Boi 07c2ba5c0d Drop `trio`-exc-catching if tpt-closed covers them
Remove the `trio.ClosedResourceError` and `trio.BrokenResourceError`
handling that should now be subsumed by `TransportClosed` re-raising out
of the `.ipc` stack.

Deats,
- drop CRE and BRE from `._streaming.MsgStream.aclose()/.send()` blocks.
- similarly rm from `._context.open_context_from_portal()`.
- also from `._portal.Portal.cancel_actor()` and drop the
  (now-completed-todo) comment about this exact thing.

Also add comment in `._rpc.try_ship_error_to_remote()` noting the
remaining `trio` catches there are bc the `.ipc` layers *should* be
wrapping them; thus `log.critical()` use is warranted.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 50f40f427b Include `TransportClosed` in tpt-layer err handling
Add `TransportClosed` to except clauses where `trio`'s own
resource-closed errors are already caught, ensuring our
higher-level tpt exc is also tolerated in those same spots.
Likely i will follow up with a removal of the `trio` variants since most
*should be* caught and re-raised as tpt-closed out of the `.ipc` stack
now?

Add `TransportClosed` to various handler blocks,
- `._streaming.MsgStream.aclose()/.send()` except blocks.
- the broken-channel except in `._context.open_context_from_portal()`.
- obvi import it where necessary in those ^ mods.

Adjust `test_advanced_faults` suite + exs-script to match,
- update `ipc_failure_during_stream.py` example to catch
  `TransportClosed` alongside `trio.ClosedResourceError`
  in both the break and send-check paths.
- shield the `trio.sleep(0.01)` after tpt close in example to avoid
  taskc-raise/masking on that checkpoint since we want to simulate
  waiting for a user to send a KBI.
- loosen `ExceptionGroup` assertion to `len(excs) <= 2` and ensure all
  excs are `TransportClosed`.
- improve multi-line formatting, minor style/formatting fixes in
  condition expressions.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi bf6de55865 Improve tpt-closed msg-fmt/content and CRE case matching
Refine tpt-error reporting to include closure attribution (`'locally'`
vs `'by peer'`), tighten match conditions and reduce needless newlines
in exc reprs.

Deats,
- factor out `trans_err_msg: str` and `by_whom: str` into a `dict`
  lookup before the `match:` block to pair specific err msgs to closure
  attribution strings.
- use `by_whom` directly as `CRE` case guard condition
  (truthy when msg matches known underlying CRE msg content).
- conveniently include `by_whom!r` in `TransportClosed` message.
- fix `'locally ?'` -> `'locally?'` in send-side `CRE`
  handler (drop errant space).
- add masked `maybe_pause_bp()` calls at both `CRE` sites (from when
  i was tracing a test harness issue where the UDS socket path wasn't
  being cleaned up on teardown).
- drop trailing `\n` from `body=` args to `TransportClosed`.
- reuse `trans_err_msg` for the `BRE`/broken-pipe guard.

Also adjust testing, namely `test_ctxep_pauses_n_maybe_ipc_breaks`'s
expected patts-set for new msg formats to be raised out of
`.ipc._transport`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 5ded99a886 Add a `._trace.maybe_pause_bp()` for tpt-broken cases
Internal helper which falls back to sync `pdb` when the
child actor can't reach root to acquire the TTY lock.

Useful when debugging tpt layer failures (intentional or
otherwise) where a sub-actor can no longer IPC-contact the
root to coordinate REPL access; root uses `.pause()` as
normal while non-root falls back to `mk_pdb().set_trace()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
Gud Boi 7145fa364f Add `SIGINT` cleanup to `spawn` fixture in `devx/conftest`
Convert `spawn` fixture to a generator and add post-test graceful
subproc cleanup via `SIGINT`/`SIGKILL` to avoid leaving stale `pexpect`
child procs around between test runs as well as any UDS-tpt socket files
under the system runtime-dir.

Deats,
- convert `return _spawn` -> `yield _spawn` to enable
  post-yield teardown logic.
- add a new `nonlocal spawned` ref so teardown logic can access the last
  spawned child from outside the delivered spawner fn-closure.
- add `SIGINT`-loop after yield with 5s timeout, then
  `SIGKILL` if proc still alive.
- add masked `breakpoint()` and TODO about UDS path cleanup

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-19 13:55:02 -05:00
14 changed files with 197 additions and 97 deletions

View File

@ -17,6 +17,7 @@ from tractor import (
MsgStream, MsgStream,
_testing, _testing,
trionics, trionics,
TransportClosed,
) )
import trio import trio
import pytest import pytest
@ -208,11 +209,15 @@ async def main(
# TODO: is this needed or no? # TODO: is this needed or no?
raise raise
except trio.ClosedResourceError: except (
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
# NOTE: don't send if we already broke the # NOTE: don't send if we already broke the
# connection to avoid raising a closed-error # connection to avoid raising a closed-error
# such that we drop through to the ctl-c # such that we drop through to the ctl-c
# mashing by user. # mashing by user.
with trio.CancelScope(shield=True):
await trio.sleep(0.01) await trio.sleep(0.01)
# timeout: int = 1 # timeout: int = 1
@ -247,6 +252,7 @@ async def main(
await stream.send(i) await stream.send(i)
pytest.fail('stream not closed?') pytest.fail('stream not closed?')
except ( except (
TransportClosed,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
) as send_err: ) as send_err:

View File

@ -4,6 +4,7 @@
''' '''
from __future__ import annotations from __future__ import annotations
import time import time
import signal
from typing import ( from typing import (
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
@ -69,12 +70,15 @@ def spawn(
import os import os
os.environ['PYTHON_COLORS'] = '0' os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
) -> pty_spawn.spawn: ) -> pty_spawn.spawn:
nonlocal spawned
unset_colors() unset_colors()
return testdir.spawn( spawned = testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
@ -84,9 +88,35 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying # ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff? # `pexpect.spawn()` stuff?
) )
return spawned
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
return _spawn # the `PexpectSpawner`, type alias. yield _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
@pytest.fixture( @pytest.fixture(

View File

@ -1138,7 +1138,10 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?', ['peer IPC channel closed abruptly?',
'another task closed this fd', 'another task closed this fd',
'Debug lock request was CANCELLED?', 'Debug lock request was CANCELLED?',
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",] "'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
# XXX races on whether these show/hit? # XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -98,7 +98,8 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed expect_final_exc = TransportClosed
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() / 'advanced_faults' examples_dir()
/ 'advanced_faults'
/ 'ipc_failure_during_stream.py', / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
consider_namespace_packages=False, consider_namespace_packages=False,
@ -113,8 +114,9 @@ def test_ipc_channel_break_during_stream(
if ( if (
# only expect EoC if trans is broken on the child side, # only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`. # AND we tell the child to call `MsgStream.aclose()`.
and pre_aclose_msgstream pre_aclose_msgstream
): ):
# expect_final_exc = trio.EndOfChannel # expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -160,7 +162,8 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and ( and (
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
> ipc_break['break_child_ipc_after'] >
ipc_break['break_child_ipc_after']
) )
): ):
if pre_aclose_msgstream: if pre_aclose_msgstream:
@ -248,8 +251,15 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper # get raw instance from pytest wrapper
value = excinfo.value value = excinfo.value
if isinstance(value, ExceptionGroup): if isinstance(value, ExceptionGroup):
excs = value.exceptions excs: tuple[Exception] = value.exceptions
assert len(excs) == 1 assert (
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
final_exc = excs[0] final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc) assert isinstance(final_exc, expect_final_exc)

View File

@ -11,12 +11,13 @@ import trio
import tractor import tractor
from tractor import ( # typing from tractor import ( # typing
Actor, Actor,
current_actor,
open_nursery,
Portal,
Context, Context,
ContextCancelled, ContextCancelled,
MsgStream,
Portal,
RemoteActorError, RemoteActorError,
current_actor,
open_nursery,
) )
from tractor._testing import ( from tractor._testing import (
# tractor_test, # tractor_test,
@ -796,8 +797,8 @@ async def basic_echo_server(
) -> None: ) -> None:
''' '''
Just the simplest `MsgStream` echo server which resays what Just the simplest `MsgStream` echo server which resays what you
you told it but with its uid in front ;) told it but with its uid in front ;)
''' '''
actor: Actor = tractor.current_actor() actor: Actor = tractor.current_actor()
@ -966,9 +967,14 @@ async def tell_little_bro(
caller: str = '', caller: str = '',
err_after: float|None = None, err_after: float|None = None,
rng_seed: int = 50, rng_seed: int = 100,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with ( async with (
tractor.wait_for_actor( tractor.wait_for_actor(
name=actor_name name=actor_name
@ -983,7 +989,6 @@ async def tell_little_bro(
else None else None
), ),
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
@ -994,6 +999,7 @@ async def tell_little_bro(
i, i,
) )
await echo_ipc.send(msg) await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive() resp = await echo_ipc.receive()
print( print(
f'{caller} => {actor_name}: {msg}\n' f'{caller} => {actor_name}: {msg}\n'
@ -1006,6 +1012,9 @@ async def tell_little_bro(
assert sub_uid != uid assert sub_uid != uid
assert _i == i assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize( @pytest.mark.parametrize(
'raise_client_error', 'raise_client_error',

View File

@ -70,6 +70,7 @@ from ._exceptions import (
MsgTypeError, MsgTypeError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
TransportClosed,
pack_from_raise, pack_from_raise,
unpack_error, unpack_error,
) )
@ -2428,10 +2429,7 @@ async def open_context_from_portal(
try: try:
# await pause(shield=True) # await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except ( except TransportClosed:
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning( log.warning(
'IPC connection for context is broken?\n' 'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n' f'task: {ctx.cid}\n'

View File

@ -91,10 +91,13 @@ async def get_registry(
@acm @acm
async def get_root( async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
**kwargs, '''
) -> AsyncGenerator[Portal, None]: Deliver the current actor's "root process" actor (yes in actor
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
'''
# TODO: rename mailbox to `_root_maddr` when we finally # TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs? # add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox'] addr = _runtime_vars['_root_mailbox']
@ -193,6 +196,11 @@ async def maybe_open_portal(
addr: UnwrappedAddress, addr: UnwrappedAddress,
name: str, name: str,
): ):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor( async with query_actor(
name=name, name=name,
regaddr=addr, regaddr=addr,

View File

@ -329,18 +329,7 @@ class Portal:
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
except ( except TransportClosed as tpt_err:
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
ipc_borked_report: str = ( ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n' f'IPC for actor already closed/broken?\n\n'
f'\n' f'\n'

View File

@ -284,9 +284,14 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
except TransportClosed: # NOTE, never REPL any pseudo-expected tpt-disconnect.
log.exception('Tpt disconnect during remote-exc relay?') except TransportClosed as err:
raise rpc_err = err
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
@ -323,9 +328,6 @@ async def _errors_relayed_via_ipc(
and debug_kbis and debug_kbis
) )
) )
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
): ):
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
@ -346,13 +348,6 @@ async def _errors_relayed_via_ipc(
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but # if we prolly should have entered the REPL but
@ -438,7 +433,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
log.warning( log.warning(
'RPC task likely errored or cancelled before start?\n' 'RPC task likely crashed or cancelled before start?\n'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
) )
@ -694,22 +689,6 @@ async def _invoke(
f'{pretty_struct.pformat(return_msg)}\n' f'{pretty_struct.pformat(return_msg)}\n'
) )
await chan.send(return_msg) await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -935,6 +914,11 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma.. # that can happen and provides for a 2-general's dilemma..
#
# FURHTER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,

View File

@ -183,6 +183,14 @@ class Actor:
def is_registrar(self) -> bool: def is_registrar(self) -> bool:
return self.is_arbiter return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`, # nursery placeholders filled in by `async_main()`,

View File

@ -38,6 +38,7 @@ import trio
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
RemoteActorError, RemoteActorError,
TransportClosed,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -409,10 +410,8 @@ class MsgStream(trio.abc.Channel):
# it). # it).
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
trio.BrokenResourceError, TransportClosed,
trio.ClosedResourceError
) as re: ) as re:
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
@ -593,9 +592,8 @@ class MsgStream(trio.abc.Channel):
), ),
) )
except ( except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
TransportClosed,
) as _trans_err: ) as _trans_err:
trans_err = _trans_err trans_err = _trans_err
if ( if (

View File

@ -1257,3 +1257,26 @@ async def breakpoint(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
**kwargs, **kwargs,
) )
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -307,7 +307,12 @@ class Channel:
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the transport. Send a coded msg-blob over the underlying IPC transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -334,9 +339,10 @@ class Channel:
except KeyError: except KeyError:
raise err raise err
case TransportClosed(): case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport( log.transport(
f'Transport stream closed due to\n' f'Transport stream closed due to,\n'
f'{err.repr_src_exc()}\n' f'{src_exc_str}'
) )
case _: case _:
@ -345,6 +351,11 @@ class Channel:
raise raise
async def recv(self) -> Any: async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport assert self._transport
return await self._transport.recv() return await self._transport.recv()
@ -418,16 +429,18 @@ class Channel:
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Yield `MsgType` IPC msgs decoded and deliverd from Yield `MsgType` IPC msgs decoded and deliverd from an
an underlying `MsgTransport` protocol. underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an async-gen This is a streaming routine alo implemented as an
func (same a `MsgTransport._iter_pkts()`) gets allocated by async-generator func (same a `MsgTransport._iter_pkts()`)
a `.__call__()` inside `.__init__()` where it is assigned to gets allocated by a `.__call__()` inside `.__init__()` where
the `._aiter_msgs` attr. it is assigned to the `._aiter_msgs` attr.
''' '''
assert self._transport if not self._transport:
raise RuntimeError('No IPC transport initialized!?')
while True: while True:
try: try:
async for msg in self._transport: async for msg in self._transport:
@ -462,7 +475,15 @@ class Channel:
# continue # continue
def connected(self) -> bool: def connected(self) -> bool:
return self._transport.connected() if self._transport else False '''
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
async def _do_handshake( async def _do_handshake(
self, self,
@ -493,8 +514,11 @@ async def _connect_chan(
addr: UnwrappedAddress addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]: ) -> typing.AsyncGenerator[Channel, None]:
''' '''
Create and connect a channel with disconnect on context manager Create and connect a `Channel` to the provided `addr`, disconnect
teardown. it on cm exit.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
''' '''
chan = await Channel.from_addr(addr) chan = await Channel.from_addr(addr)

View File

@ -154,7 +154,6 @@ class MsgTransport(Protocol):
# ... # ...
class MsgpackTransport(MsgTransport): class MsgpackTransport(MsgTransport):
# TODO: better naming for this? # TODO: better naming for this?
@ -278,14 +277,18 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre: except trio.ClosedResourceError as cre:
closure_err = cre closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} was already closed locally ?\n' f'{tpt_name} was already closed locally?'
), ),
src_exc=closure_err, src_exc=closure_err,
loglevel='error', loglevel='error',
raise_on_report=( raise_on_report=(
'another task closed this fd' in closure_err.args 'another task closed this fd'
in
closure_err.args
), ),
) from closure_err ) from closure_err
@ -435,6 +438,11 @@ class MsgpackTransport(MsgTransport):
trans_err = _re trans_err = _re
tpt_name: str = f'{type(self).__name__!r}' tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err: match trans_err:
# XXX, specifc to UDS transport and its, # XXX, specifc to UDS transport and its,
@ -446,13 +454,13 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if ( case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' '[Errno 32] Broken pipe'
in in
trans_err.args[0] trans_err_msg
): ):
tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} already closed by peer\n'
), ),
body=f'{self}\n', body=f'{self}',
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
@ -462,24 +470,26 @@ class MsgpackTransport(MsgTransport):
# ??TODO??, what case in piker does this and HOW # ??TODO??, what case in piker does this and HOW
# CAN WE RE-PRODUCE IT?!?!? # CAN WE RE-PRODUCE IT?!?!?
case trio.ClosedResourceError() if ( case trio.ClosedResourceError() if (
'this socket was already closed' by_whom
in
trans_err.args[0]
): ):
tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} was already closed {by_whom!r}?\n'
), ),
body=f'{self}\n', body=f'{self}',
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
) )
# await tractor.devx._trace.maybe_pause_bp()
raise tpt_closed from trans_err raise tpt_closed from trans_err
# unless the disconnect condition falls under "a # XXX, unless the disconnect condition falls
# normal operation breakage" we usualy console warn # under "a normal/expected operating breakage"
# about it. # (per the `trans_err_msg` guards in the cases
# above) we usualy console-error about it and
# raise-thru. about it.
case _: case _:
log.exception( log.exception(
f'{tpt_name} layer failed pre-send ??\n' f'{tpt_name} layer failed pre-send ??\n'