Compare commits
8 Commits
5fc64107e5
...
3749720d74
Author | SHA1 | Date |
---|---|---|
|
3749720d74 | |
|
7d15b3ea4b | |
|
262008f784 | |
|
059903bf81 | |
|
900e0b4cd1 | |
|
38a4e37d47 | |
|
a06289c47d | |
|
eeb748a206 |
|
@ -0,0 +1,145 @@
|
||||||
|
from contextlib import (
|
||||||
|
contextmanager as cm,
|
||||||
|
# TODO, any diff in async case(s)??
|
||||||
|
# asynccontextmanager as acm,
|
||||||
|
)
|
||||||
|
from functools import partial
|
||||||
|
|
||||||
|
import tractor
|
||||||
|
import trio
|
||||||
|
|
||||||
|
|
||||||
|
# Module-level logger used by every task in this example script.
log = tractor.log.get_logger(__name__)

# NOTE(review): presumably enables console output at the 'info'
# level so the per-msg `log.info()` calls below are visible when
# run as a script; confirm against `tractor.log`.
tractor.log.get_console_log('info')
|
||||||
|
|
||||||
|
@cm
def teardown_on_exc(
    raise_from_handler: bool = False,
):
    '''
    Wrap a block in a catch-all "teardown handler".

    Any `BaseException` escaping the wrapped block is logged and
    then either,

    - re-raised as-is (the default), or
    - replaced by a `RuntimeError` when `raise_from_handler=True`,
      which simulates a bug inside the teardown code itself.

    NOTE, with such a handler the exc-masking problem is compounded
    UNLESS the handler's scope is OUTSIDE the send-channel's
    `.aclose()`.. that is in the caller's enclosing scope.

    '''
    try:
        yield

    except BaseException as exc:
        log.exception(
            f'Handling termination teardown in child due to,\n'
            f'{exc!r}\n'
        )
        if not raise_from_handler:
            # no teardown bug, relay the original exc untouched.
            raise

        # XXX teardown ops XXX
        # on termination these steps say need to be run to
        # ensure wider system consistency (like the state of
        # remote connections/services).
        #
        # HOWEVER, any bug in this teardown code is also
        # masked by the `tx.aclose()`!
        # this is also true if `_tn.cancel_scope` is
        # `.cancel_called` by the parent in a graceful
        # request case..

        # simulate a bug in teardown handler.
        raise RuntimeError(
            'woopsie teardown bug!'
        )
|
||||||
|
|
||||||
|
|
||||||
|
async def finite_stream_to_rent(
    tx: trio.abc.SendChannel,
    child_errors_mid_stream: bool,

    task_status: trio.TaskStatus[
        trio.CancelScope,
    ] = trio.TASK_STATUS_IGNORED,
):
    '''
    Child task which sends the ints `0..99` to the parent over `tx`.

    The task's own nursery cancel-scope is delivered to the parent
    via `task_status.started()`.

    When `child_errors_mid_stream` is set a `RuntimeError` is raised
    on iteration 66 (before that value is sent); otherwise the
    wrapping `teardown_on_exc()` handler is told to raise from its
    teardown path instead.

    '''
    # exactly one of the 2 failure modes is active per run.
    teardown_raises: bool = not child_errors_mid_stream

    async with (
        # XXX without this unmasker the mid-streaming RTE is never
        # reported since it is masked by the `tx.aclose()`
        # call which in turn raises `Cancelled`!
        #
        # NOTE, this is WITHOUT doing any exception handling
        # inside the child task!
        #
        # TODO, uncomment next LoC to see the suppressed beg[RTE]!
        # tractor.trionics.maybe_raise_from_masking_exc(),

        tx as tx,  # .aclose() is the guilty masker chkpt!
        trio.open_nursery() as _tn,
    ):
        # hand our scope back to the parent for supervision
        # control.
        task_status.started(_tn.cancel_scope)

        with teardown_on_exc(raise_from_handler=teardown_raises):
            for seq in range(100):
                log.info(f'Child tx {seq!r}\n')

                if child_errors_mid_stream and seq == 66:
                    # oh wait but WOOPS there's a bug
                    # in the streaming code!?
                    raise RuntimeError('woopsie, a mid-streaming bug!?')

                await tx.send(seq)
