Compare commits

..

No commits in common. "3749720d74b0cdccdd52e0b429aad76e03b32c2e" and "5fc64107e566a5b59097cb1e9a6b3171f2125106" have entirely different histories.

6 changed files with 23 additions and 232 deletions

View File

@ -1,145 +0,0 @@
from contextlib import (
contextmanager as cm,
# TODO, any diff in async case(s)??
# asynccontextmanager as acm,
)
from functools import partial
import tractor
import trio
log = tractor.log.get_logger(__name__)
tractor.log.get_console_log('info')
@cm
def teardown_on_exc(
raise_from_handler: bool = False,
):
'''
You could also have a teardown handler which catches any exc and
does some required teardown. In this case the problem is
compounded UNLESS you ensure the handler's scope is OUTSIDE the
`ux.aclose()`.. that is in the caller's enclosing scope.
'''
try:
yield
except BaseException as _berr:
berr = _berr
log.exception(
f'Handling termination teardown in child due to,\n'
f'{berr!r}\n'
)
if raise_from_handler:
# XXX teardown ops XXX
# on termination these steps say need to be run to
# ensure wider system consistency (like the state of
# remote connections/services).
#
# HOWEVER, any bug in this teardown code is also
# masked by the `tx.aclose()`!
# this is also true if `_tn.cancel_scope` is
# `.cancel_called` by the parent in a graceful
# request case..
# simulate a bug in teardown handler.
raise RuntimeError(
'woopsie teardown bug!'
)
raise # no teardown bug.
async def finite_stream_to_rent(
tx: trio.abc.SendChannel,
child_errors_mid_stream: bool,
task_status: trio.TaskStatus[
trio.CancelScope,
] = trio.TASK_STATUS_IGNORED,
):
async with (
# XXX without this unmasker the mid-streaming RTE is never
# reported since it is masked by the `tx.aclose()`
# call which in turn raises `Cancelled`!
#
# NOTE, this is WITHOUT doing any exception handling
# inside the child task!
#
# TODO, uncomment next LoC to see the supprsessed beg[RTE]!
# tractor.trionics.maybe_raise_from_masking_exc(),
tx as tx, # .aclose() is the guilty masker chkpt!
trio.open_nursery() as _tn,
):
# pass our scope back to parent for supervision\
# control.
task_status.started(_tn.cancel_scope)
with teardown_on_exc(
raise_from_handler=not child_errors_mid_stream,
):
for i in range(100):
log.info(
f'Child tx {i!r}\n'
)
if (
child_errors_mid_stream
and
i == 66
):
# oh wait but WOOPS there's a bug
# in that teardown code!?
raise RuntimeError(
'woopsie, a mid-streaming bug!?'
)
await tx.send(i)
async def main(
# TODO! toggle this for the 2 cases!
# 1. child errors mid-stream while parent is also requesting
# (graceful) cancel of that child streamer.
#
# 2. child contains a teardown handler which contains a
# bug and raises.
#
child_errors_mid_stream: bool,
):
tx, rx = trio.open_memory_channel(1)
async with (
trio.open_nursery() as tn,
rx as rx,
):
_child_cs = await tn.start(
partial(
finite_stream_to_rent,
child_errors_mid_stream=child_errors_mid_stream,
tx=tx,
)
)
async for msg in rx:
log.info(
f'Rent rx {msg!r}\n'
)
# simulate some external cancellation
# request **JUST BEFORE** the child errors.
if msg == 65:
log.cancel(
f'Cancelling parent on,\n'
f'msg={msg}\n'
f'\n'
f'Simulates OOB cancel request!\n'
)
tn.cancel_scope.cancel()
if __name__ == '__main__':
for case in [True, False]:
trio.run(main, case)

View File

