Compare commits
3 Commits
916f88a070
...
7b69d4a7df
| Author | SHA1 | Date |
|---|---|---|
|
|
7b69d4a7df | |
|
|
26dbba9e1a | |
|
|
8f3c95ff54 |
|
|
@ -732,15 +732,21 @@ def test_aio_errors_and_channel_propagates_and_closes(
|
|||
|
||||
|
||||
async def aio_echo_server(
|
||||
to_trio: trio.MemorySendChannel,
|
||||
from_trio: asyncio.Queue,
|
||||
chan: to_asyncio.LinkedTaskChannel,
|
||||
) -> None:
|
||||
'''
|
||||
An IPC-msg "echo server" with msgs received and relayed by
|
||||
a parent `trio.Task` into a child `asyncio.Task`
|
||||
and then repeated back to that local parent (`trio.Task`)
|
||||
and sent again back to the original calling remote actor.
|
||||
|
||||
to_trio.send_nowait('start')
|
||||
'''
|
||||
# same semantics as `trio.TaskStatus.started()`
|
||||
chan.started_nowait('start')
|
||||
|
||||
while True:
|
||||
try:
|
||||
msg = await from_trio.get()
|
||||
msg = await chan.get()
|
||||
except to_asyncio.TrioTaskExited:
|
||||
print(
|
||||
'breaking aio echo loop due to `trio` exit!'
|
||||
|
|
@ -748,7 +754,7 @@ async def aio_echo_server(
|
|||
break
|
||||
|
||||
# echo the msg back
|
||||
to_trio.send_nowait(msg)
|
||||
chan.send_nowait(msg)
|
||||
|
||||
# if we get the terminate sentinel
|
||||
# break the echo loop
|
||||
|
|
@ -765,7 +771,10 @@ async def trio_to_aio_echo_server(
|
|||
):
|
||||
async with to_asyncio.open_channel_from(
|
||||
aio_echo_server,
|
||||
) as (first, chan):
|
||||
) as (
|
||||
first, # value from `chan.started_nowait()` above
|
||||
chan,
|
||||
):
|
||||
assert first == 'start'
|
||||
|
||||
await ctx.started(first)
|
||||
|
|
@ -776,7 +785,8 @@ async def trio_to_aio_echo_server(
|
|||
await chan.send(msg)
|
||||
|
||||
out = await chan.receive()
|
||||
# echo back to parent actor-task
|
||||
|
||||
# echo back to parent-actor's remote parent-ctx-task!
|
||||
await stream.send(out)
|
||||
|
||||
if out is None:
|
||||
|
|
@ -1090,14 +1100,12 @@ def test_sigint_closes_lifetime_stack(
|
|||
|
||||
|
||||
# ?TODO asyncio.Task fn-deco?
|
||||
# -[ ] do sig checkingat import time like @context?
|
||||
# -[ ] maybe name it @aio_task ??
|
||||
# -[ ] chan: to_asyncio.InterloopChannel ??
|
||||
# -[ ] do fn-sig checking at import time like @context?
|
||||
# |_[ ] maybe name it @a(sync)io_task ??
|
||||
# @asyncio_task <- not bad ??
|
||||
async def raise_before_started(
|
||||
# from_trio: asyncio.Queue,
|
||||
# to_trio: trio.abc.SendChannel,
|
||||
chan: to_asyncio.LinkedTaskChannel,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
`asyncio.Task` entry point which RTEs before calling
|
||||
|
|
|
|||
|
|
@ -48,7 +48,7 @@ from tractor._state import (
|
|||
_runtime_vars,
|
||||
)
|
||||
from tractor._context import Unresolved
|
||||
from tractor.devx import debug
|
||||
from tractor import devx
|
||||
from tractor.log import (
|
||||
get_logger,
|
||||
StackLevelAdapter,
|
||||
|
|
@ -94,10 +94,14 @@ else:
|
|||
QueueShutDown = False
|
||||
|
||||
|
||||
# TODO, generally speaking we can generalize this abstraction, a "SC linked
|
||||
# parent->child task pair", as the same "supervision scope primitive"
|
||||
# **that is** our `._context.Context` with the only difference being
|
||||
# in how the tasks conduct msg-passing comms.
|
||||
# TODO, generally speaking we can generalize this abstraction as,
|
||||
#
|
||||
# > A "SC linked, inter-event-loop" channel for comms between
|
||||
# > a `parent: trio.Task` -> `child: asyncio.Task` pair.
|
||||
#
|
||||
# It is **very similar** in terms of its operation as a "supervision
|
||||
# scope primitive" to that of our `._context.Context` with the only
|
||||
# difference being in how the tasks conduct msg-passing comms.
|
||||
#
|
||||
# For `LinkedTaskChannel` we are passing the equivalent of (once you
|
||||
# include all the recently added `._trio/aio_to_raise`
|
||||
|
|
@ -122,6 +126,7 @@ class LinkedTaskChannel(
|
|||
task scheduled in the host loop.
|
||||
|
||||
'''
|
||||
# ?TODO, rename as `._aio_q` since it's 2-way?
|
||||
_to_aio: asyncio.Queue
|
||||
_from_aio: trio.MemoryReceiveChannel
|
||||
|
||||
|
|
@ -235,9 +240,11 @@ class LinkedTaskChannel(
|
|||
#
|
||||
async def receive(self) -> Any:
|
||||
'''
|
||||
Receive a value from the paired `asyncio.Task` with
|
||||
Receive a value `trio.Task` <- `asyncio.Task`.
|
||||
|
||||
Note the tasks in each loop are "SC linked" as a pair with
|
||||
exception/cancel handling to teardown both sides on any
|
||||
unexpected error.
|
||||
unexpected error or cancellation.
|
||||
|
||||
'''
|
||||
try:
|
||||
|
|
@ -261,15 +268,42 @@ class LinkedTaskChannel(
|
|||
):
|
||||
raise err
|
||||
|
||||
async def get(self) -> Any:
|
||||
'''
|
||||
Receive a value `asyncio.Task` <- `trio.Task`.
|
||||
|
||||
This is equiv to `await self._from_trio.get()`.
|
||||
|
||||
'''
|
||||
return await self._to_aio.get()
|
||||
|
||||
async def send(self, item: Any) -> None:
|
||||
'''
|
||||
Send a value through to the asyncio task presuming
|
||||
it defines a ``from_trio`` argument, if it does not
|
||||
Send a value through `trio.Task` -> `asyncio.Task`
|
||||
presuming
|
||||
it defines a `from_trio` argument or makes calls
|
||||
to `chan.get()` , if it does not
|
||||
this method will raise an error.
|
||||
|
||||
'''
|
||||
self._to_aio.put_nowait(item)
|
||||
|
||||
# TODO? could we only compile-in this method on an instance
|
||||
# handed to the `asyncio`-side, i.e. the fn invoked with
|
||||
# `.open_channel_from()`.
|
||||
def send_nowait(
|
||||
self,
|
||||
item: Any,
|
||||
) -> None:
|
||||
'''
|
||||
Send a value through FROM the `asyncio.Task` to
|
||||
the `trio.Task` NON-BLOCKING.
|
||||
|
||||
This is equiv to `self._to_trio.send_nowait()`.
|
||||
|
||||
'''
|
||||
self._to_trio.send_nowait(item)
|
||||
|
||||
# TODO? needed?
|
||||
# async def wait_aio_complete(self) -> None:
|
||||
# await self._aio_task_complete.wait()
|
||||
|
|
@ -337,9 +371,12 @@ def _run_asyncio_task(
|
|||
|
||||
'''
|
||||
__tracebackhide__: bool = hide_tb
|
||||
if not tractor.current_actor().is_infected_aio():
|
||||
if not (actor := tractor.current_actor()).is_infected_aio():
|
||||
raise RuntimeError(
|
||||
"`infect_asyncio` mode is not enabled!?"
|
||||
f'`infect_asyncio: bool` mode is not enabled ??\n'
|
||||
f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n'
|
||||
f'\n'
|
||||
f'{actor}\n'
|
||||
)
|
||||
|
||||
# ITC (inter task comms), these channel/queue names are mostly from
|
||||
|
|
@ -402,7 +439,23 @@ def _run_asyncio_task(
|
|||
|
||||
orig = result = id(coro)
|
||||
try:
|
||||
# XXX TODO UGH!
|
||||
# this seems to break a `test_sync_pause_from_aio_task`
|
||||
# in a REALLY weird way where a `dict` value for
|
||||
# `_runtime_vars['_root_addrs']` is delivered from the
|
||||
# parent actor??
|
||||
#
|
||||
# XXX => see masked `.set_trace()` block in
|
||||
# `Actor.from_parent()`..
|
||||
#
|
||||
# with devx.maybe_open_crash_handler(
|
||||
# # XXX, if trio-side exits (intentionally) we
|
||||
# # shouldn't care bc it should have its own crash
|
||||
# # handling logic.
|
||||
# ignore={TrioTaskExited,},
|
||||
# ) as _bxerr:
|
||||
result: Any = await coro
|
||||
|
||||
chan._aio_result = result
|
||||
except BaseException as aio_err:
|
||||
chan._aio_err = aio_err
|
||||
|
|
@ -509,7 +562,7 @@ def _run_asyncio_task(
|
|||
if (
|
||||
debug_mode()
|
||||
and
|
||||
(greenback := debug.maybe_import_greenback(
|
||||
(greenback := devx.debug.maybe_import_greenback(
|
||||
force_reload=True,
|
||||
raise_not_found=False,
|
||||
))
|
||||
|
|
@ -909,7 +962,11 @@ async def translate_aio_errors(
|
|||
except BaseException as _trio_err:
|
||||
trio_err = chan._trio_err = _trio_err
|
||||
# await tractor.pause(shield=True) # workx!
|
||||
entered: bool = await debug._maybe_enter_pm(
|
||||
|
||||
# !TODO! we need an inter-loop lock here to avoid aio-tasks
|
||||
# clobbering trio ones when both crash in debug-mode!
|
||||
#
|
||||
entered: bool = await devx.debug._maybe_enter_pm(
|
||||
trio_err,
|
||||
api_frame=inspect.currentframe(),
|
||||
)
|
||||
|
|
@ -1243,10 +1300,17 @@ async def open_channel_from(
|
|||
suppress_graceful_exits: bool = True,
|
||||
**target_kwargs,
|
||||
|
||||
) -> AsyncIterator[Any]:
|
||||
) -> AsyncIterator[
|
||||
tuple[LinkedTaskChannel, Any]
|
||||
]:
|
||||
'''
|
||||
Open an inter-loop linked task channel for streaming between a target
|
||||
spawned ``asyncio`` task and ``trio``.
|
||||
Start an `asyncio.Task` as `target()` and open an inter-loop
|
||||
(linked) channel for streaming between it and the current
|
||||
`trio.Task`.
|
||||
|
||||
A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller
|
||||
where the 2nd element is the value provided by the
|
||||
`asyncio.Task`'s unblocking call to `chan.started_nowait()`.
|
||||
|
||||
'''
|
||||
chan: LinkedTaskChannel = _run_asyncio_task(
|
||||
|
|
@ -1271,6 +1335,7 @@ async def open_channel_from(
|
|||
|
||||
# deliver stream handle upward
|
||||
yield first, chan
|
||||
# ^TODO! swap these!!
|
||||
except trio.Cancelled as taskc:
|
||||
if cs.cancel_called:
|
||||
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
||||
|
|
@ -1301,7 +1366,8 @@ async def open_channel_from(
|
|||
)
|
||||
else:
|
||||
# XXX SHOULD NEVER HAPPEN!
|
||||
await tractor.pause()
|
||||
log.error("SHOULD NEVER GET HERE !?!?")
|
||||
await tractor.pause(shield=True)
|
||||
else:
|
||||
chan._to_trio.close()
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue