From c1727ce05e31870c05e9ffbd6598f41972ed6325 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 08:52:21 -0400 Subject: [PATCH 1/9] Add a test of both stream styles side-by-side Not sure we even have a test for this yet. The main issue discovered by a user project (https://github.com/adder46/wrath) was that a kbi raised inside a block like this (with both recv-only and send-recv streams) would not cancel on the first ctrl-c sent from console and instead SIGiNT had to be repeatedly sent as many times as there are subactors in the first level tree. This test catches that as well as just verifies the basic side-by-side functionality. --- tests/test_advanced_streaming.py | 51 ++++++++++++++++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/tests/test_advanced_streaming.py b/tests/test_advanced_streaming.py index 4429d251..74c06ca3 100644 --- a/tests/test_advanced_streaming.py +++ b/tests/test_advanced_streaming.py @@ -218,3 +218,54 @@ def test_reqresp_ontopof_streaming(): trio.run(main) except trio.TooSlowError: pass + + +async def async_gen_stream(sequence): + for i in sequence: + yield i + await trio.sleep(0.1) + + +@tractor.context +async def echo_ctx_stream( + ctx: tractor.Context, +) -> None: + await ctx.started() + + async with ctx.open_stream() as stream: + async for msg in stream: + await stream.send(msg) + + +def test_sigint_both_stream_types(): + '''Verify that running a bi-directional and recv only stream + side-by-side will cancel correctly from SIGINT. + + ''' + async def main(): + with trio.fail_after(2): + async with tractor.open_nursery() as n: + # name of this actor will be same as target func + portal = await n.start_actor( + '2_way', + enable_modules=[__name__] + ) + + async with portal.open_context(echo_ctx_stream) as (ctx, _): + async with ctx.open_stream() as stream: + async with portal.open_stream_from( + async_gen_stream, + sequence=list(range(1)), + ) as gen_stream: + + msg = await gen_stream.receive() + await stream.send(msg) + resp = await stream.receive() + assert resp == msg + raise KeyboardInterrupt + + try: + trio.run(main) + assert 0, "Didn't receive KBI!?" + except KeyboardInterrupt: + pass From 8d79d83ac22b5a54a191956265ecda907b207f43 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 08:59:24 -0400 Subject: [PATCH 2/9] Ensure kbi will cancel context block Follow up to previous commit: extend our simple context test set to include cancellation via kbi in the parent as well as timeout logic and testing of the parent opening a stream even though the target actor does not. Thanks again to https://github.com/adder46/wrath for discovering this bug. --- tests/test_2way.py | 75 +++++++++++++++++++++++++++++++--------------- 1 file changed, 51 insertions(+), 24 deletions(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index 1ef05d2f..218a5b84 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -51,53 +51,80 @@ async def assert_state(value: bool): @pytest.mark.parametrize( 'error_parent', - [False, True], + [False, ValueError, KeyboardInterrupt], ) @pytest.mark.parametrize( 'callee_blocks_forever', [False, True], + ids=lambda item: f'callee_blocks_forever={item}' +) +@pytest.mark.parametrize( + 'pointlessly_open_stream', + [False, True], + ids=lambda item: f'open_stream={item}' ) def test_simple_context( error_parent, callee_blocks_forever, + pointlessly_open_stream, ): async def main(): - async with tractor.open_nursery() as n: + with trio.fail_after(1.5): + async with tractor.open_nursery() as nursery: - portal = await n.start_actor( - 'simple_context', - enable_modules=[__name__], - ) + portal = await nursery.start_actor( + 'simple_context', + enable_modules=[__name__], + ) - async with portal.open_context( - simple_setup_teardown, - data=10, - block_forever=callee_blocks_forever, - ) as (ctx, sent): + try: + async with portal.open_context( + simple_setup_teardown, + data=10, + block_forever=callee_blocks_forever, + ) as (ctx, sent): - assert sent == 11 + assert sent == 11 - if callee_blocks_forever: - await portal.run(assert_state, value=True) - await ctx.cancel() - else: - assert await ctx.result() == 'yo' + if callee_blocks_forever: + await portal.run(assert_state, value=True) + else: + assert await ctx.result() == 'yo' - # after cancellation - await portal.run(assert_state, value=False) + if not error_parent: + await ctx.cancel() - if error_parent: - raise ValueError + if pointlessly_open_stream: + async with ctx.open_stream(): + if error_parent: + raise error_parent - # shut down daemon - await portal.cancel_actor() + if callee_blocks_forever: + await ctx.cancel() + else: + # in this case the stream will send a + # 'stop' msg to the far end which needs + # to be ignored + pass + else: + if error_parent: + raise error_parent + + finally: + + # after cancellation + if not error_parent: + await portal.run(assert_state, value=False) + + # shut down daemon + await portal.cancel_actor() if error_parent: try: trio.run(main) - except ValueError: + except error_parent: pass else: trio.run(main) From bd31f47d5f89055bfcf69dbf32b0564949ce21ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 10:05:40 -0400 Subject: [PATCH 3/9] Handle kbi in ctx blocks via `BaseException` Fixes prior committed tests by more generally handling `BaseExcepion` in context blocks. Left in the commented concrete list for reference. --- tractor/_portal.py | 37 +++++++++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/tractor/_portal.py b/tractor/_portal.py index 63c59ed3..137761e4 100644 --- a/tractor/_portal.py +++ b/tractor/_portal.py @@ -177,7 +177,6 @@ class Portal: f"Cancelling all streams with {self.channel.uid}") for stream in self._streams.copy(): try: - # with trio.CancelScope(shield=True): await stream.aclose() except trio.ClosedResourceError: # don't error the stream having already been closed @@ -294,7 +293,6 @@ class Portal: async def open_stream_from( self, async_gen_func: Callable, # typing: ignore - shield: bool = False, **kwargs, ) -> AsyncGenerator[ReceiveMsgStream, None]: @@ -318,11 +316,17 @@ class Portal: # receive only stream assert functype == 'asyncgen' - ctx = Context(self.channel, cid, _portal=self) + ctx = Context( + self.channel, + cid, + # do we need this to be closed implicitly? + # _recv_chan=recv_chan, + _portal=self + ) try: # deliver receive only stream async with ReceiveMsgStream( - ctx, recv_chan, shield=shield + ctx, recv_chan, ) as rchan: self._streams.add(rchan) yield rchan @@ -337,13 +341,16 @@ class Portal: # message right now since there shouldn't be a reason to # stop and restart the stream, right? try: - await ctx.cancel() + with trio.CancelScope(shield=True): + await ctx.cancel() except trio.ClosedResourceError: # if the far end terminates before we send a cancel the # underlying transport-channel may already be closed. - log.debug(f'Context {ctx} was already closed?') + log.warning(f'Context {ctx} was already closed?') + # XXX: should this always be done? + # await recv_chan.aclose() self._streams.remove(rchan) @asynccontextmanager @@ -408,8 +415,8 @@ class Portal: # pairs with handling in ``Actor._push_result()`` # recv_chan._ctx = ctx - # await trio.lowlevel.checkpoint() + yield ctx, first except ContextCancelled as err: @@ -427,9 +434,14 @@ class Portal: log.debug(f'Context {ctx} cancelled gracefully') except ( - trio.Cancelled, - trio.MultiError, - Exception, + BaseException, + + # more specifically, we need to handle: + # Exception, + # trio.Cancelled, + # trio.MultiError, + # KeyboardInterrupt, + ) as err: _err = err # the context cancels itself on any cancel @@ -440,6 +452,11 @@ class Portal: raise finally: + # in the case where a runtime nursery (due to internal bug) + # or a remote actor transmits an error we want to be + # sure we get the error the underlying feeder mem chan. + # if it's not raised here it *should* be raised from the + # msg loop nursery right? result = await ctx.result() # though it should be impossible for any tasks From 8b416e6bba69717b76a9bb0daea0fdd34793f102 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 10:20:49 -0400 Subject: [PATCH 4/9] Stream and context api tweaks - drop `shield` input to `MsgStream` - check for cancel called prior to loading the feeder mem chan in `Context.open_stream()` - warn on a timeout when trying to cancel a remote task from `Context.cancel()` - drop noop endofchannel handler block --- tractor/_streaming.py | 36 ++++++++++++++++++------------------ 1 file changed, 18 insertions(+), 18 deletions(-) diff --git a/tractor/_streaming.py b/tractor/_streaming.py index 9d832b28..24775313 100644 --- a/tractor/_streaming.py +++ b/tractor/_streaming.py @@ -51,7 +51,6 @@ class ReceiveMsgStream(trio.abc.ReceiveChannel): self, ctx: 'Context', # typing: ignore # noqa rx_chan: trio.MemoryReceiveChannel, - shield: bool = False, _broadcaster: Optional[BroadcastReceiver] = None, ) -> None: @@ -295,6 +294,9 @@ class MsgStream(ReceiveMsgStream, trio.abc.Channel): '''Send a message over this stream to the far end. ''' + # if self._eoc: + # raise trio.ClosedResourceError('This stream is already ded') + await self._ctx.chan.send({'yield': data, 'cid': self._ctx.cid}) @@ -365,7 +367,7 @@ class Context: ''' side = 'caller' if self._portal else 'callee' - log.warning(f'Cancelling {side} side of context to {self.chan}') + log.warning(f'Cancelling {side} side of context to {self.chan.uid}') self._cancel_called = True @@ -396,6 +398,10 @@ class Context: log.warning( "May have failed to cancel remote task " f"{cid} for {self._portal.channel.uid}") + else: + log.warning( + "Timed out on cancelling remote task " + f"{cid} for {self._portal.channel.uid}") else: # callee side remote task @@ -439,16 +445,6 @@ class Context: # here we create a mem chan that corresponds to the # far end caller / callee. - # NOTE: in one way streaming this only happens on the - # caller side inside `Actor.send_cmd()` so if you try - # to send a stop from the caller to the callee in the - # single-direction-stream case you'll get a lookup error - # currently. - _, recv_chan = actor.get_memchans( - self.chan.uid, - self.cid - ) - # Likewise if the surrounding context has been cancelled we error here # since it likely means the surrounding block was exited or # killed @@ -459,6 +455,16 @@ class Context: f'Context around {actor.uid[0]}:{task} was already cancelled!' ) + # NOTE: in one way streaming this only happens on the + # caller side inside `Actor.send_cmd()` so if you try + # to send a stop from the caller to the callee in the + # single-direction-stream case you'll get a lookup error + # currently. + _, recv_chan = actor.get_memchans( + self.chan.uid, + self.cid + ) + # XXX: If the underlying channel feeder receive mem chan has # been closed then likely client code has already exited # a ``.open_stream()`` block prior or there was some other @@ -482,12 +488,6 @@ class Context: # await trio.lowlevel.checkpoint() yield rchan - except trio.EndOfChannel: - # likely the far end sent us a 'stop' message to - # terminate the stream. - raise - - else: # XXX: Make the stream "one-shot use". On exit, signal # ``trio.EndOfChannel``/``StopAsyncIteration`` to the # far end. From 518a0d5e145582ee76a01e459d17dd6e5dda8aac Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 10:38:04 -0400 Subject: [PATCH 5/9] Add todo for log msg filename.. --- tractor/log.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tractor/log.py b/tractor/log.py index 667c7c65..4bfc798e 100644 --- a/tractor/log.py +++ b/tractor/log.py @@ -52,6 +52,8 @@ BOLD_PALETTE = { } +# TODO: this isn't showing the correct '{filename}' +# as it did before.. class StackLevelAdapter(logging.LoggerAdapter): def transport( From d734dcede46f54c52d7071c687c98f5281d5824c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 11:43:50 -0400 Subject: [PATCH 6/9] Accept a multierror on cancellation (windows?) --- tests/test_2way.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_2way.py b/tests/test_2way.py index 218a5b84..4bad958a 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -126,6 +126,10 @@ def test_simple_context( trio.run(main) except error_parent: pass + except trio.MultiError as me: + # XXX: on windows it seems we may have to expect the group error + from tractor._exceptions import is_multi_cancelled + assert is_multi_cancelled(me) else: trio.run(main) From b1235442fb73066f598048277e8cb01cbb551d5a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 12:10:39 -0400 Subject: [PATCH 7/9] Add longer timeout on windows --- tests/test_2way.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/tests/test_2way.py b/tests/test_2way.py index 4bad958a..c038ae4d 100644 --- a/tests/test_2way.py +++ b/tests/test_2way.py @@ -2,6 +2,8 @@ Bidirectional streaming and context API. """ +import platform + import pytest import trio import tractor @@ -69,9 +71,11 @@ def test_simple_context( pointlessly_open_stream, ): + timeout = 1.5 if not platform.system() == 'Windows' else 3 + async def main(): - with trio.fail_after(1.5): + with trio.fail_after(timeout): async with tractor.open_nursery() as nursery: portal = await nursery.start_actor( From 8fd515c7b9d80fbbaf913f92f923b85abd4761bc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 4 Oct 2021 12:28:55 -0400 Subject: [PATCH 8/9] Add nooz --- newsfragments/239.bug.rst | 6 ++++++ 1 file changed, 6 insertions(+) create mode 100644 newsfragments/239.bug.rst diff --git a/newsfragments/239.bug.rst b/newsfragments/239.bug.rst new file mode 100644 index 00000000..bf9138ab --- /dev/null +++ b/newsfragments/239.bug.rst @@ -0,0 +1,6 @@ +Fix keyboard interrupt handling in ``Portal.open_context()`` blocks. + +Previously this not triggering cancellation of the remote task context +and could result in hangs if a stream was also opened. This fix is to +accept `BaseException` since it is likely any other top level exception +other then kbi (even though not expected) should also get this result. From 4f831abe2524064f823213ae9b94f2b02e162819 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 5 Oct 2021 12:18:26 -0400 Subject: [PATCH 9/9] Hipshot, try to avoid subs teardown race --- tests/test_task_broadcasting.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/test_task_broadcasting.py b/tests/test_task_broadcasting.py index 82651978..f32d209f 100644 --- a/tests/test_task_broadcasting.py +++ b/tests/test_task_broadcasting.py @@ -1,5 +1,6 @@ """ Broadcast channels for fan-out to local tasks. + """ from contextlib import asynccontextmanager from functools import partial @@ -332,6 +333,9 @@ def test_ensure_slow_consumers_lag_out( await trio.sleep(delay) if task.name == 'sub_1': + # trigger checkpoint to clean out other subs + await trio.sleep(0) + # the non-lagger got # a ``trio.EndOfChannel`` # because the ``tx`` below was closed