Compare commits
8 Commits
5fc64107e5
...
3749720d74
Author | SHA1 | Date |
---|---|---|
|
3749720d74 | |
|
7d15b3ea4b | |
|
262008f784 | |
|
059903bf81 | |
|
900e0b4cd1 | |
|
38a4e37d47 | |
|
a06289c47d | |
|
eeb748a206 |
|
@ -0,0 +1,145 @@
|
|||
from contextlib import (
|
||||
contextmanager as cm,
|
||||
# TODO, any diff in async case(s)??
|
||||
# asynccontextmanager as acm,
|
||||
)
|
||||
from functools import partial
|
||||
|
||||
import tractor
|
||||
import trio
|
||||
|
||||
|
||||
log = tractor.log.get_logger(__name__)
|
||||
tractor.log.get_console_log('info')
|
||||
|
||||
@cm
|
||||
def teardown_on_exc(
|
||||
raise_from_handler: bool = False,
|
||||
):
|
||||
'''
|
||||
You could also have a teardown handler which catches any exc and
|
||||
does some required teardown. In this case the problem is
|
||||
compounded UNLESS you ensure the handler's scope is OUTSIDE the
|
||||
`ux.aclose()`.. that is in the caller's enclosing scope.
|
||||
|
||||
'''
|
||||
try:
|
||||
yield
|
||||
except BaseException as _berr:
|
||||
berr = _berr
|
||||
log.exception(
|
||||
f'Handling termination teardown in child due to,\n'
|
||||
f'{berr!r}\n'
|
||||
)
|
||||
if raise_from_handler:
|
||||
# XXX teardown ops XXX
|
||||
# on termination these steps say need to be run to
|
||||
# ensure wider system consistency (like the state of
|
||||
# remote connections/services).
|
||||
#
|
||||
# HOWEVER, any bug in this teardown code is also
|
||||
# masked by the `tx.aclose()`!
|
||||
# this is also true if `_tn.cancel_scope` is
|
||||
# `.cancel_called` by the parent in a graceful
|
||||
# request case..
|
||||
|
||||
# simulate a bug in teardown handler.
|
||||
raise RuntimeError(
|
||||
'woopsie teardown bug!'
|
||||
)
|
||||
|
||||
raise # no teardown bug.
|
||||
|
||||
|
||||
async def finite_stream_to_rent(
|
||||
tx: trio.abc.SendChannel,
|
||||
child_errors_mid_stream: bool,
|
||||
|
||||
task_status: trio.TaskStatus[
|
||||
trio.CancelScope,
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
):
|
||||
async with (
|
||||
# XXX without this unmasker the mid-streaming RTE is never
|
||||
# reported since it is masked by the `tx.aclose()`
|
||||
# call which in turn raises `Cancelled`!
|
||||
#
|
||||
# NOTE, this is WITHOUT doing any exception handling
|
||||
# inside the child task!
|
||||
#
|
||||
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
|
||||
# tractor.trionics.maybe_raise_from_masking_exc(),
|
||||
|
||||
tx as tx, # .aclose() is the guilty masker chkpt!
|
||||
trio.open_nursery() as _tn,
|
||||
):
|
||||
# pass our scope back to parent for supervision\
|
||||
# control.
|
||||
task_status.started(_tn.cancel_scope)
|
||||
|
||||
with teardown_on_exc(
|
||||
raise_from_handler=not child_errors_mid_stream,
|
||||
):
|
||||
for i in range(100):
|
||||
log.info(
|
||||
f'Child tx {i!r}\n'
|
||||
)
|
||||
if (
|
||||
child_errors_mid_stream
|
||||
and
|
||||
i == 66
|
||||
):
|
||||
# oh wait but WOOPS there's a bug
|
||||
# in that teardown code!?
|
||||
raise RuntimeError(
|
||||
'woopsie, a mid-streaming bug!?'
|
||||
)
|
||||
|
||||
await tx.send(i)
|
||||
|
||||
|
||||
async def main(
|
||||
# TODO! toggle this for the 2 cases!
|
||||
# 1. child errors mid-stream while parent is also requesting
|
||||
# (graceful) cancel of that child streamer.
|
||||
#
|
||||
# 2. child contains a teardown handler which contains a
|
||||
# bug and raises.
|
||||
#
|
||||
child_errors_mid_stream: bool,
|
||||
):
|
||||
tx, rx = trio.open_memory_channel(1)
|
||||
|
||||
async with (
|
||||
trio.open_nursery() as tn,
|
||||
rx as rx,
|
||||
):
|
||||
|
||||
_child_cs = await tn.start(
|
||||
partial(
|
||||
finite_stream_to_rent,
|
||||
child_errors_mid_stream=child_errors_mid_stream,
|
||||
tx=tx,
|
||||
)
|
||||
)
|
||||
async for msg in rx:
|
||||
log.info(
|
||||
f'Rent rx {msg!r}\n'
|
||||
)
|
||||
|
||||
# simulate some external cancellation
|
||||
# request **JUST BEFORE** the child errors.
|
||||
if msg == 65:
|
||||
log.cancel(
|
||||
f'Cancelling parent on,\n'
|
||||
f'msg={msg}\n'
|
||||
f'\n'
|
||||
f'Simulates OOB cancel request!\n'
|
||||
)
|
||||
tn.cancel_scope.cancel()
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
for case in [True, False]:
|
||||
trio.run(main, case)
|
|
@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
|
|||
|
||||
|
||||
@pytest.fixture(scope='module')
|
||||
def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
||||
def time_quad_ex(
|
||||
reg_addr: tuple,
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
):
|
||||
if spawn_backend == 'mp':
|
||||
"""no idea but the mp *nix runs are flaking out here often...
|
||||
"""
|
||||
'''
|
||||
no idea but the mp *nix runs are flaking out here often...
|
||||
|
||||
'''
|
||||
pytest.skip("Test is too flaky on mp in CI")
|
||||
|
||||
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
|
||||
|
@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
|||
return results, diff
|
||||
|
||||
|
||||
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
|
||||
"""This also serves as a kind of "we'd like to be this fast test"."""
|
||||
def test_a_quadruple_example(
|
||||
time_quad_ex: tuple,
|
||||
ci_env: bool,
|
||||
spawn_backend: str,
|
||||
):
|
||||
'''
|
||||
This also serves as a kind of "we'd like to be this fast test".
|
||||
|
||||
'''
|
||||
results, diff = time_quad_ex
|
||||
assert results
|
||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
|
||||
this_fast = (
|
||||
6 if platform.system() in (
|
||||
'Windows',
|
||||
'Darwin',
|
||||
)
|
||||
else 3
|
||||
)
|
||||
assert diff < this_fast
|
||||
|
||||
|
||||
|
|
|
@ -478,7 +478,10 @@ async def open_root_actor(
|
|||
|
||||
# start runtime in a bg sub-task, yield to caller.
|
||||
async with (
|
||||
collapse_eg(),
|
||||
collapse_eg(
|
||||
# bp=True,
|
||||
hide_tb=False,
|
||||
),
|
||||
trio.open_nursery() as root_tn,
|
||||
|
||||
# XXX, finally-footgun below?
|
||||
|
@ -523,6 +526,12 @@ async def open_root_actor(
|
|||
err,
|
||||
api_frame=inspect.currentframe(),
|
||||
debug_filter=debug_filter,
|
||||
|
||||
# XXX NOTE, required to debug root-actor
|
||||
# crashes under cancellation conditions; so
|
||||
# most of them!
|
||||
shield=root_tn.cancel_scope.cancel_called,
|
||||
# ^TODO? write a (debugger) test for this ya?
|
||||
)
|
||||
|
||||
if (
|
||||
|
|
|
@ -324,8 +324,8 @@ async def _errors_relayed_via_ipc(
|
|||
)
|
||||
)
|
||||
# TODO? better then `debug_filter` below?
|
||||
# and
|
||||
# not isinstance(err, TransportClosed)
|
||||
and
|
||||
not isinstance(err, TransportClosed)
|
||||
):
|
||||
# XXX QUESTION XXX: is there any case where we'll
|
||||
# want to debug IPC disconnects as a default?
|
||||
|
@ -693,20 +693,23 @@ async def _invoke(
|
|||
f'\n'
|
||||
f'{pretty_struct.pformat(return_msg)}\n'
|
||||
)
|
||||
try:
|
||||
await chan.send(return_msg)
|
||||
except TransportClosed:
|
||||
log.exception(
|
||||
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
||||
f'\n'
|
||||
f'{chan}\n'
|
||||
f'Channel already disconnected ??\n'
|
||||
f'\n'
|
||||
f'{pretty_struct.pformat(return_msg)}'
|
||||
)
|
||||
# ?TODO? will this ever be true though?
|
||||
if chan.connected():
|
||||
raise
|
||||
await chan.send(return_msg)
|
||||
# ?TODO, remove the below since .send() already
|
||||
# doesn't raise on tpt-closed?
|
||||
# try:
|
||||
# await chan.send(return_msg)
|
||||
# except TransportClosed:
|
||||
# log.exception(
|
||||
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
||||
# f'\n'
|
||||
# f'{chan}\n'
|
||||
# f'Channel already disconnected ??\n'
|
||||
# f'\n'
|
||||
# f'{pretty_struct.pformat(return_msg)}'
|
||||
# )
|
||||
# # ?TODO? will this ever be true though?
|
||||
# if chan.connected():
|
||||
# raise
|
||||
|
||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||
# called by any of,
|
||||
|
|
|
@ -297,6 +297,23 @@ async def hard_kill(
|
|||
# zombies (as a feature) we ask the OS to do send in the
|
||||
# removal swad as the last resort.
|
||||
if cs.cancelled_caught:
|
||||
|
||||
# TODO? attempt at intermediary-rent-sub
|
||||
# with child in debug lock?
|
||||
# |_https://github.com/goodboy/tractor/issues/320
|
||||
#
|
||||
# if not is_root_process():
|
||||
# log.warning(
|
||||
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
|
||||
# )
|
||||
# with trio.CancelScope(shield=True):
|
||||
# async with debug.acquire_debug_lock(
|
||||
# subactor_uid=current_actor().uid,
|
||||
# ) as _ctx:
|
||||
# log.warning(
|
||||
# 'Acquired debug lock, child ready to be killed ??\n'
|
||||
# )
|
||||
|
||||
# TODO: toss in the skynet-logo face as ascii art?
|
||||
log.critical(
|
||||
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
|
||||
|
|
|
@ -430,6 +430,7 @@ class MsgpackTransport(MsgTransport):
|
|||
return await self.stream.send_all(size + bytes_data)
|
||||
except (
|
||||
trio.BrokenResourceError,
|
||||
trio.ClosedResourceError,
|
||||
) as _re:
|
||||
trans_err = _re
|
||||
tpt_name: str = f'{type(self).__name__!r}'
|
||||
|
@ -458,6 +459,22 @@ class MsgpackTransport(MsgTransport):
|
|||
)
|
||||
raise tpt_closed from trans_err
|
||||
|
||||
# case trio.ClosedResourceError() if (
|
||||
# 'this socket was already closed'
|
||||
# in
|
||||
# trans_err.args[0]
|
||||
# ):
|
||||
# tpt_closed = TransportClosed.from_src_exc(
|
||||
# message=(
|
||||
# f'{tpt_name} already closed by peer\n'
|
||||
# ),
|
||||
# body=f'{self}\n',
|
||||
# src_exc=trans_err,
|
||||
# raise_on_report=True,
|
||||
# loglevel='transport',
|
||||
# )
|
||||
# raise tpt_closed from trans_err
|
||||
|
||||
# unless the disconnect condition falls under "a
|
||||
# normal operation breakage" we usualy console warn
|
||||
# about it.
|
||||
|
|
Loading…
Reference in New Issue