|
||||||
|
|
||||||
|
|
||||||
|
async def main(
    child_errors_mid_stream: bool,
):
    '''
    Parent task: spawn the child streamer, log every received msg
    and cancel the parent nursery upon receiving `65`.

    Toggle `child_errors_mid_stream` for the 2 cases,

    1. `True`: child errors mid-stream while the parent is also
       requesting (graceful) cancel of that child streamer.

    2. `False`: child contains a teardown handler which contains
       a bug and raises.

    '''
    tx, rx = trio.open_memory_channel(1)

    # child entrypoint with its inputs pre-bound for `tn.start()`.
    start_child = partial(
        finite_stream_to_rent,
        child_errors_mid_stream=child_errors_mid_stream,
        tx=tx,
    )

    async with (
        trio.open_nursery() as tn,
        rx as rx,
    ):
        # value handed to `task_status.started()` by the child,
        # unused here.
        _child_cs = await tn.start(start_child)

        async for msg in rx:
            log.info(f'Rent rx {msg!r}\n')

            if msg != 65:
                continue

            # simulate some external cancellation
            # request **JUST BEFORE** the child errors.
            log.cancel(
                f'Cancelling parent on,\n'
                f'msg={msg}\n'
                f'\n'
                f'Simulates OOB cancel request!\n'
            )
            tn.cancel_scope.cancel()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':

    # run both demo cases back-to-back: first the mid-stream
    # child error, then the buggy teardown handler.
    for errors_mid_stream in (True, False):
        trio.run(main, errors_mid_stream)
