Compare commits

..

No commits in common. "7b69d4a7dfbf32fa4b731bd25f9280e1745cc268" and "916f88a07037afec37746956bf7ccf5c30b3dcf7" have entirely different histories.

2 changed files with 29 additions and 103 deletions

View File

@ -732,21 +732,15 @@ def test_aio_errors_and_channel_propagates_and_closes(
async def aio_echo_server(
chan: to_asyncio.LinkedTaskChannel,
to_trio: trio.MemorySendChannel,
from_trio: asyncio.Queue,
) -> None:
'''
An IPC-msg "echo server" with msgs received and relayed by
a parent `trio.Task` into a child `asyncio.Task`
and then repeated back to that local parent (`trio.Task`)
and sent again back to the original calling remote actor.
'''
# same semantics as `trio.TaskStatus.started()`
chan.started_nowait('start')
to_trio.send_nowait('start')
while True:
try:
msg = await chan.get()
msg = await from_trio.get()
except to_asyncio.TrioTaskExited:
print(
'breaking aio echo loop due to `trio` exit!'
@ -754,7 +748,7 @@ async def aio_echo_server(
break
# echo the msg back
chan.send_nowait(msg)
to_trio.send_nowait(msg)
# if we get the terminate sentinel
# break the echo loop
@ -771,10 +765,7 @@ async def trio_to_aio_echo_server(
):
async with to_asyncio.open_channel_from(
aio_echo_server,
) as (
first, # value from `chan.started_nowait()` above
chan,
):
) as (first, chan):
assert first == 'start'
await ctx.started(first)
@ -785,8 +776,7 @@ async def trio_to_aio_echo_server(
await chan.send(msg)
out = await chan.receive()
# echo back to parent-actor's remote parent-ctx-task!
# echo back to parent actor-task
await stream.send(out)
if out is None:
@ -1100,12 +1090,14 @@ def test_sigint_closes_lifetime_stack(
# ?TODO asyncio.Task fn-deco?
# -[ ] do sig checkingat import time like @context?
# -[ ] maybe name it @aio_task ??
# -[ ] chan: to_asyncio.InterloopChannel ??
# -[ ] do fn-sig checking at import time like @context?
# |_[ ] maybe name it @a(sync)io_task ??
# @asyncio_task <- not bad ??
async def raise_before_started(
# from_trio: asyncio.Queue,
# to_trio: trio.abc.SendChannel,
chan: to_asyncio.LinkedTaskChannel,
) -> None:
'''
`asyncio.Task` entry point which RTEs before calling

View File

@ -48,7 +48,7 @@ from tractor._state import (
_runtime_vars,
)
from tractor._context import Unresolved
from tractor import devx
from tractor.devx import debug
from tractor.log import (
get_logger,
StackLevelAdapter,
@ -94,14 +94,10 @@ else:
QueueShutDown = False
# TODO, generally speaking we can generalize this abstraction as,
#
# > A "SC linked, inter-event-loop" channel for comms between
# > a `parent: trio.Task` -> `child: asyncio.Task` pair.
#
# It is **very similar** in terms of its operation as a "supervision
# scope primitive" to that of our `._context.Context` with the only
# difference being in how the tasks conduct msg-passing comms.
# TODO, generally speaking we can generalize this abstraction, a "SC linked
# parent->child task pair", as the same "supervision scope primitive"
# **that is** our `._context.Context` with the only difference being
# in how the tasks conduct msg-passing comms.
#
# For `LinkedTaskChannel` we are passing the equivalent of (once you
# include all the recently added `._trio/aio_to_raise`
@ -126,7 +122,6 @@ class LinkedTaskChannel(
task scheduled in the host loop.
'''
# ?TODO, rename as `._aio_q` since it's 2-way?
_to_aio: asyncio.Queue
_from_aio: trio.MemoryReceiveChannel
@ -240,11 +235,9 @@ class LinkedTaskChannel(
#
async def receive(self) -> Any:
'''
Receive a value `trio.Task` <- `asyncio.Task`.
Note the tasks in each loop are "SC linked" as a pair with
Receive a value from the paired `asyncio.Task` with
exception/cancel handling to teardown both sides on any
unexpected error or cancellation.
unexpected error.
'''
try:
@ -268,42 +261,15 @@ class LinkedTaskChannel(
):
raise err
async def get(self) -> Any:
'''
Receive a value `asyncio.Task` <- `trio.Task`.
This is equiv to `await self._from_trio.get()`.
'''
return await self._to_aio.get()
async def send(self, item: Any) -> None:
'''
Send a value through `trio.Task` -> `asyncio.Task`
presuming
it defines a `from_trio` argument or makes calls
to `chan.get()` , if it does not
Send a value through to the asyncio task presuming
it defines a ``from_trio`` argument, if it does not
this method will raise an error.
'''
self._to_aio.put_nowait(item)
# TODO? could we only compile-in this method on an instance
# handed to the `asyncio`-side, i.e. the fn invoked with
# `.open_channel_from()`.
def send_nowait(
self,
item: Any,
) -> None:
'''
Send a value through FROM the `asyncio.Task` to
the `trio.Task` NON-BLOCKING.
This is equiv to `self._to_trio.send_nowait()`.
'''
self._to_trio.send_nowait(item)
# TODO? needed?
# async def wait_aio_complete(self) -> None:
# await self._aio_task_complete.wait()
@ -371,12 +337,9 @@ def _run_asyncio_task(
'''
__tracebackhide__: bool = hide_tb
if not (actor := tractor.current_actor()).is_infected_aio():
if not tractor.current_actor().is_infected_aio():
raise RuntimeError(
f'`infect_asyncio: bool` mode is not enabled ??\n'
f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n'
f'\n'
f'{actor}\n'
"`infect_asyncio` mode is not enabled!?"
)
# ITC (inter task comms), these channel/queue names are mostly from
@ -439,23 +402,7 @@ def _run_asyncio_task(
orig = result = id(coro)
try:
# XXX TODO UGH!
# this seems to break a `test_sync_pause_from_aio_task`
# in a REALLY weird way where a `dict` value for
# `_runtime_vars['_root_addrs']` is delivered from the
# parent actor??
#
# XXX => see masked `.set_trace()` block in
# `Actor.from_parent()`..
#
# with devx.maybe_open_crash_handler(
# # XXX, if trio-side exits (intentionally) we
# # shouldn't care bc it should have its own crash
# # handling logic.
# ignore={TrioTaskExited,},
# ) as _bxerr:
result: Any = await coro
chan._aio_result = result
except BaseException as aio_err:
chan._aio_err = aio_err
@ -562,7 +509,7 @@ def _run_asyncio_task(
if (
debug_mode()
and
(greenback := devx.debug.maybe_import_greenback(
(greenback := debug.maybe_import_greenback(
force_reload=True,
raise_not_found=False,
))
@ -962,11 +909,7 @@ async def translate_aio_errors(
except BaseException as _trio_err:
trio_err = chan._trio_err = _trio_err
# await tractor.pause(shield=True) # workx!
# !TODO! we need an inter-loop lock here to avoid aio-tasks
# clobbering trio ones when both crash in debug-mode!
#
entered: bool = await devx.debug._maybe_enter_pm(
entered: bool = await debug._maybe_enter_pm(
trio_err,
api_frame=inspect.currentframe(),
)
@ -1300,17 +1243,10 @@ async def open_channel_from(
suppress_graceful_exits: bool = True,
**target_kwargs,
) -> AsyncIterator[
tuple[LinkedTaskChannel, Any]
]:
) -> AsyncIterator[Any]:
'''
Start an `asyncio.Task` as `target()` and open an inter-loop
(linked) channel for streaming between it and the current
`trio.Task`.
A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller
where the 2nd element is the value provided by the
`asyncio.Task`'s unblocking call to `chan.started_nowait()`.
Open an inter-loop linked task channel for streaming between a target
spawned ``asyncio`` task and ``trio``.
'''
chan: LinkedTaskChannel = _run_asyncio_task(
@ -1335,7 +1271,6 @@ async def open_channel_from(
# deliver stream handle upward
yield first, chan
# ^TODO! swap these!!
except trio.Cancelled as taskc:
if cs.cancel_called:
if isinstance(chan._trio_to_raise, AsyncioCancelled):
@ -1366,8 +1301,7 @@ async def open_channel_from(
)
else:
# XXX SHOULD NEVER HAPPEN!
log.error("SHOULD NEVER GET HERE !?!?")
await tractor.pause(shield=True)
await tractor.pause()
else:
chan._to_trio.close()