Compare commits
3 Commits
916f88a070
...
7b69d4a7df
| Author | SHA1 | Date |
|---|---|---|
|
|
7b69d4a7df | |
|
|
26dbba9e1a | |
|
|
8f3c95ff54 |
|
|
@ -732,15 +732,21 @@ def test_aio_errors_and_channel_propagates_and_closes(
|
||||||
|
|
||||||
|
|
||||||
async def aio_echo_server(
|
async def aio_echo_server(
|
||||||
to_trio: trio.MemorySendChannel,
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
from_trio: asyncio.Queue,
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
'''
|
||||||
|
An IPC-msg "echo server" with msgs received and relayed by
|
||||||
|
a parent `trio.Task` into a child `asyncio.Task`
|
||||||
|
and then repeated back to that local parent (`trio.Task`)
|
||||||
|
and sent again back to the original calling remote actor.
|
||||||
|
|
||||||
to_trio.send_nowait('start')
|
'''
|
||||||
|
# same semantics as `trio.TaskStatus.started()`
|
||||||
|
chan.started_nowait('start')
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
msg = await from_trio.get()
|
msg = await chan.get()
|
||||||
except to_asyncio.TrioTaskExited:
|
except to_asyncio.TrioTaskExited:
|
||||||
print(
|
print(
|
||||||
'breaking aio echo loop due to `trio` exit!'
|
'breaking aio echo loop due to `trio` exit!'
|
||||||
|
|
@ -748,7 +754,7 @@ async def aio_echo_server(
|
||||||
break
|
break
|
||||||
|
|
||||||
# echo the msg back
|
# echo the msg back
|
||||||
to_trio.send_nowait(msg)
|
chan.send_nowait(msg)
|
||||||
|
|
||||||
# if we get the terminate sentinel
|
# if we get the terminate sentinel
|
||||||
# break the echo loop
|
# break the echo loop
|
||||||
|
|
@ -765,7 +771,10 @@ async def trio_to_aio_echo_server(
|
||||||
):
|
):
|
||||||
async with to_asyncio.open_channel_from(
|
async with to_asyncio.open_channel_from(
|
||||||
aio_echo_server,
|
aio_echo_server,
|
||||||
) as (first, chan):
|
) as (
|
||||||
|
first, # value from `chan.started_nowait()` above
|
||||||
|
chan,
|
||||||
|
):
|
||||||
assert first == 'start'
|
assert first == 'start'
|
||||||
|
|
||||||
await ctx.started(first)
|
await ctx.started(first)
|
||||||
|
|
@ -776,7 +785,8 @@ async def trio_to_aio_echo_server(
|
||||||
await chan.send(msg)
|
await chan.send(msg)
|
||||||
|
|
||||||
out = await chan.receive()
|
out = await chan.receive()
|
||||||
# echo back to parent actor-task
|
|
||||||
|
# echo back to parent-actor's remote parent-ctx-task!
|
||||||
await stream.send(out)
|
await stream.send(out)
|
||||||
|
|
||||||
if out is None:
|
if out is None:
|
||||||
|
|
@ -1090,14 +1100,12 @@ def test_sigint_closes_lifetime_stack(
|
||||||
|
|
||||||
|
|
||||||
# ?TODO asyncio.Task fn-deco?
|
# ?TODO asyncio.Task fn-deco?
|
||||||
# -[ ] do sig checkingat import time like @context?
|
|
||||||
# -[ ] maybe name it @aio_task ??
|
|
||||||
# -[ ] chan: to_asyncio.InterloopChannel ??
|
# -[ ] chan: to_asyncio.InterloopChannel ??
|
||||||
|
# -[ ] do fn-sig checking at import time like @context?
|
||||||
|
# |_[ ] maybe name it @a(sync)io_task ??
|
||||||
|
# @asyncio_task <- not bad ??
|
||||||
async def raise_before_started(
|
async def raise_before_started(
|
||||||
# from_trio: asyncio.Queue,
|
|
||||||
# to_trio: trio.abc.SendChannel,
|
|
||||||
chan: to_asyncio.LinkedTaskChannel,
|
chan: to_asyncio.LinkedTaskChannel,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
'''
|
'''
|
||||||
`asyncio.Task` entry point which RTEs before calling
|
`asyncio.Task` entry point which RTEs before calling
|
||||||
|
|
|
||||||
|
|
@ -48,7 +48,7 @@ from tractor._state import (
|
||||||
_runtime_vars,
|
_runtime_vars,
|
||||||
)
|
)
|
||||||
from tractor._context import Unresolved
|
from tractor._context import Unresolved
|
||||||
from tractor.devx import debug
|
from tractor import devx
|
||||||
from tractor.log import (
|
from tractor.log import (
|
||||||
get_logger,
|
get_logger,
|
||||||
StackLevelAdapter,
|
StackLevelAdapter,
|
||||||
|
|
@ -94,10 +94,14 @@ else:
|
||||||
QueueShutDown = False
|
QueueShutDown = False
|
||||||
|
|
||||||
|
|
||||||
# TODO, generally speaking we can generalize this abstraction, a "SC linked
|
# TODO, generally speaking we can generalize this abstraction as,
|
||||||
# parent->child task pair", as the same "supervision scope primitive"
|
#
|
||||||
# **that is** our `._context.Context` with the only difference being
|
# > A "SC linked, inter-event-loop" channel for comms between
|
||||||
# in how the tasks conduct msg-passing comms.
|
# > a `parent: trio.Task` -> `child: asyncio.Task` pair.
|
||||||
|
#
|
||||||
|
# It is **very similar** in terms of its operation as a "supervision
|
||||||
|
# scope primitive" to that of our `._context.Context` with the only
|
||||||
|
# difference being in how the tasks conduct msg-passing comms.
|
||||||
#
|
#
|
||||||
# For `LinkedTaskChannel` we are passing the equivalent of (once you
|
# For `LinkedTaskChannel` we are passing the equivalent of (once you
|
||||||
# include all the recently added `._trio/aio_to_raise`
|
# include all the recently added `._trio/aio_to_raise`
|
||||||
|
|
@ -122,6 +126,7 @@ class LinkedTaskChannel(
|
||||||
task scheduled in the host loop.
|
task scheduled in the host loop.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
# ?TODO, rename as `._aio_q` since it's 2-way?
|
||||||
_to_aio: asyncio.Queue
|
_to_aio: asyncio.Queue
|
||||||
_from_aio: trio.MemoryReceiveChannel
|
_from_aio: trio.MemoryReceiveChannel
|
||||||
|
|
||||||
|
|
@ -235,9 +240,11 @@ class LinkedTaskChannel(
|
||||||
#
|
#
|
||||||
async def receive(self) -> Any:
|
async def receive(self) -> Any:
|
||||||
'''
|
'''
|
||||||
Receive a value from the paired `asyncio.Task` with
|
Receive a value `trio.Task` <- `asyncio.Task`.
|
||||||
|
|
||||||
|
Note the tasks in each loop are "SC linked" as a pair with
|
||||||
exception/cancel handling to teardown both sides on any
|
exception/cancel handling to teardown both sides on any
|
||||||
unexpected error.
|
unexpected error or cancellation.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
|
|
@ -261,15 +268,42 @@ class LinkedTaskChannel(
|
||||||
):
|
):
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
async def get(self) -> Any:
|
||||||
|
'''
|
||||||
|
Receive a value `asyncio.Task` <- `trio.Task`.
|
||||||
|
|
||||||
|
This is equiv to `await self._from_trio.get()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return await self._to_aio.get()
|
||||||
|
|
||||||
async def send(self, item: Any) -> None:
|
async def send(self, item: Any) -> None:
|
||||||
'''
|
'''
|
||||||
Send a value through to the asyncio task presuming
|
Send a value through `trio.Task` -> `asyncio.Task`
|
||||||
it defines a ``from_trio`` argument, if it does not
|
presuming
|
||||||
|
it defines a `from_trio` argument or makes calls
|
||||||
|
to `chan.get()` , if it does not
|
||||||
this method will raise an error.
|
this method will raise an error.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
self._to_aio.put_nowait(item)
|
self._to_aio.put_nowait(item)
|
||||||
|
|
||||||
|
# TODO? could we only compile-in this method on an instance
|
||||||
|
# handed to the `asyncio`-side, i.e. the fn invoked with
|
||||||
|
# `.open_channel_from()`.
|
||||||
|
def send_nowait(
|
||||||
|
self,
|
||||||
|
item: Any,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Send a value through FROM the `asyncio.Task` to
|
||||||
|
the `trio.Task` NON-BLOCKING.
|
||||||
|
|
||||||
|
This is equiv to `self._to_trio.send_nowait()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
self._to_trio.send_nowait(item)
|
||||||
|
|
||||||
# TODO? needed?
|
# TODO? needed?
|
||||||
# async def wait_aio_complete(self) -> None:
|
# async def wait_aio_complete(self) -> None:
|
||||||
# await self._aio_task_complete.wait()
|
# await self._aio_task_complete.wait()
|
||||||
|
|
@ -337,9 +371,12 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
'''
|
'''
|
||||||
__tracebackhide__: bool = hide_tb
|
__tracebackhide__: bool = hide_tb
|
||||||
if not tractor.current_actor().is_infected_aio():
|
if not (actor := tractor.current_actor()).is_infected_aio():
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
"`infect_asyncio` mode is not enabled!?"
|
f'`infect_asyncio: bool` mode is not enabled ??\n'
|
||||||
|
f'Ensure you pass `ActorNursery.start_actor(infect_asyncio=True)`\n'
|
||||||
|
f'\n'
|
||||||
|
f'{actor}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
# ITC (inter task comms), these channel/queue names are mostly from
|
# ITC (inter task comms), these channel/queue names are mostly from
|
||||||
|
|
@ -402,7 +439,23 @@ def _run_asyncio_task(
|
||||||
|
|
||||||
orig = result = id(coro)
|
orig = result = id(coro)
|
||||||
try:
|
try:
|
||||||
|
# XXX TODO UGH!
|
||||||
|
# this seems to break a `test_sync_pause_from_aio_task`
|
||||||
|
# in a REALLY weird way where a `dict` value for
|
||||||
|
# `_runtime_vars['_root_addrs']` is delivered from the
|
||||||
|
# parent actor??
|
||||||
|
#
|
||||||
|
# XXX => see masked `.set_trace()` block in
|
||||||
|
# `Actor.from_parent()`..
|
||||||
|
#
|
||||||
|
# with devx.maybe_open_crash_handler(
|
||||||
|
# # XXX, if trio-side exits (intentionally) we
|
||||||
|
# # shouldn't care bc it should have its own crash
|
||||||
|
# # handling logic.
|
||||||
|
# ignore={TrioTaskExited,},
|
||||||
|
# ) as _bxerr:
|
||||||
result: Any = await coro
|
result: Any = await coro
|
||||||
|
|
||||||
chan._aio_result = result
|
chan._aio_result = result
|
||||||
except BaseException as aio_err:
|
except BaseException as aio_err:
|
||||||
chan._aio_err = aio_err
|
chan._aio_err = aio_err
|
||||||
|
|
@ -509,7 +562,7 @@ def _run_asyncio_task(
|
||||||
if (
|
if (
|
||||||
debug_mode()
|
debug_mode()
|
||||||
and
|
and
|
||||||
(greenback := debug.maybe_import_greenback(
|
(greenback := devx.debug.maybe_import_greenback(
|
||||||
force_reload=True,
|
force_reload=True,
|
||||||
raise_not_found=False,
|
raise_not_found=False,
|
||||||
))
|
))
|
||||||
|
|
@ -909,7 +962,11 @@ async def translate_aio_errors(
|
||||||
except BaseException as _trio_err:
|
except BaseException as _trio_err:
|
||||||
trio_err = chan._trio_err = _trio_err
|
trio_err = chan._trio_err = _trio_err
|
||||||
# await tractor.pause(shield=True) # workx!
|
# await tractor.pause(shield=True) # workx!
|
||||||
entered: bool = await debug._maybe_enter_pm(
|
|
||||||
|
# !TODO! we need an inter-loop lock here to avoid aio-tasks
|
||||||
|
# clobbering trio ones when both crash in debug-mode!
|
||||||
|
#
|
||||||
|
entered: bool = await devx.debug._maybe_enter_pm(
|
||||||
trio_err,
|
trio_err,
|
||||||
api_frame=inspect.currentframe(),
|
api_frame=inspect.currentframe(),
|
||||||
)
|
)
|
||||||
|
|
@ -1243,10 +1300,17 @@ async def open_channel_from(
|
||||||
suppress_graceful_exits: bool = True,
|
suppress_graceful_exits: bool = True,
|
||||||
**target_kwargs,
|
**target_kwargs,
|
||||||
|
|
||||||
) -> AsyncIterator[Any]:
|
) -> AsyncIterator[
|
||||||
|
tuple[LinkedTaskChannel, Any]
|
||||||
|
]:
|
||||||
'''
|
'''
|
||||||
Open an inter-loop linked task channel for streaming between a target
|
Start an `asyncio.Task` as `target()` and open an inter-loop
|
||||||
spawned ``asyncio`` task and ``trio``.
|
(linked) channel for streaming between it and the current
|
||||||
|
`trio.Task`.
|
||||||
|
|
||||||
|
A pair `(chan: LinkedTaskChannel, Any)` is delivered to the caller
|
||||||
|
where the 2nd element is the value provided by the
|
||||||
|
`asyncio.Task`'s unblocking call to `chan.started_nowait()`.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
chan: LinkedTaskChannel = _run_asyncio_task(
|
chan: LinkedTaskChannel = _run_asyncio_task(
|
||||||
|
|
@ -1271,6 +1335,7 @@ async def open_channel_from(
|
||||||
|
|
||||||
# deliver stream handle upward
|
# deliver stream handle upward
|
||||||
yield first, chan
|
yield first, chan
|
||||||
|
# ^TODO! swap these!!
|
||||||
except trio.Cancelled as taskc:
|
except trio.Cancelled as taskc:
|
||||||
if cs.cancel_called:
|
if cs.cancel_called:
|
||||||
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
if isinstance(chan._trio_to_raise, AsyncioCancelled):
|
||||||
|
|
@ -1301,7 +1366,8 @@ async def open_channel_from(
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
# XXX SHOULD NEVER HAPPEN!
|
# XXX SHOULD NEVER HAPPEN!
|
||||||
await tractor.pause()
|
log.error("SHOULD NEVER GET HERE !?!?")
|
||||||
|
await tractor.pause(shield=True)
|
||||||
else:
|
else:
|
||||||
chan._to_trio.close()
|
chan._to_trio.close()
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue