Compare commits

..

No commits in common. "d0b92bbebaa44aafbbcb410076d78673c28b244c" and "f8e25688c70f482c9a549b4231c83bb515b91f8f" have entirely different histories.

14 changed files with 98 additions and 198 deletions

View File

@ -17,7 +17,6 @@ from tractor import (
MsgStream,
_testing,
trionics,
TransportClosed,
)
import trio
import pytest
@ -209,16 +208,12 @@ async def main(
# TODO: is this needed or no?
raise
except (
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
except trio.ClosedResourceError:
# NOTE: don't send if we already broke the
# connection to avoid raising a closed-error
# such that we drop through to the ctl-c
# mashing by user.
with trio.CancelScope(shield=True):
await trio.sleep(0.01)
await trio.sleep(0.01)
# timeout: int = 1
# with trio.move_on_after(timeout) as cs:
@ -252,7 +247,6 @@ async def main(
await stream.send(i)
pytest.fail('stream not closed?')
except (
TransportClosed,
trio.ClosedResourceError,
trio.EndOfChannel,
) as send_err:

View File

@ -4,7 +4,6 @@
'''
from __future__ import annotations
import time
import signal
from typing import (
Callable,
TYPE_CHECKING,
@ -70,15 +69,12 @@ def spawn(
import os
os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn(
cmd: str,
**mkcmd_kwargs,
) -> pty_spawn.spawn:
nonlocal spawned
unset_colors()
spawned = testdir.spawn(
return testdir.spawn(
cmd=mk_cmd(
cmd,
**mkcmd_kwargs,
@ -88,35 +84,9 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff?
)
return spawned
# such that test-dep can pass input script name.
yield _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
return _spawn # the `PexpectSpawner`, type alias.
@pytest.fixture(

View File

@ -1138,10 +1138,7 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?',
'another task closed this fd',
'Debug lock request was CANCELLED?',
"'MsgpackUDSStream' was already closed locally?",
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
"TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
# XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -98,8 +98,7 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed
mod: ModuleType = import_path(
examples_dir()
/ 'advanced_faults'
examples_dir() / 'advanced_faults'
/ 'ipc_failure_during_stream.py',
root=examples_dir(),
consider_namespace_packages=False,
@ -114,9 +113,8 @@ def test_ipc_channel_break_during_stream(
if (
# only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`.
pre_aclose_msgstream
and pre_aclose_msgstream
):
# expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -162,8 +160,7 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False
and (
ipc_break['break_parent_ipc_after']
>
ipc_break['break_child_ipc_after']
> ipc_break['break_child_ipc_after']
)
):
if pre_aclose_msgstream:
@ -251,15 +248,8 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper
value = excinfo.value
if isinstance(value, ExceptionGroup):
excs: tuple[Exception] = value.exceptions
assert (
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
excs = value.exceptions
assert len(excs) == 1
final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc)

View File

@ -11,13 +11,12 @@ import trio
import tractor
from tractor import ( # typing
Actor,
Context,
ContextCancelled,
MsgStream,
Portal,
RemoteActorError,
current_actor,
open_nursery,
Portal,
Context,
ContextCancelled,
RemoteActorError,
)
from tractor._testing import (
# tractor_test,
@ -797,8 +796,8 @@ async def basic_echo_server(
) -> None:
'''
Just the simplest `MsgStream` echo server which resays what you
told it but with its uid in front ;)
Just the simplest `MsgStream` echo server which resays what
you told it but with its uid in front ;)
'''
actor: Actor = tractor.current_actor()
@ -967,14 +966,9 @@ async def tell_little_bro(
caller: str = '',
err_after: float|None = None,
rng_seed: int = 100,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
rng_seed: int = 50,
):
# contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with (
tractor.wait_for_actor(
name=actor_name
@ -989,6 +983,7 @@ async def tell_little_bro(
else None
),
) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc,
):
actor: Actor = current_actor()
@ -999,7 +994,6 @@ async def tell_little_bro(
i,
)
await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive()
print(
f'{caller} => {actor_name}: {msg}\n'
@ -1012,9 +1006,6 @@ async def tell_little_bro(
assert sub_uid != uid
assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize(
'raise_client_error',

View File

@ -70,7 +70,6 @@ from ._exceptions import (
MsgTypeError,
RemoteActorError,
StreamOverrun,
TransportClosed,
pack_from_raise,
unpack_error,
)
@ -2429,7 +2428,10 @@ async def open_context_from_portal(
try:
# await pause(shield=True)
await ctx.cancel()
except TransportClosed:
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning(
'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n'

View File

@ -91,13 +91,10 @@ async def get_registry(
@acm
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]:
'''
Deliver the current actor's "root process" actor (yes in actor
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
async def get_root(
**kwargs,
) -> AsyncGenerator[Portal, None]:
'''
# TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox']
@ -196,11 +193,6 @@ async def maybe_open_portal(
addr: UnwrappedAddress,
name: str,
):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor(
name=name,
regaddr=addr,

View File

@ -329,7 +329,18 @@ class Portal:
# if we get here some weird cancellation case happened
return False
except TransportClosed as tpt_err:
except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n'
f'\n'

View File

@ -284,14 +284,9 @@ async def _errors_relayed_via_ipc(
try:
yield # run RPC invoke body
# NOTE, never REPL any pseudo-expected tpt-disconnect.
except TransportClosed as err:
rpc_err = err
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
except TransportClosed:
log.exception('Tpt disconnect during remote-exc relay?')
raise
# box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel.
@ -328,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis
)
)
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
@ -348,6 +346,13 @@ async def _errors_relayed_via_ipc(
entered_debug = await debug._maybe_enter_pm(
err,
api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
)
if not entered_debug:
# if we prolly should have entered the REPL but
@ -433,7 +438,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet
if is_rpc:
log.warning(
'RPC task likely crashed or cancelled before start?\n'
'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n'
)
@ -689,6 +694,22 @@ async def _invoke(
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,
@ -914,11 +935,6 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma..
#
# FURHTER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except (
trio.ClosedResourceError,
trio.BrokenResourceError,

View File

@ -183,14 +183,6 @@ class Actor:
def is_registrar(self) -> bool:
return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`,

View File

@ -38,7 +38,6 @@ import trio
from ._exceptions import (
ContextCancelled,
RemoteActorError,
TransportClosed,
)
from .log import get_logger
from .trionics import (
@ -410,8 +409,10 @@ class MsgStream(trio.abc.Channel):
# it).
with trio.CancelScope(shield=True):
await self._ctx.send_stop()
except (
TransportClosed,
trio.BrokenResourceError,
trio.ClosedResourceError
) as re:
# the underlying channel may already have been pulled
# in which case our stop message is meaningless since
@ -592,8 +593,9 @@ class MsgStream(trio.abc.Channel):
),
)
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError,
TransportClosed,
) as _trans_err:
trans_err = _trans_err
if (

View File

@ -1257,26 +1257,3 @@ async def breakpoint(
api_frame=inspect.currentframe(),
**kwargs,
)
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -307,12 +307,7 @@ class Channel:
) -> None:
'''
Send a coded msg-blob over the underlying IPC transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
Send a coded msg-blob over the transport.
'''
__tracebackhide__: bool = hide_tb
@ -339,10 +334,9 @@ class Channel:
except KeyError:
raise err
case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport(
f'Transport stream closed due to,\n'
f'{src_exc_str}'
f'Transport stream closed due to\n'
f'{err.repr_src_exc()}\n'
)
case _:
@ -351,11 +345,6 @@ class Channel:
raise
async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport
return await self._transport.recv()
@ -429,18 +418,16 @@ class Channel:
self
) -> AsyncGenerator[Any, None]:
'''
Yield `MsgType` IPC msgs decoded and deliverd from an
underlying `MsgTransport` protocol.
Yield `MsgType` IPC msgs decoded and deliverd from
an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an
async-generator func (same a `MsgTransport._iter_pkts()`)
gets allocated by a `.__call__()` inside `.__init__()` where
it is assigned to the `._aiter_msgs` attr.
This is a streaming routine alo implemented as an async-gen
func (same a `MsgTransport._iter_pkts()`) gets allocated by
a `.__call__()` inside `.__init__()` where it is assigned to
the `._aiter_msgs` attr.
'''
if not self._transport:
raise RuntimeError('No IPC transport initialized!?')
assert self._transport
while True:
try:
async for msg in self._transport:
@ -475,15 +462,7 @@ class Channel:
# continue
def connected(self) -> bool:
'''
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
return self._transport.connected() if self._transport else False
async def _do_handshake(
self,
@ -514,11 +493,8 @@ async def _connect_chan(
addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]:
'''
Create and connect a `Channel` to the provided `addr`, disconnect
it on cm exit.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
Create and connect a channel with disconnect on context manager
teardown.
'''
chan = await Channel.from_addr(addr)

View File

@ -154,6 +154,7 @@ class MsgTransport(Protocol):
# ...
class MsgpackTransport(MsgTransport):
# TODO: better naming for this?
@ -277,18 +278,14 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre:
closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed(
message=(
f'{tpt_name} was already closed locally?'
f'{tpt_name} was already closed locally ?\n'
),
src_exc=closure_err,
loglevel='error',
raise_on_report=(
'another task closed this fd'
in
closure_err.args
'another task closed this fd' in closure_err.args
),
) from closure_err
@ -438,11 +435,6 @@ class MsgpackTransport(MsgTransport):
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err:
# XXX, specifc to UDS transport and its,
@ -454,13 +446,13 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe'
in
trans_err_msg
trans_err.args[0]
):
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} already closed by peer\n'
),
body=f'{self}',
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
@ -470,26 +462,24 @@ class MsgpackTransport(MsgTransport):
# ??TODO??, what case in piker does this and HOW
# CAN WE RE-PRODUCE IT?!?!?
case trio.ClosedResourceError() if (
by_whom
'this socket was already closed'
in
trans_err.args[0]
):
tpt_closed = TransportClosed.from_src_exc(
message=(
f'{tpt_name} was already closed {by_whom!r}?\n'
f'{tpt_name} already closed by peer\n'
),
body=f'{self}',
body=f'{self}\n',
src_exc=trans_err,
raise_on_report=True,
loglevel='transport',
)
# await tractor.devx._trace.maybe_pause_bp()
raise tpt_closed from trans_err
# XXX, unless the disconnect condition falls
# under "a normal/expected operating breakage"
# (per the `trans_err_msg` guards in the cases
# above) we usualy console-error about it and
# raise-thru. about it.
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.
case _:
log.exception(
f'{tpt_name} layer failed pre-send ??\n'