tractor/tests/test_spawning.py

338 lines
9.3 KiB
Python
Raw Normal View History

"""
Spawning basics including audit of,
- subproc bootstrap, such as subactor runtime-data/config inheritance,
- basic (and mostly legacy) `ActorNursery` subactor starting and
cancel APIs.
Simple (and generally legacy) examples from the original
API design.
"""
2025-04-03 20:15:53 +00:00
from functools import partial
from typing import (
Any,
)
import pytest
2018-06-12 19:23:58 +00:00
import trio
import tractor
Add (back) a `tractor._testing` sub-pkg Since importing from our top level `conftest.py` is not scaleable or as "future forward thinking" in terms of: - LoC-wise (it's only one file), - prevents "external" (aka non-test) example scripts from importing content easily, - seemingly(?) can't be used via abs-import if using a `[tool.pytest.ini_options]` in a `pyproject.toml` vs. a `pytest.ini`, see: https://docs.pytest.org/en/8.0.x/reference/customize.html#pyproject-toml) => Go back to having an internal "testing" pkg like `trio` (kinda) does. Deats: - move generic top level helpers into pkg-mod including the new `expect_ctxc()` (which i needed in the advanced faults testing script. - move `@tractor_test` into `._testing.pytest` sub-mod. - adjust all the helper imports to be a `from tractor._testing import <..>` Rework `test_ipc_channel_break_during_stream()` and backing script: - make test(s) pull `debug_mode` from new fixture (which is now controlled manually from `--tpdb` flag) and drop the previous parametrized input. - update logic in ^ test for "which-side-fails" cases to better match recently updated/stricter cancel/failure semantics in terms of `ClosedResouruceError` vs. `EndOfChannel` expectations. - handle `ExceptionGroup`s with expected embedded errors in test. - better pendantics around whether to expect a user simulated KBI. - for `examples/advanced_faults/ipc_failure_during_stream.py` script: - generalize ipc breakage in new `break_ipc()` with support for diff internal `trio` methods and a #TODO for future disti frameworks - only make one sub-actor task break and the other just stream. - use new `._testing.expect_ctxc()` around ctx block. - add a bit of exception handling with `print()`s around ctxc (unused except if 'msg' break method is set) and eoc cases. - don't break parent side ipc in loop any more then once after first break, checked via flag var. - add a `pre_close: bool` flag to control whether `MsgStreama.aclose()` is called *before* any ipc breakage method. Still TODO: - drop `pytest.ini` and add the alt section to `pyproject.py`. -> currently can't get `--rootdir=` opt to work.. not showing in console header. -> ^ also breaks on 'tests' `enable_modules` imports in subactors during discovery tests?
2024-03-12 19:48:20 +00:00
from tractor._testing import tractor_test
2025-04-03 20:15:53 +00:00
data_to_pass_down = {
'doggy': 10,
'kitty': 4,
}
2018-07-11 23:24:37 +00:00
async def spawn(
2025-04-03 20:15:53 +00:00
should_be_root: bool,
data: dict,
reg_addr: tuple[str, int],
2025-04-03 20:15:53 +00:00
debug_mode: bool = False,
):
await trio.sleep(0.1)
2025-04-03 20:15:53 +00:00
actor = tractor.current_actor(err_on_no_runtime=False)
if should_be_root:
assert actor is None # no runtime yet
async with (
tractor.open_root_actor(
registry_addrs=[reg_addr],
2025-04-03 20:15:53 +00:00
),
tractor.open_nursery() as an,
):
# now runtime exists
actor: tractor.Actor = tractor.current_actor()
assert actor.is_registrar == should_be_root
2025-04-03 20:15:53 +00:00
# spawns subproc here
portal: tractor.Portal = await an.run_in_actor(
fn=spawn,
# spawning args
name='sub-actor',
enable_modules=[__name__],
# passed to a subactor-recursive RPC invoke
# of this same `spawn()` fn.
should_be_root=False,
data=data_to_pass_down,
reg_addr=reg_addr,
)
assert len(an._children) == 1
assert (
portal.channel.uid
in
tractor.current_actor().ipc_server._peers
)
2018-08-02 19:27:09 +00:00
2025-04-03 20:15:53 +00:00
# get result from child subactor
result = await portal.result()
assert result == 10
return result
else:
assert actor.is_registrar == should_be_root
2025-04-03 20:15:53 +00:00
return 10
def test_run_in_actor_same_func_in_child(
reg_addr: tuple,
debug_mode: bool,
):
result = trio.run(
2025-04-03 20:15:53 +00:00
partial(
spawn,
should_be_root=True,
data=data_to_pass_down,
reg_addr=reg_addr,
debug_mode=debug_mode,
)
)
assert result == 10
2018-06-12 19:23:58 +00:00
2021-04-27 13:14:08 +00:00
async def movie_theatre_question():
2025-04-03 20:15:53 +00:00
'''
A question asked in a dark theatre, in a tangent
(errr, I mean different) process.
2025-04-03 20:15:53 +00:00
'''
return 'have you ever seen a portal?'
@tractor_test
async def test_movie_theatre_convo(
start_method: str,
):
2025-04-03 20:15:53 +00:00
'''
The main ``tractor`` routine.
'''
async with tractor.open_nursery(debug_mode=True) as an:
2025-04-03 20:15:53 +00:00
portal = await an.start_actor(
'frank',
# enable the actor to run funcs from this current module
enable_modules=[__name__],
)
2020-12-22 15:35:05 +00:00
print(await portal.run(movie_theatre_question))
# call the subactor a 2nd time
2020-12-22 15:35:05 +00:00
print(await portal.run(movie_theatre_question))
# the async with will block here indefinitely waiting
2018-08-02 19:27:09 +00:00
# for our actor "frank" to complete, we cancel 'frank'
# to avoid blocking indefinitely
await portal.cancel_actor()
async def cellar_door(
return_value: str|None,
):
return return_value
@pytest.mark.parametrize(
'return_value', ["Dang that's beautiful", None],
ids=['return_str', 'return_None'],
)
@tractor_test
async def test_most_beautiful_word(
start_method: str,
return_value: Any,
debug_mode: bool,
):
'''
The main ``tractor`` routine.
'''
2021-11-20 18:08:19 +00:00
with trio.fail_after(1):
async with tractor.open_nursery(
debug_mode=debug_mode,
2025-04-03 20:15:53 +00:00
) as an:
portal = await an.run_in_actor(
cellar_door,
return_value=return_value,
name='some_linguist',
)
res: Any = await portal.wait_for_result()
assert res == return_value
# The ``async with`` will unblock here since the 'some_linguist'
# actor has completed its main task ``cellar_door``.
# this should pull the cached final result already captured during
# the nursery block exit.
res: Any = await portal.wait_for_result()
assert res == return_value
print(res)
async def check_loglevel(level):
assert tractor.current_actor().loglevel == level
log = tractor.log.get_logger()
# XXX using a level actually used inside tractor seems to trigger
# some kind of `logging` module bug FYI.
log.critical('yoyoyo')
@pytest.mark.parametrize(
'level', [
'debug',
'cancel',
'critical'
],
ids='loglevel={}'.format,
)
def test_loglevel_propagated_to_subactor(
capfd: pytest.CaptureFixture,
start_method: str,
reg_addr: tuple,
level: str,
):
if start_method == 'mp_forkserver':
pytest.skip(
"a bug with `capfd` seems to make forkserver capture not work?"
)
async def main():
async with tractor.open_nursery(
name='registrar',
start_method=start_method,
registry_addrs=[reg_addr],
) as tn:
await tn.run_in_actor(
check_loglevel,
loglevel=level,
level=level,
)
trio.run(main)
# ensure subactor spits log message on stderr
captured = capfd.readouterr()
assert 'yoyoyo' in captured.err
async def check_parent_main_inheritance(
expect_inherited: bool,
) -> bool:
'''
Assert that the child actor's ``_parent_main_data`` matches the
``inherit_parent_main`` flag it was spawned with.
With the trio spawn backend the parent's ``__main__`` bootstrap
data is captured and forwarded to each child so it can replay
the parent's ``__main__`` as ``__mp_main__``, mirroring the
stdlib ``multiprocessing`` bootstrap:
https://docs.python.org/3/library/multiprocessing.html#the-spawn-and-forkserver-start-methods
When ``inherit_parent_main=False`` the data dict is empty
(``{}``) so no fixup ever runs and the child keeps its own
``__main__`` untouched.
NOTE: under `pytest` the parent ``__main__`` is
``pytest.__main__`` whose ``_fixup_main_from_name()`` is a no-op
(the name ends with ``.__main__``), so we cannot observe
a difference in ``sys.modules['__main__'].__name__`` between the
two modes. Checking ``_parent_main_data`` directly is the most
reliable verification that the flag is threaded through
correctly; a ``RemoteActorError[AssertionError]`` propagates on
mismatch.
'''
import tractor
actor: tractor.Actor = tractor.current_actor()
has_data: bool = bool(actor._parent_main_data)
assert has_data == expect_inherited, (
f'Expected _parent_main_data to be '
f'{"non-empty" if expect_inherited else "empty"}, '
f'got: {actor._parent_main_data!r}'
)
return has_data
def test_run_in_actor_can_skip_parent_main_inheritance(
start_method: str, # <- only support on `trio` backend rn.
):
'''
Verify ``inherit_parent_main=False`` on ``run_in_actor()``
prevents parent ``__main__`` data from reaching the child.
'''
if start_method != 'trio':
pytest.skip(
'parent main-inheritance opt-out only affects the trio backend'
)
async def main():
async with tractor.open_nursery(start_method='trio') as an:
# Default: child receives parent __main__ bootstrap data
replaying = await an.run_in_actor(
check_parent_main_inheritance,
name='replaying-parent-main',
expect_inherited=True,
)
await replaying.result()
# Opt-out: child gets no parent __main__ data
isolated = await an.run_in_actor(
check_parent_main_inheritance,
name='isolated-parent-main',
inherit_parent_main=False,
expect_inherited=False,
)
await isolated.result()
trio.run(main)
def test_start_actor_can_skip_parent_main_inheritance(
start_method: str, # <- only support on `trio` backend rn.
):
'''
Verify ``inherit_parent_main=False`` on ``start_actor()``
prevents parent ``__main__`` data from reaching the child.
'''
if start_method != 'trio':
pytest.skip(
'parent main-inheritance opt-out only affects the trio backend'
)
async def main():
async with tractor.open_nursery(start_method='trio') as an:
# Default: child receives parent __main__ bootstrap data
replaying = await an.start_actor(
'replaying-parent-main',
enable_modules=[__name__],
)
result = await replaying.run(
check_parent_main_inheritance,
expect_inherited=True,
)
assert result is True
await replaying.cancel_actor()
# Opt-out: child gets no parent __main__ data
isolated = await an.start_actor(
'isolated-parent-main',
enable_modules=[__name__],
inherit_parent_main=False,
)
result = await isolated.run(
check_parent_main_inheritance,
expect_inherited=False,
)
assert result is False
await isolated.cancel_actor()
trio.run(main)