@ -235,16 +235,10 @@ async def cancel_after(wait, reg_addr):
@pytest.fixture(scope='module')
def time_quad_ex(
reg_addr: tuple,
ci_env: bool,
spawn_backend: str,
):
def time_quad_ex(reg_addr, ci_env, spawn_backend):
if spawn_backend == 'mp':
'''
no idea but the mp *nix runs are flaking out here often...
'''
"""no idea but the mp *nix runs are flaking out here often...
"""
pytest.skip("Test is too flaky on mp in CI")
timeout = 7 if platform.system() in ('Windows', 'Darwin') else 4
@ -255,24 +249,12 @@ def time_quad_ex(
return results, diff
def test_a_quadruple_example(
time_quad_ex: tuple,
ci_env: bool,
spawn_backend: str,
):
'''
This also serves as a kind of "we'd like to be this fast test".
def test_a_quadruple_example(time_quad_ex, ci_env, spawn_backend):
"""This also serves as a kind of "we'd like to be this fast test"."""
'''
results, diff = time_quad_ex
assert results
this_fast = (
6 if platform.system() in (
'Windows',
'Darwin',
)
else 3
)
this_fast = 6 if platform.system() in ('Windows', 'Darwin') else 3
assert diff < this_fast

View File

@ -478,10 +478,7 @@ async def open_root_actor(
# start runtime in a bg sub-task, yield to caller.
async with (
collapse_eg(
# bp=True,
hide_tb=False,
),
collapse_eg(),
trio.open_nursery() as root_tn,
# XXX, finally-footgun below?
@ -526,12 +523,6 @@ async def open_root_actor(
err,
api_frame=inspect.currentframe(),
debug_filter=debug_filter,
# XXX NOTE, required to debug root-actor
# crashes under cancellation conditions; so
# most of them!
shield=root_tn.cancel_scope.cancel_called,
# ^TODO? write a (debugger) test for this ya?
)
if (

View File

@ -324,8 +324,8 @@ async def _errors_relayed_via_ipc(
)
)
# TODO? better then `debug_filter` below?
and
not isinstance(err, TransportClosed)
# and
# not isinstance(err, TransportClosed)
):
# XXX QUESTION XXX: is there any case where we'll
# want to debug IPC disconnects as a default?
@ -693,23 +693,20 @@ async def _invoke(
f'\n'
f'{pretty_struct.pformat(return_msg)}\n'
)
await chan.send(return_msg)
# ?TODO, remove the below since .send() already
# doesn't raise on tpt-closed?
# try:
# await chan.send(return_msg)
# except TransportClosed:
# log.exception(
# f"Failed send final result to 'parent'-side of IPC-ctx!\n"
# f'\n'
# f'{chan}\n'
# f'Channel already disconnected ??\n'
# f'\n'
# f'{pretty_struct.pformat(return_msg)}'
# )
# # ?TODO? will this ever be true though?
# if chan.connected():
# raise
try:
await chan.send(return_msg)
except TransportClosed:
log.exception(
f"Failed send final result to 'parent'-side of IPC-ctx!\n"
f'\n'
f'{chan}\n'
f'Channel already disconnected ??\n'
f'\n'
f'{pretty_struct.pformat(return_msg)}'
)
# ?TODO? will this ever be true though?
if chan.connected():
raise
# NOTE: this happens IFF `ctx._scope.cancel()` is
# called by any of,

View File

@ -297,23 +297,6 @@ async def hard_kill(
# zombies (as a feature) we ask the OS to do send in the
# removal swad as the last resort.
if cs.cancelled_caught:
# TODO? attempt at intermediary-rent-sub
# with child in debug lock?
# |_https://github.com/goodboy/tractor/issues/320
#
# if not is_root_process():
# log.warning(
# 'Attempting to acquire debug-REPL-lock before zombie reap!'
# )
# with trio.CancelScope(shield=True):
# async with debug.acquire_debug_lock(
# subactor_uid=current_actor().uid,
# ) as _ctx:
# log.warning(
# 'Acquired debug lock, child ready to be killed ??\n'
# )
# TODO: toss in the skynet-logo face as ascii art?
log.critical(
# 'Well, the #ZOMBIE_LORD_IS_HERE# to collect\n'

View File

@ -430,7 +430,6 @@ class MsgpackTransport(MsgTransport):
return await self.stream.send_all(size + bytes_data)
except (
trio.BrokenResourceError,
trio.ClosedResourceError,
) as _re:
trans_err = _re
tpt_name: str = f'{type(self).__name__!r}'
@ -459,22 +458,6 @@ class MsgpackTransport(MsgTransport):
)
raise tpt_closed from trans_err
# case trio.ClosedResourceError() if (
# 'this socket was already closed'
# in
# trans_err.args[0]
# ):
# tpt_closed = TransportClosed.from_src_exc(
# message=(
# f'{tpt_name} already closed by peer\n'
# ),
# body=f'{self}\n',
# src_exc=trans_err,
# raise_on_report=True,
# loglevel='transport',
# )
# raise tpt_closed from trans_err
# unless the disconnect condition falls under "a
# normal operation breakage" we usualy console warn
# about it.