Compare commits

..

No commits in common. "d0b92bbebaa44aafbbcb410076d78673c28b244c" and "f8e25688c70f482c9a549b4231c83bb515b91f8f" have entirely different histories.

14 changed files with 98 additions and 198 deletions

View File

@ -17,7 +17,6 @@ from tractor import (
MsgStream, MsgStream,
_testing, _testing,
trionics, trionics,
TransportClosed,
) )
import trio import trio
import pytest import pytest
@ -209,16 +208,12 @@ async def main(
# TODO: is this needed or no? # TODO: is this needed or no?
raise raise
except ( except trio.ClosedResourceError:
trio.ClosedResourceError,
TransportClosed,
) as _tpt_err:
# NOTE: don't send if we already broke the # NOTE: don't send if we already broke the
# connection to avoid raising a closed-error # connection to avoid raising a closed-error
# such that we drop through to the ctl-c # such that we drop through to the ctl-c
# mashing by user. # mashing by user.
with trio.CancelScope(shield=True): await trio.sleep(0.01)
await trio.sleep(0.01)
# timeout: int = 1 # timeout: int = 1
# with trio.move_on_after(timeout) as cs: # with trio.move_on_after(timeout) as cs:
@ -252,7 +247,6 @@ async def main(
await stream.send(i) await stream.send(i)
pytest.fail('stream not closed?') pytest.fail('stream not closed?')
except ( except (
TransportClosed,
trio.ClosedResourceError, trio.ClosedResourceError,
trio.EndOfChannel, trio.EndOfChannel,
) as send_err: ) as send_err:

View File

@ -4,7 +4,6 @@
''' '''
from __future__ import annotations from __future__ import annotations
import time import time
import signal
from typing import ( from typing import (
Callable, Callable,
TYPE_CHECKING, TYPE_CHECKING,
@ -70,15 +69,12 @@ def spawn(
import os import os
os.environ['PYTHON_COLORS'] = '0' os.environ['PYTHON_COLORS'] = '0'
spawned: PexpectSpawner|None = None
def _spawn( def _spawn(
cmd: str, cmd: str,
**mkcmd_kwargs, **mkcmd_kwargs,
) -> pty_spawn.spawn: ) -> pty_spawn.spawn:
nonlocal spawned
unset_colors() unset_colors()
spawned = testdir.spawn( return testdir.spawn(
cmd=mk_cmd( cmd=mk_cmd(
cmd, cmd,
**mkcmd_kwargs, **mkcmd_kwargs,
@ -88,35 +84,9 @@ def spawn(
# ^TODO? get `pytest` core to expose underlying # ^TODO? get `pytest` core to expose underlying
# `pexpect.spawn()` stuff? # `pexpect.spawn()` stuff?
) )
return spawned
# such that test-dep can pass input script name. # such that test-dep can pass input script name.
yield _spawn # the `PexpectSpawner`, type alias. return _spawn # the `PexpectSpawner`, type alias.
if (
spawned
and
(ptyproc := spawned.ptyproc)
):
start: float = time.time()
timeout: float = 5
while (
ptyproc.isalive()
and
(
(_time_took := (time.time() - start))
<
timeout
)
):
ptyproc.kill(signal.SIGINT)
time.sleep(0.01)
if ptyproc.isalive():
ptyproc.kill(signal.SIGKILL)
# TODO? ensure we've cleaned up any UDS-paths?
# breakpoint()
@pytest.fixture( @pytest.fixture(

View File

@ -1138,10 +1138,7 @@ def test_ctxep_pauses_n_maybe_ipc_breaks(
['peer IPC channel closed abruptly?', ['peer IPC channel closed abruptly?',
'another task closed this fd', 'another task closed this fd',
'Debug lock request was CANCELLED?', 'Debug lock request was CANCELLED?',
"'MsgpackUDSStream' was already closed locally?", "TransportClosed: 'MsgpackUDSStream' was already closed locally ?",]
"TransportClosed: 'MsgpackUDSStream' was already closed 'by peer'?",
# ?TODO^? match depending on `tpt_proto(s)`?
]
# XXX races on whether these show/hit? # XXX races on whether these show/hit?
# 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!', # 'Failed to REPl via `_pause()` You called `tractor.pause()` from an already cancelled scope!',

View File

@ -98,8 +98,7 @@ def test_ipc_channel_break_during_stream(
expect_final_exc = TransportClosed expect_final_exc = TransportClosed
mod: ModuleType = import_path( mod: ModuleType = import_path(
examples_dir() examples_dir() / 'advanced_faults'
/ 'advanced_faults'
/ 'ipc_failure_during_stream.py', / 'ipc_failure_during_stream.py',
root=examples_dir(), root=examples_dir(),
consider_namespace_packages=False, consider_namespace_packages=False,
@ -114,9 +113,8 @@ def test_ipc_channel_break_during_stream(
if ( if (
# only expect EoC if trans is broken on the child side, # only expect EoC if trans is broken on the child side,
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and
# AND we tell the child to call `MsgStream.aclose()`. # AND we tell the child to call `MsgStream.aclose()`.
pre_aclose_msgstream and pre_aclose_msgstream
): ):
# expect_final_exc = trio.EndOfChannel # expect_final_exc = trio.EndOfChannel
# ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this # ^XXX NOPE! XXX^ since now `.open_stream()` absorbs this
@ -162,8 +160,7 @@ def test_ipc_channel_break_during_stream(
ipc_break['break_child_ipc_after'] is not False ipc_break['break_child_ipc_after'] is not False
and ( and (
ipc_break['break_parent_ipc_after'] ipc_break['break_parent_ipc_after']
> > ipc_break['break_child_ipc_after']
ipc_break['break_child_ipc_after']
) )
): ):
if pre_aclose_msgstream: if pre_aclose_msgstream:
@ -251,15 +248,8 @@ def test_ipc_channel_break_during_stream(
# get raw instance from pytest wrapper # get raw instance from pytest wrapper
value = excinfo.value value = excinfo.value
if isinstance(value, ExceptionGroup): if isinstance(value, ExceptionGroup):
excs: tuple[Exception] = value.exceptions excs = value.exceptions
assert ( assert len(excs) == 1
len(excs) <= 2
and
all(
isinstance(exc, TransportClosed)
for exc in excs
)
)
final_exc = excs[0] final_exc = excs[0]
assert isinstance(final_exc, expect_final_exc) assert isinstance(final_exc, expect_final_exc)

View File

@ -11,13 +11,12 @@ import trio
import tractor import tractor
from tractor import ( # typing from tractor import ( # typing
Actor, Actor,
Context,
ContextCancelled,
MsgStream,
Portal,
RemoteActorError,
current_actor, current_actor,
open_nursery, open_nursery,
Portal,
Context,
ContextCancelled,
RemoteActorError,
) )
from tractor._testing import ( from tractor._testing import (
# tractor_test, # tractor_test,
@ -797,8 +796,8 @@ async def basic_echo_server(
) -> None: ) -> None:
''' '''
Just the simplest `MsgStream` echo server which resays what you Just the simplest `MsgStream` echo server which resays what
told it but with its uid in front ;) you told it but with its uid in front ;)
''' '''
actor: Actor = tractor.current_actor() actor: Actor = tractor.current_actor()
@ -967,14 +966,9 @@ async def tell_little_bro(
caller: str = '', caller: str = '',
err_after: float|None = None, err_after: float|None = None,
rng_seed: int = 100, rng_seed: int = 50,
# NOTE, ensure ^ is large enough (on fast hw anyway)
# to ensure the peer cancel req arrives before the
# echoing dialog does itself Bp
): ):
# contact target actor, do a stream dialog. # contact target actor, do a stream dialog.
lb: Portal
echo_ipc: MsgStream
async with ( async with (
tractor.wait_for_actor( tractor.wait_for_actor(
name=actor_name name=actor_name
@ -989,6 +983,7 @@ async def tell_little_bro(
else None else None
), ),
) as (sub_ctx, first), ) as (sub_ctx, first),
sub_ctx.open_stream() as echo_ipc, sub_ctx.open_stream() as echo_ipc,
): ):
actor: Actor = current_actor() actor: Actor = current_actor()
@ -999,7 +994,6 @@ async def tell_little_bro(
i, i,
) )
await echo_ipc.send(msg) await echo_ipc.send(msg)
await trio.sleep(0.001)
resp = await echo_ipc.receive() resp = await echo_ipc.receive()
print( print(
f'{caller} => {actor_name}: {msg}\n' f'{caller} => {actor_name}: {msg}\n'
@ -1012,9 +1006,6 @@ async def tell_little_bro(
assert sub_uid != uid assert sub_uid != uid
assert _i == i assert _i == i
# XXX, usually should never get here!
# await tractor.pause()
@pytest.mark.parametrize( @pytest.mark.parametrize(
'raise_client_error', 'raise_client_error',

View File

@ -70,7 +70,6 @@ from ._exceptions import (
MsgTypeError, MsgTypeError,
RemoteActorError, RemoteActorError,
StreamOverrun, StreamOverrun,
TransportClosed,
pack_from_raise, pack_from_raise,
unpack_error, unpack_error,
) )
@ -2429,7 +2428,10 @@ async def open_context_from_portal(
try: try:
# await pause(shield=True) # await pause(shield=True)
await ctx.cancel() await ctx.cancel()
except TransportClosed: except (
trio.BrokenResourceError,
trio.ClosedResourceError,
):
log.warning( log.warning(
'IPC connection for context is broken?\n' 'IPC connection for context is broken?\n'
f'task: {ctx.cid}\n' f'task: {ctx.cid}\n'

View File

@ -91,13 +91,10 @@ async def get_registry(
@acm @acm
async def get_root(**kwargs) -> AsyncGenerator[Portal, None]: async def get_root(
''' **kwargs,
Deliver the current actor's "root process" actor (yes in actor ) -> AsyncGenerator[Portal, None]:
and proc tree terms) by delivering a `Portal` from the spawn-time
provided contact address.
'''
# TODO: rename mailbox to `_root_maddr` when we finally # TODO: rename mailbox to `_root_maddr` when we finally
# add and impl libp2p multi-addrs? # add and impl libp2p multi-addrs?
addr = _runtime_vars['_root_mailbox'] addr = _runtime_vars['_root_mailbox']
@ -196,11 +193,6 @@ async def maybe_open_portal(
addr: UnwrappedAddress, addr: UnwrappedAddress,
name: str, name: str,
): ):
'''
Open a `Portal` to the actor serving @ `addr` or `None` if no
peer can be contacted or found.
'''
async with query_actor( async with query_actor(
name=name, name=name,
regaddr=addr, regaddr=addr,

View File

@ -329,7 +329,18 @@ class Portal:
# if we get here some weird cancellation case happened # if we get here some weird cancellation case happened
return False return False
except TransportClosed as tpt_err: except (
# XXX, should never really get raised unless we aren't
# wrapping them in the below type by mistake?
#
# Leaving the catch here for now until we're very sure
# all the cases (for various tpt protos) have indeed been
# re-wrapped ;p
trio.ClosedResourceError,
trio.BrokenResourceError,
TransportClosed,
) as tpt_err:
ipc_borked_report: str = ( ipc_borked_report: str = (
f'IPC for actor already closed/broken?\n\n' f'IPC for actor already closed/broken?\n\n'
f'\n' f'\n'

View File

@ -284,14 +284,9 @@ async def _errors_relayed_via_ipc(
try: try:
yield # run RPC invoke body yield # run RPC invoke body
# NOTE, never REPL any pseudo-expected tpt-disconnect. except TransportClosed:
except TransportClosed as err: log.exception('Tpt disconnect during remote-exc relay?')
rpc_err = err raise
log.warning(
f'Tpt disconnect during remote-exc relay due to,\n'
f'{err!r}\n'
)
raise err
# box and ship RPC errors for wire-transit via # box and ship RPC errors for wire-transit via
# the task's requesting parent IPC-channel. # the task's requesting parent IPC-channel.
@ -328,6 +323,9 @@ async def _errors_relayed_via_ipc(
and debug_kbis and debug_kbis
) )
) )
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
): ):
# XXX QUESTION XXX: is there any case where we'll # XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default? # want to debug IPC disconnects as a default?
@ -348,6 +346,13 @@ async def _errors_relayed_via_ipc(
entered_debug = await debug._maybe_enter_pm( entered_debug = await debug._maybe_enter_pm(
err, err,
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
# don't REPL any psuedo-expected tpt-disconnect
# debug_filter=lambda exc: (
# type (exc) not in {
# TransportClosed,
# }
# ),
) )
if not entered_debug: if not entered_debug:
# if we prolly should have entered the REPL but # if we prolly should have entered the REPL but
@ -433,7 +438,7 @@ async def _errors_relayed_via_ipc(
# cancel scope will not have been inserted yet # cancel scope will not have been inserted yet
if is_rpc: if is_rpc:
log.warning( log.warning(
'RPC task likely crashed or cancelled before start?\n' 'RPC task likely errored or cancelled before start?\n'
f'|_{ctx._task}\n' f'|_{ctx._task}\n'
f' >> {ctx.repr_rpc}\n' f' >> {ctx.repr_rpc}\n'
) )
@ -689,6 +694,22 @@ async def _invoke(
f'{pretty_struct.pformat(return_msg)}\n' f'{pretty_struct.pformat(return_msg)}\n'
) )
await chan.send(return_msg) await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
# NOTE: this happens IFF `ctx._scope.cancel()` is # NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of, # called by any of,
@ -914,11 +935,6 @@ async def try_ship_error_to_remote(
# XXX NOTE XXX in SC terms this is one of the worst things # XXX NOTE XXX in SC terms this is one of the worst things
# that can happen and provides for a 2-general's dilemma.. # that can happen and provides for a 2-general's dilemma..
#
# FURHTER, we should never really have to handle these
# lowlevel excs from `trio` since the `Channel.send()` layers
# downward should be mostly wrapping such cases in a
# tpt-closed; the `.critical()` usage is warranted.
except ( except (
trio.ClosedResourceError, trio.ClosedResourceError,
trio.BrokenResourceError, trio.BrokenResourceError,

View File

@ -183,14 +183,6 @@ class Actor:
def is_registrar(self) -> bool: def is_registrar(self) -> bool:
return self.is_arbiter return self.is_arbiter
@property
def is_root(self) -> bool:
'''
This actor is the parent most in the tree?
'''
return _state.is_root_process()
msg_buffer_size: int = 2**6 msg_buffer_size: int = 2**6
# nursery placeholders filled in by `async_main()`, # nursery placeholders filled in by `async_main()`,

View File

@ -38,7 +38,6 @@ import trio
from ._exceptions import ( from ._exceptions import (
ContextCancelled, ContextCancelled,
RemoteActorError, RemoteActorError,
TransportClosed,
) )
from .log import get_logger from .log import get_logger
from .trionics import ( from .trionics import (
@ -410,8 +409,10 @@ class MsgStream(trio.abc.Channel):
# it). # it).
with trio.CancelScope(shield=True): with trio.CancelScope(shield=True):
await self._ctx.send_stop() await self._ctx.send_stop()
except ( except (
TransportClosed, trio.BrokenResourceError,
trio.ClosedResourceError
) as re: ) as re:
# the underlying channel may already have been pulled # the underlying channel may already have been pulled
# in which case our stop message is meaningless since # in which case our stop message is meaningless since
@ -592,8 +593,9 @@ class MsgStream(trio.abc.Channel):
), ),
) )
except ( except (
trio.ClosedResourceError,
trio.BrokenResourceError,
BrokenPipeError, BrokenPipeError,
TransportClosed,
) as _trans_err: ) as _trans_err:
trans_err = _trans_err trans_err = _trans_err
if ( if (

View File

@ -1257,26 +1257,3 @@ async def breakpoint(
api_frame=inspect.currentframe(), api_frame=inspect.currentframe(),
**kwargs, **kwargs,
) )
async def maybe_pause_bp():
'''
Internal (ONLY for now) `breakpoint()`-er fn which only tries to
use the multi-actor `.pause()` API when the current actor is the
root.
?! BUT WHY !?
-------
This is useful when debugging cases where the tpt layer breaks
(or is intentionally broken, say during resiliency testing) in
the case where a child can no longer contact the root process to
acquire the process-tree-singleton TTY lock.
'''
import tractor
actor = tractor.current_actor()
if actor.aid.name == 'root':
await tractor.pause(shield=True)
else:
tractor.devx.mk_pdb().set_trace()

View File

@ -307,12 +307,7 @@ class Channel:
) -> None: ) -> None:
''' '''
Send a coded msg-blob over the underlying IPC transport. Send a coded msg-blob over the transport.
This fn raises `TransportClosed` on comms failures and is
normally handled by higher level runtime machinery for the
expected-graceful cases, normally ephemercal
(re/dis)connects.
''' '''
__tracebackhide__: bool = hide_tb __tracebackhide__: bool = hide_tb
@ -339,10 +334,9 @@ class Channel:
except KeyError: except KeyError:
raise err raise err
case TransportClosed(): case TransportClosed():
src_exc_str: str = err.repr_src_exc()
log.transport( log.transport(
f'Transport stream closed due to,\n' f'Transport stream closed due to\n'
f'{src_exc_str}' f'{err.repr_src_exc()}\n'
) )
case _: case _:
@ -351,11 +345,6 @@ class Channel:
raise raise
async def recv(self) -> Any: async def recv(self) -> Any:
'''
Receive the latest (queued) msg-blob from the underlying IPC
transport.
'''
assert self._transport assert self._transport
return await self._transport.recv() return await self._transport.recv()
@ -429,18 +418,16 @@ class Channel:
self self
) -> AsyncGenerator[Any, None]: ) -> AsyncGenerator[Any, None]:
''' '''
Yield `MsgType` IPC msgs decoded and deliverd from an Yield `MsgType` IPC msgs decoded and deliverd from
underlying `MsgTransport` protocol. an underlying `MsgTransport` protocol.
This is a streaming routine alo implemented as an This is a streaming routine alo implemented as an async-gen
async-generator func (same a `MsgTransport._iter_pkts()`) func (same a `MsgTransport._iter_pkts()`) gets allocated by
gets allocated by a `.__call__()` inside `.__init__()` where a `.__call__()` inside `.__init__()` where it is assigned to
it is assigned to the `._aiter_msgs` attr. the `._aiter_msgs` attr.
''' '''
if not self._transport: assert self._transport
raise RuntimeError('No IPC transport initialized!?')
while True: while True:
try: try:
async for msg in self._transport: async for msg in self._transport:
@ -475,15 +462,7 @@ class Channel:
# continue # continue
def connected(self) -> bool: def connected(self) -> bool:
''' return self._transport.connected() if self._transport else False
Predicate whether underlying IPC tpt is connected.
'''
return (
self._transport.connected()
if self._transport
else False
)
async def _do_handshake( async def _do_handshake(
self, self,
@ -514,11 +493,8 @@ async def _connect_chan(
addr: UnwrappedAddress addr: UnwrappedAddress
) -> typing.AsyncGenerator[Channel, None]: ) -> typing.AsyncGenerator[Channel, None]:
''' '''
Create and connect a `Channel` to the provided `addr`, disconnect Create and connect a channel with disconnect on context manager
it on cm exit. teardown.
NOTE, this is a lowlevel, normally internal-only iface. You
should likely use `.open_portal()` instead.
''' '''
chan = await Channel.from_addr(addr) chan = await Channel.from_addr(addr)

View File

@ -154,6 +154,7 @@ class MsgTransport(Protocol):
# ... # ...
class MsgpackTransport(MsgTransport): class MsgpackTransport(MsgTransport):
# TODO: better naming for this? # TODO: better naming for this?
@ -277,18 +278,14 @@ class MsgpackTransport(MsgTransport):
except trio.ClosedResourceError as cre: except trio.ClosedResourceError as cre:
closure_err = cre closure_err = cre
# await tractor.devx._trace.maybe_pause_bp()
raise TransportClosed( raise TransportClosed(
message=( message=(
f'{tpt_name} was already closed locally?' f'{tpt_name} was already closed locally ?\n'
), ),
src_exc=closure_err, src_exc=closure_err,
loglevel='error', loglevel='error',
raise_on_report=( raise_on_report=(
'another task closed this fd' 'another task closed this fd' in closure_err.args
in
closure_err.args
), ),
) from closure_err ) from closure_err
@ -438,11 +435,6 @@ class MsgpackTransport(MsgTransport):
trans_err = _re trans_err = _re
tpt_name: str = f'{type(self).__name__!r}' tpt_name: str = f'{type(self).__name__!r}'
trans_err_msg: str = trans_err.args[0]
by_whom: str = {
'another task closed this fd': 'locally',
'this socket was already closed': 'by peer',
}.get(trans_err_msg)
match trans_err: match trans_err:
# XXX, specifc to UDS transport and its, # XXX, specifc to UDS transport and its,
@ -454,13 +446,13 @@ class MsgpackTransport(MsgTransport):
case trio.BrokenResourceError() if ( case trio.BrokenResourceError() if (
'[Errno 32] Broken pipe' '[Errno 32] Broken pipe'
in in
trans_err_msg trans_err.args[0]
): ):
tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} already closed by peer\n' f'{tpt_name} already closed by peer\n'
), ),
body=f'{self}', body=f'{self}\n',
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
@ -470,26 +462,24 @@ class MsgpackTransport(MsgTransport):
# ??TODO??, what case in piker does this and HOW # ??TODO??, what case in piker does this and HOW
# CAN WE RE-PRODUCE IT?!?!? # CAN WE RE-PRODUCE IT?!?!?
case trio.ClosedResourceError() if ( case trio.ClosedResourceError() if (
by_whom 'this socket was already closed'
in
trans_err.args[0]
): ):
tpt_closed = TransportClosed.from_src_exc( tpt_closed = TransportClosed.from_src_exc(
message=( message=(
f'{tpt_name} was already closed {by_whom!r}?\n' f'{tpt_name} already closed by peer\n'
), ),
body=f'{self}', body=f'{self}\n',
src_exc=trans_err, src_exc=trans_err,
raise_on_report=True, raise_on_report=True,
loglevel='transport', loglevel='transport',
) )
# await tractor.devx._trace.maybe_pause_bp()
raise tpt_closed from trans_err raise tpt_closed from trans_err
# XXX, unless the disconnect condition falls # unless the disconnect condition falls under "a
# under "a normal/expected operating breakage" # normal operation breakage" we usualy console warn
# (per the `trans_err_msg` guards in the cases # about it.
# above) we usualy console-error about it and
# raise-thru. about it.
case _: case _:
log.exception( log.exception(
f'{tpt_name} layer failed pre-send ??\n' f'{tpt_name} layer failed pre-send ??\n'