Compare commits
6 Commits
7f602db50d
...
f03730efa2
| Author | SHA1 | Date |
|---|---|---|
|
|
f03730efa2 | |
|
|
d6c405ec6b | |
|
|
ab7b75a9d6 | |
|
|
51ac0c623e | |
|
|
3f0bde1bf8 | |
|
|
fa1a15dce8 |
|
|
@ -284,6 +284,10 @@ async def _errors_relayed_via_ipc(
|
|||
try:
|
||||
yield # run RPC invoke body
|
||||
|
||||
except TransportClosed:
|
||||
log.exception('Tpt disconnect during remote-exc relay?')
|
||||
raise
|
||||
|
||||
# box and ship RPC errors for wire-transit via
|
||||
# the task's requesting parent IPC-channel.
|
||||
except (
|
||||
|
|
@ -319,6 +323,9 @@ async def _errors_relayed_via_ipc(
|
|||
and debug_kbis
|
||||
)
|
||||
)
|
||||
# TODO? better then `debug_filter` below?
|
||||
and
|
||||
not isinstance(err, TransportClosed)
|
||||
):
|
||||
# XXX QUESTION XXX: is there any case where we'll
|
||||
# want to debug IPC disconnects as a default?
|
||||
|
|
@ -327,13 +334,25 @@ async def _errors_relayed_via_ipc(
|
|||
# recovery logic - the only case is some kind of
|
||||
# strange bug in our transport layer itself? Going
|
||||
# to keep this open ended for now.
|
||||
log.debug(
|
||||
'RPC task crashed, attempting to enter debugger\n'
|
||||
f'|_{ctx}'
|
||||
)
|
||||
|
||||
if _state.debug_mode():
|
||||
log.exception(
|
||||
f'RPC task crashed!\n'
|
||||
f'Attempting to enter debugger\n'
|
||||
f'\n'
|
||||
f'{ctx}'
|
||||
)
|
||||
|
||||
entered_debug = await debug._maybe_enter_pm(
|
||||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
|
||||
# don't REPL any psuedo-expected tpt-disconnect
|
||||
# debug_filter=lambda exc: (
|
||||
# type (exc) not in {
|
||||
# TransportClosed,
|
||||
# }
|
||||
# ),
|
||||
)
|
||||
if not entered_debug:
|
||||
# if we prolly should have entered the REPL but
|
||||
|
|
@ -450,7 +469,7 @@ async def _invoke(
|
|||
kwargs: dict[str, Any],
|
||||
|
||||
is_rpc: bool = True,
|
||||
hide_tb: bool = True,
|
||||
hide_tb: bool = False,
|
||||
return_msg_type: Return|CancelAck = Return,
|
||||
|
||||
task_status: TaskStatus[
|
||||
|
|
@ -675,6 +694,22 @@ async def _invoke(
|
|||
f'{pretty_struct.pformat(return_msg)}\n'
|
||||
)
|
||||
await chan.send(return_msg)
|
||||
# ?TODO, remove the below since .send() already
|
||||
# doesn't raise on tpt-closed?
|
||||
# try:
|
||||
# await chan.send(return_msg)
|
||||
# except TransportClosed:
|
||||
# log.exception(
|
||||
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
||||
# f'\n'
|
||||
# f'{chan}\n'
|
||||
# f'Channel already disconnected ??\n'
|
||||
# f'\n'
|
||||
# f'{pretty_struct.pformat(return_msg)}'
|
||||
# )
|
||||
# # ?TODO? will this ever be true though?
|
||||
# if chan.connected():
|
||||
# raise
|
||||
|
||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||
# called by any of,
|
||||
|
|
|
|||
|
|
@ -459,21 +459,23 @@ class MsgpackTransport(MsgTransport):
|
|||
)
|
||||
raise tpt_closed from trans_err
|
||||
|
||||
# case trio.ClosedResourceError() if (
|
||||
# 'this socket was already closed'
|
||||
# in
|
||||
# trans_err.args[0]
|
||||
# ):
|
||||
# tpt_closed = TransportClosed.from_src_exc(
|
||||
# message=(
|
||||
# f'{tpt_name} already closed by peer\n'
|
||||
# ),
|
||||
# body=f'{self}\n',
|
||||
# src_exc=trans_err,
|
||||
# raise_on_report=True,
|
||||
# loglevel='transport',
|
||||
# )
|
||||
# raise tpt_closed from trans_err
|
||||
# ??TODO??, what case in piker does this and HOW
|
||||
# CAN WE RE-PRODUCE IT?!?!?
|
||||
case trio.ClosedResourceError() if (
|
||||
'this socket was already closed'
|
||||
in
|
||||
trans_err.args[0]
|
||||
):
|
||||
tpt_closed = TransportClosed.from_src_exc(
|
||||
message=(
|
||||
f'{tpt_name} already closed by peer\n'
|
||||
),
|
||||
body=f'{self}\n',
|
||||
src_exc=trans_err,
|
||||
raise_on_report=True,
|
||||
loglevel='transport',
|
||||
)
|
||||
raise tpt_closed from trans_err
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
|
|
|
|||
|
|
@ -169,7 +169,7 @@ class UDSAddress(
|
|||
# ?^TODO, for `multiaddr`'s parser we can't use the `::`
|
||||
# above^, SO maybe a `.` or something else here?
|
||||
# sockname: str = '.'.join(actor.uid) + f'@{pid}'
|
||||
# -[ ] CURRETLY using `.` BREAKS TEST SUITE tho..
|
||||
# -[ ] CURRENTLY using `.` BREAKS TEST SUITE tho..
|
||||
else:
|
||||
prefix: str = '<unknown-actor>'
|
||||
if is_root_process():
|
||||
|
|
|
|||
|
|
@ -416,9 +416,6 @@ def get_logger(
|
|||
|
||||
mod_ns_path: str = caller_mod.__name__
|
||||
mod_pkg_ns_path: str = caller_mod.__package__
|
||||
# if 'snakelib' in mod_pkg_ns_path:
|
||||
# import pdbp
|
||||
# breakpoint()
|
||||
if (
|
||||
mod_pkg_ns_path in mod_ns_path
|
||||
or
|
||||
|
|
@ -491,10 +488,6 @@ def get_logger(
|
|||
proper_name: str = name.removeprefix(
|
||||
f'{pkg_name}.'
|
||||
)
|
||||
# if 'pylib' in name:
|
||||
# import pdbp
|
||||
# breakpoint()
|
||||
|
||||
msg: str = (
|
||||
f'@ {get_caller_mod()}\n'
|
||||
f'Duplicate pkg-name in sub-logger `name`-key?\n'
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ from outcome import (
|
|||
Outcome,
|
||||
)
|
||||
|
||||
log: StackLevelAdapter = get_logger(__name__)
|
||||
log: StackLevelAdapter = get_logger()
|
||||
|
||||
|
||||
__all__ = [
|
||||
|
|
|
|||
Loading…
Reference in New Issue