|
|
@ -235,10 +235,16 @@ async def cancel_after(wait, reg_addr):
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope='module')
|
@pytest.fixture(scope='module')
|
||||||
def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
def time_quad_ex(
|
||||||
|
reg_addr: tuple,
|
||||||
|
ci_env: bool,
|
||||||
|
spawn_backend: str,
|
||||||
|
):
|
||||||
if spawn_backend == 'mp':
|
if spawn_backend == 'mp':
|
||||||
"""no idea but the mp *nix runs are flaking out here often...
|
'''
|
||||||
"""
|
no idea but the mp *nix runs are flaking out here often...
|
||||||
|
|
||||||
|
'''
|
||||||
pytest.skip("Test is too flaky on mp in CI")
|
pytest.skip("Test is too flaky on mp in CI")
|
||||||
|
|
||||||
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
|
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
|
||||||
|
@ -249,12 +255,24 @@ def time_quad_ex(reg_addr, ci_env, spawn_backend):
|
||||||
return results, diff
|
return results, diff
|
||||||
|
|
||||||
|
|
||||||
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
|
def test_a_quadruple_example(
|
||||||
"""This also serves as a kind of "we'd like to be this fast test"."""
|
time_quad_ex: tuple,
|
||||||
|
ci_env: bool,
|
||||||
|
spawn_backend: str,
|
||||||
|
):
|
||||||
|
'''
|
||||||
|
This also serves as a kind of "we'd like to be this fast test".
|
||||||
|
|
||||||
|
'''
|
||||||
results, diff = time_quad_ex
|
results, diff = time_quad_ex
|
||||||
assert results
|
assert results
|
||||||
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
|
this_fast = (
|
||||||
|
6 if platform.system() in (
|
||||||
|
'Windows',
|
||||||
|
'Darwin',
|
||||||
|
)
|
||||||
|
else 3
|
||||||
|
)
|
||||||
assert diff < this_fast
|
assert diff < this_fast
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -478,7 +478,10 @@ async def open_root_actor(
|
||||||
|
|
||||||
# start runtime in a bg sub-task, yield to caller.
|
# start runtime in a bg sub-task, yield to caller.
|
||||||
async with (
|
async with (
|
||||||
collapse_eg(),
|
collapse_eg(
|
||||||
|
# bp=True,
|
||||||
|
hide_tb=False,
|
||||||
|
),
|
||||||
trio.open_nursery() as root_tn,
|
trio.open_nursery() as root_tn,
|
||||||
|
|
||||||
# XXX, finally-footgun below?
|
# XXX, finally-footgun below?
|
||||||
|
@ -523,6 +526,12 @@ async def open_root_actor(
|
||||||
err,
|
err,
|
||||||
api_frame=inspect.currentframe(),
|
api_frame=inspect.currentframe(),
|
||||||
debug_filter=debug_filter,
|
debug_filter=debug_filter,
|
||||||
|
|
||||||
|
# XXX NOTE, required to debug root-actor
|
||||||
|
# crashes under cancellation conditions; so
|
||||||
|
# most of them!
|
||||||
|
shield=root_tn.cancel_scope.cancel_called,
|
||||||
|
# ^TODO? write a (debugger) test for this ya?
|
||||||
)
|
)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
|
|
|
@ -324,8 +324,8 @@ async def _errors_relayed_via_ipc(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
# TODO? better then `debug_filter` below?
|
# TODO? better then `debug_filter` below?
|
||||||
# and
|
and
|
||||||
# not isinstance(err, TransportClosed)
|
not isinstance(err, TransportClosed)
|
||||||
):
|
):
|
||||||
# XXX QUESTION XXX: is there any case where we'll
|
# XXX QUESTION XXX: is there any case where we'll
|
||||||
# want to debug IPC disconnects as a default?
|
# want to debug IPC disconnects as a default?
|
||||||
|
@ -693,20 +693,23 @@ async def _invoke(
|
||||||
f'\n'
|
f'\n'
|
||||||
f'{pretty_struct.pformat(return_msg)}\n'
|
f'{pretty_struct.pformat(return_msg)}\n'
|
||||||
)
|
)
|
||||||
try:
|
|
||||||
await chan.send(return_msg)
|
await chan.send(return_msg)
|
||||||
except TransportClosed:
|
# ?TODO, remove the below since .send() already
|
||||||
log.exception(
|
# doesn't raise on tpt-closed?
|
||||||
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
# try:
|
||||||
f'\n'
|
# await chan.send(return_msg)
|
||||||
f'{chan}\n'
|
# except TransportClosed:
|
||||||
f'Channel already disconnected ??\n'
|
# log.exception(
|
||||||
f'\n'
|
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
|
||||||
f'{pretty_struct.pformat(return_msg)}'
|
# f'\n'
|
||||||
)
|
# f'{chan}\n'
|
||||||
# ?TODO? will this ever be true though?
|
# f'Channel already disconnected ??\n'
|
||||||
if chan.connected():
|
# f'\n'
|
||||||
raise
|
# f'{pretty_struct.pformat(return_msg)}'
|
||||||
|
# )
|
||||||
|
# # ?TODO? will this ever be true though?
|
||||||
|
# if chan.connected():
|
||||||
|
# raise
|
||||||
|
|
||||||
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
# NOTE: this happens IFF `ctx._scope.cancel()` is
|
||||||
# called by any of,
|
# called by any of,
|
||||||
|
|
|
@ -297,6 +297,23 @@ async def hard_kill(
|
||||||
# zombies (as a feature) we ask the OS to do send in the
|
# zombies (as a feature) we ask the OS to do send in the
|
||||||
# removal swad as the last resort.
|
# removal swad as the last resort.
|
||||||
if cs.cancelled_caught:
|
if cs.cancelled_caught:
|
||||||
|
|
||||||
|
# TODO? attempt at intermediary-rent-sub
|
||||||
|
# with child in debug lock?
|
||||||
|
# |_https://github.com/goodboy/tractor/issues/320
|
||||||
|
#
|
||||||
|
# if not is_root_process():
|
||||||
|
# log.warning(
|
||||||
|
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
|
||||||
|
# )
|
||||||
|
# with trio.CancelScope(shield=True):
|
||||||
|
# async with debug.acquire_debug_lock(
|
||||||
|
# subactor_uid=current_actor().uid,
|
||||||
|
# ) as _ctx:
|
||||||
|
# log.warning(
|
||||||
|
# 'Acquired debug lock, child ready to be killed ??\n'
|
||||||
|
# )
|
||||||
|
|
||||||
# TODO: toss in the skynet-logo face as ascii art?
|
# TODO: toss in the skynet-logo face as ascii art?
|
||||||
log.critical(
|
log.critical(
|
||||||
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
|
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'
|
||||||
|
|
|
@ -430,6 +430,7 @@ class MsgpackTransport(MsgTransport):
|
||||||
return await self.stream.send_all(size + bytes_data)
|
return await self.stream.send_all(size + bytes_data)
|
||||||
except (
|
except (
|
||||||
trio.BrokenResourceError,
|
trio.BrokenResourceError,
|
||||||
|
trio.ClosedResourceError,
|
||||||
) as _re:
|
) as _re:
|
||||||
trans_err = _re
|
trans_err = _re
|
||||||
tpt_name: str = f'{type(self).__name__!r}'
|
tpt_name: str = f'{type(self).__name__!r}'
|
||||||
|
@ -458,6 +459,22 @@ class MsgpackTransport(MsgTransport):
|
||||||
)
|
)
|
||||||
raise tpt_closed from trans_err
|
raise tpt_closed from trans_err
|
||||||
|
|
||||||
|
# case trio.ClosedResourceError() if (
|
||||||
|
# 'this socket was already closed'
|
||||||
|
# in
|
||||||
|
# trans_err.args[0]
|
||||||
|
# ):
|
||||||
|
# tpt_closed = TransportClosed.from_src_exc(
|
||||||
|
# message=(
|
||||||
|
# f'{tpt_name} already closed by peer\n'
|
||||||
|
# ),
|
||||||
|
# body=f'{self}\n',
|
||||||
|
# src_exc=trans_err,
|
||||||
|
# raise_on_report=True,
|
||||||
|
# loglevel='transport',
|
||||||
|
# )
|
||||||
|
# raise tpt_closed from trans_err
|
||||||
|
|
||||||
# unless the disconnect condition falls under "a
|
# unless the disconnect condition falls under "a
|
||||||
# normal operation breakage" we usualy console warn
|
# normal operation breakage" we usualy console warn
|
||||||
# about it.
|
# about it.
|
||||||
|
|
Loading…
Reference in New Issue