597 lines
20 KiB
Python
597 lines
20 KiB
Python
'''
|
|
Integration exercises for the `tractor.spawn._subint_forkserver`
|
|
submodule at three tiers:
|
|
|
|
1. the low-level primitives
|
|
(`fork_from_worker_thread()` +
|
|
`run_subint_in_worker_thread()`) driven from inside a real
|
|
`trio.run()` in the parent process,
|
|
|
|
2. the full `subint_forkserver_proc` spawn backend wired
|
|
through tractor's normal actor-nursery + portal-RPC
|
|
machinery — i.e. `open_root_actor` + `open_nursery` +
|
|
`run_in_actor` against a subactor spawned via fork from a
|
|
main-interp worker thread.
|
|
|
|
Background
|
|
----------
|
|
`ai/conc-anal/subint_fork_blocked_by_cpython_post_fork_issue.md`
|
|
establishes that `os.fork()` from a non-main sub-interpreter
|
|
aborts the child at the CPython level. The sibling
|
|
`subint_fork_from_main_thread_smoketest.py` proves the escape
|
|
hatch: fork from a main-interp *worker thread* (one that has
|
|
never entered a subint) works, and the forked child can then
|
|
host its own `trio.run()` inside a fresh subint.
|
|
|
|
Those smoke-test scenarios are standalone — no trio runtime
|
|
in the *parent*. Tiers (1)+(2) here cover the primitives
|
|
driven from inside `trio.run()` in the parent, and tier (3)
|
|
(the `*_spawn_basic` test) drives the registered
|
|
`subint_forkserver` spawn backend end-to-end against the
|
|
tractor runtime.
|
|
|
|
Gating
|
|
------
|
|
- py3.14+ (via `concurrent.interpreters` presence)
|
|
- no `--spawn-backend` restriction — the backend-level test
|
|
flips `tractor.spawn._spawn._spawn_method` programmatically
|
|
(via `try_set_start_method('subint_forkserver')`) and
|
|
restores it on teardown, so these tests are independent of
|
|
the session-level CLI backend choice.
|
|
|
|
'''
|
|
from __future__ import annotations
|
|
from functools import partial
|
|
import os
|
|
from pathlib import Path
|
|
import platform
|
|
import select
|
|
import signal
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
import pytest
|
|
import trio
|
|
|
|
import tractor
|
|
from tractor.devx import dump_on_hang
|
|
|
|
|
|
# Gate: subint forkserver primitives require py3.14+. Check
|
|
# the public stdlib wrapper's presence (added in 3.14) rather
|
|
# than `_interpreters` directly — see
|
|
# `tractor.spawn._subint` for why.
|
|
pytest.importorskip('concurrent.interpreters')
|
|
|
|
from tractor.spawn._subint_forkserver import ( # noqa: E402
|
|
fork_from_worker_thread,
|
|
run_subint_in_worker_thread,
|
|
wait_child,
|
|
)
|
|
from tractor.spawn import _spawn as _spawn_mod # noqa: E402
|
|
from tractor.spawn._spawn import try_set_start_method # noqa: E402
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# child-side callables (passed via `child_target=` across fork)
|
|
# ----------------------------------------------------------------
|
|
|
|
|
|
_CHILD_TRIO_BOOTSTRAP: str = (
|
|
'import trio\n'
|
|
'async def _main():\n'
|
|
' await trio.sleep(0.05)\n'
|
|
' return 42\n'
|
|
'result = trio.run(_main)\n'
|
|
'assert result == 42, f"trio.run returned {result}"\n'
|
|
)
|
|
|
|
|
|
def _child_trio_in_subint() -> int:
|
|
'''
|
|
`child_target` for the trio-in-child scenario: drive a
|
|
trivial `trio.run()` inside a fresh legacy-config subint
|
|
on a worker thread.
|
|
|
|
Returns an exit code suitable for `os._exit()`:
|
|
- 0: subint-hosted `trio.run()` succeeded
|
|
- 3: driver thread hang (timeout inside `run_subint_in_worker_thread`)
|
|
- 4: subint bootstrap raised some other exception
|
|
|
|
'''
|
|
try:
|
|
run_subint_in_worker_thread(
|
|
_CHILD_TRIO_BOOTSTRAP,
|
|
thread_name='child-subint-trio-thread',
|
|
)
|
|
except RuntimeError:
|
|
# timeout / thread-never-returned
|
|
return 3
|
|
except BaseException:
|
|
return 4
|
|
return 0
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# parent-side harnesses (run inside `trio.run()`)
|
|
# ----------------------------------------------------------------
|
|
|
|
|
|
async def run_fork_in_non_trio_thread(
|
|
deadline: float,
|
|
*,
|
|
child_target=None,
|
|
) -> int:
|
|
'''
|
|
From inside a parent `trio.run()`, off-load the
|
|
forkserver primitive to a main-interp worker thread via
|
|
`trio.to_thread.run_sync()` and return the forked child's
|
|
pid.
|
|
|
|
Then `wait_child()` on that pid (also off-loaded so we
|
|
don't block trio's event loop on `waitpid()`) and assert
|
|
the child exited cleanly.
|
|
|
|
'''
|
|
with trio.fail_after(deadline):
|
|
# NOTE: `fork_from_worker_thread` internally spawns its
|
|
# own dedicated `threading.Thread` (not from trio's
|
|
# cache) and joins it before returning — so we can
|
|
# safely off-load via `to_thread.run_sync` without
|
|
# worrying about the trio-thread-cache recycling the
|
|
# runner. Pass `abandon_on_cancel=False` for the
|
|
# same "bounded + clean" rationale we use in
|
|
# `_subint.subint_proc`.
|
|
pid: int = await trio.to_thread.run_sync(
|
|
partial(
|
|
fork_from_worker_thread,
|
|
child_target,
|
|
thread_name='test-subint-forkserver',
|
|
),
|
|
abandon_on_cancel=False,
|
|
)
|
|
assert pid > 0
|
|
|
|
ok, status_str = await trio.to_thread.run_sync(
|
|
partial(
|
|
wait_child,
|
|
pid,
|
|
expect_exit_ok=True,
|
|
),
|
|
abandon_on_cancel=False,
|
|
)
|
|
assert ok, (
|
|
f'forked child did not exit cleanly: '
|
|
f'{status_str}'
|
|
)
|
|
return pid
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# tests
|
|
# ----------------------------------------------------------------
|
|
|
|
|
|
# Bounded wall-clock via `pytest-timeout` (`method='thread'`)
|
|
# for the usual GIL-hostage safety reason documented in the
|
|
# sibling `test_subint_cancellation.py` / the class-A
|
|
# `subint_sigint_starvation_issue.md`. Each test also has an
|
|
# inner `trio.fail_after()` so assertion failures fire fast
|
|
# under normal conditions.
|
|
@pytest.mark.timeout(30, method='thread')
|
|
def test_fork_from_worker_thread_via_trio(
|
|
) -> None:
|
|
'''
|
|
Baseline: inside `trio.run()`, call
|
|
`fork_from_worker_thread()` via `trio.to_thread.run_sync()`,
|
|
get a child pid back, reap the child cleanly.
|
|
|
|
No trio-in-child. If this regresses we know the parent-
|
|
side trio↔worker-thread plumbing is broken independent
|
|
of any child-side subint machinery.
|
|
|
|
'''
|
|
deadline: float = 10.0
|
|
with dump_on_hang(
|
|
seconds=deadline,
|
|
path='/tmp/subint_forkserver_baseline.dump',
|
|
):
|
|
pid: int = trio.run(
|
|
partial(run_fork_in_non_trio_thread, deadline),
|
|
)
|
|
# parent-side sanity — we got a real pid back.
|
|
assert isinstance(pid, int) and pid > 0
|
|
# by now the child has been waited on; it shouldn't be
|
|
# reap-able again.
|
|
with pytest.raises((ChildProcessError, OSError)):
|
|
os.waitpid(pid, os.WNOHANG)
|
|
|
|
|
|
@pytest.mark.timeout(30, method='thread')
|
|
def test_fork_and_run_trio_in_child() -> None:
|
|
'''
|
|
End-to-end: inside the parent's `trio.run()`, off-load
|
|
`fork_from_worker_thread()` to a worker thread, have the
|
|
forked child then create a fresh subint and run
|
|
`trio.run()` inside it on yet another worker thread.
|
|
|
|
This is the full "forkserver + trio-in-subint-in-child"
|
|
pattern the proposed `subint_forkserver` spawn backend
|
|
would rest on.
|
|
|
|
'''
|
|
deadline: float = 15.0
|
|
with dump_on_hang(
|
|
seconds=deadline,
|
|
path='/tmp/subint_forkserver_trio_in_child.dump',
|
|
):
|
|
pid: int = trio.run(
|
|
partial(
|
|
run_fork_in_non_trio_thread,
|
|
deadline,
|
|
child_target=_child_trio_in_subint,
|
|
),
|
|
)
|
|
assert isinstance(pid, int) and pid > 0
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# tier-3 backend test: drive the registered `subint_forkserver`
|
|
# spawn backend end-to-end through tractor's actor-nursery +
|
|
# portal-RPC machinery.
|
|
# ----------------------------------------------------------------
|
|
|
|
|
|
async def _trivial_rpc() -> str:
|
|
'''
|
|
Minimal subactor-side RPC body: just return a sentinel
|
|
string the parent can assert on.
|
|
|
|
'''
|
|
return 'hello from subint-forkserver child'
|
|
|
|
|
|
async def _happy_path_forkserver(
|
|
reg_addr: tuple[str, int | str],
|
|
deadline: float,
|
|
) -> None:
|
|
'''
|
|
Parent-side harness: stand up a root actor, open an actor
|
|
nursery, spawn one subactor via the currently-selected
|
|
spawn backend (which this test will have flipped to
|
|
`subint_forkserver`), run a trivial RPC through its
|
|
portal, assert the round-trip result.
|
|
|
|
'''
|
|
with trio.fail_after(deadline):
|
|
async with (
|
|
tractor.open_root_actor(
|
|
registry_addrs=[reg_addr],
|
|
),
|
|
tractor.open_nursery() as an,
|
|
):
|
|
portal: tractor.Portal = await an.run_in_actor(
|
|
_trivial_rpc,
|
|
name='subint-forkserver-child',
|
|
)
|
|
result: str = await portal.wait_for_result()
|
|
assert result == 'hello from subint-forkserver child'
|
|
|
|
|
|
@pytest.fixture
|
|
def forkserver_spawn_method():
|
|
'''
|
|
Flip `tractor.spawn._spawn._spawn_method` to
|
|
`'subint_forkserver'` for the duration of a test, then
|
|
restore whatever was in place before (usually the
|
|
session-level CLI choice, typically `'trio'`).
|
|
|
|
Without this, other tests in the same session would
|
|
observe the global flip and start spawning via fork —
|
|
which is almost certainly NOT what their assertions were
|
|
written against.
|
|
|
|
'''
|
|
prev_method: str = _spawn_mod._spawn_method
|
|
prev_ctx = _spawn_mod._ctx
|
|
try_set_start_method('subint_forkserver')
|
|
try:
|
|
yield
|
|
finally:
|
|
_spawn_mod._spawn_method = prev_method
|
|
_spawn_mod._ctx = prev_ctx
|
|
|
|
|
|
@pytest.mark.timeout(60, method='thread')
|
|
def test_subint_forkserver_spawn_basic(
|
|
reg_addr: tuple[str, int | str],
|
|
forkserver_spawn_method,
|
|
) -> None:
|
|
'''
|
|
Happy-path: spawn ONE subactor via the
|
|
`subint_forkserver` backend (parent-side fork from a
|
|
main-interp worker thread), do a trivial portal-RPC
|
|
round-trip, tear the nursery down cleanly.
|
|
|
|
If this passes, the "forkserver + tractor runtime" arch
|
|
is proven end-to-end: the registered
|
|
`subint_forkserver_proc` spawn target successfully
|
|
forks a child, the child runs `_actor_child_main()` +
|
|
completes IPC handshake + serves an RPC, and the parent
|
|
reaps via `_ForkedProc.wait()` without regressing any of
|
|
the normal nursery teardown invariants.
|
|
|
|
'''
|
|
deadline: float = 20.0
|
|
with dump_on_hang(
|
|
seconds=deadline,
|
|
path='/tmp/subint_forkserver_spawn_basic.dump',
|
|
):
|
|
trio.run(
|
|
partial(
|
|
_happy_path_forkserver,
|
|
reg_addr,
|
|
deadline,
|
|
),
|
|
)
|
|
|
|
|
|
# ----------------------------------------------------------------
|
|
# tier-4 DRAFT: orphaned-subactor SIGINT survivability
|
|
#
|
|
# Motivating question: with `subint_forkserver`, the child's
|
|
# `trio.run()` lives on the fork-inherited worker thread which
|
|
# is NOT `threading.main_thread()` — so trio cannot install its
|
|
# `signal.set_wakeup_fd`-based SIGINT handler. If the parent
|
|
# goes away via `SIGKILL` (no IPC `Portal.cancel_actor()`
|
|
# possible), does SIGINT on the orphan child cleanly tear it
|
|
# down via CPython's default `KeyboardInterrupt` delivery, or
|
|
# does it hang?
|
|
#
|
|
# Working hypothesis (unverified pre-this-test): post-fork the
|
|
# child is effectively single-threaded (only the fork-worker
|
|
# tstate survived), so SIGINT → default handler → raises
|
|
# `KeyboardInterrupt` on the only thread — which happens to be
|
|
# the one driving trio's event loop — so trio observes it at
|
|
# the next checkpoint. If so, we're "fine" on this backend
|
|
# despite the missing trio SIGINT handler.
|
|
#
|
|
# Cross-backend generalization (decide after this passes):
|
|
# - applicable to any backend whose subactors are separate OS
|
|
# processes: `trio`, `mp_spawn`, `mp_forkserver`,
|
|
# `subint_forkserver`.
|
|
# - NOT applicable to plain `subint` (subactors are in-process
|
|
# subinterpreters, no orphan child process to SIGINT).
|
|
# - move path: lift the harness script into
|
|
# `tests/_orphan_harness.py`, parametrize on the session's
|
|
# `_spawn_method`, add `skipif _spawn_method == 'subint'`.
|
|
# ----------------------------------------------------------------
|
|
|
|
|
|
_ORPHAN_HARNESS_SCRIPT: str = '''
|
|
import os
|
|
import sys
|
|
import trio
|
|
import tractor
|
|
from tractor.spawn._spawn import try_set_start_method
|
|
|
|
async def _sleep_forever() -> None:
|
|
print(f"CHILD_PID={os.getpid()}", flush=True)
|
|
await trio.sleep_forever()
|
|
|
|
async def _main(reg_addr):
|
|
async with (
|
|
tractor.open_root_actor(registry_addrs=[reg_addr]),
|
|
tractor.open_nursery() as an,
|
|
):
|
|
portal = await an.run_in_actor(
|
|
_sleep_forever,
|
|
name="orphan-test-child",
|
|
)
|
|
print(f"PARENT_READY={os.getpid()}", flush=True)
|
|
await trio.sleep_forever()
|
|
|
|
if __name__ == "__main__":
|
|
backend = sys.argv[1]
|
|
host = sys.argv[2]
|
|
port = int(sys.argv[3])
|
|
try_set_start_method(backend)
|
|
trio.run(_main, (host, port))
|
|
'''
|
|
|
|
|
|
def _read_marker(
|
|
proc: subprocess.Popen,
|
|
marker: str,
|
|
timeout: float,
|
|
_buf: dict,
|
|
) -> str:
|
|
'''
|
|
Block until `<marker>=<value>\\n` appears on `proc.stdout`
|
|
and return `<value>`. Uses a per-proc byte buffer (`_buf`)
|
|
to carry partial lines across calls.
|
|
|
|
'''
|
|
deadline: float = time.monotonic() + timeout
|
|
remainder: bytes = _buf.get('remainder', b'')
|
|
prefix: bytes = f'{marker}='.encode()
|
|
while time.monotonic() < deadline:
|
|
# drain any complete lines already buffered
|
|
while b'\n' in remainder:
|
|
line, remainder = remainder.split(b'\n', 1)
|
|
if line.startswith(prefix):
|
|
_buf['remainder'] = remainder
|
|
return line[len(prefix):].decode().strip()
|
|
ready, _, _ = select.select([proc.stdout], [], [], 0.2)
|
|
if not ready:
|
|
continue
|
|
chunk: bytes = os.read(proc.stdout.fileno(), 4096)
|
|
if not chunk:
|
|
break
|
|
remainder += chunk
|
|
_buf['remainder'] = remainder
|
|
raise TimeoutError(
|
|
f'Never observed marker {marker!r} on harness stdout '
|
|
f'within {timeout}s'
|
|
)
|
|
|
|
|
|
def _process_alive(pid: int) -> bool:
|
|
'''Liveness probe for a pid we do NOT parent (post-orphan).'''
|
|
try:
|
|
os.kill(pid, 0)
|
|
return True
|
|
except ProcessLookupError:
|
|
return False
|
|
|
|
|
|
# NOTE: was previously `@pytest.mark.xfail(strict=True, ...)`
|
|
# for the orphan-SIGINT hang documented in
|
|
# `ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md`
|
|
# — now passes after the fork-child FD-hygiene fix in
|
|
# `tractor.spawn._subint_forkserver._close_inherited_fds()`:
|
|
# closing all inherited FDs (including the parent's IPC
|
|
# listener + trio-epoll + wakeup-pipe FDs) lets the child's
|
|
# trio event loop respond cleanly to external SIGINT.
|
|
# Leaving the test in place as a regression guard.
|
|
@pytest.mark.timeout(
|
|
30,
|
|
method='thread',
|
|
)
|
|
def test_orphaned_subactor_sigint_cleanup_DRAFT(
|
|
reg_addr: tuple[str, int | str],
|
|
tmp_path: Path,
|
|
) -> None:
|
|
'''
|
|
DRAFT — orphaned-subactor SIGINT survivability under the
|
|
`subint_forkserver` backend.
|
|
|
|
Sequence:
|
|
1. Spawn a harness subprocess that brings up a root
|
|
actor + one `sleep_forever` subactor via
|
|
`subint_forkserver`.
|
|
2. Read the harness's stdout for `PARENT_READY=<pid>`
|
|
and `CHILD_PID=<pid>` markers (confirms the
|
|
parent→child IPC handshake completed).
|
|
3. `SIGKILL` the parent (no IPC cancel possible — the
|
|
whole point of this test).
|
|
4. `SIGINT` the orphan child.
|
|
5. Poll `os.kill(child_pid, 0)` for up to 10s — assert
|
|
the child exits.
|
|
|
|
Empirical result (2026-04, py3.14): currently **FAILS** —
|
|
SIGINT on the orphan child doesn't unwind the trio loop,
|
|
despite trio's `KIManager` handler being correctly
|
|
installed in the subactor (the post-fork thread IS
|
|
`threading.main_thread()` on py3.14). `faulthandler` dump
|
|
shows the subactor wedged in `trio/_core/_io_epoll.py::
|
|
get_events` — the signal's supposed wakeup of the event
|
|
loop isn't firing. Full analysis + diagnostic evidence
|
|
in `ai/conc-anal/
|
|
subint_forkserver_orphan_sigint_hang_issue.md`.
|
|
|
|
The runtime's *intentional* "KBI-as-OS-cancel" path at
|
|
`tractor/spawn/_entry.py::_trio_main:164` is therefore
|
|
unreachable under this backend+config. Closing the gap is
|
|
aligned with existing design intent (make the already-
|
|
designed behavior actually fire), not a new feature.
|
|
Marked `xfail(strict=True)` so the
|
|
mark flips to XPASS→fail once the gap is closed and we'll
|
|
know to drop the mark.
|
|
|
|
'''
|
|
if platform.system() != 'Linux':
|
|
pytest.skip(
|
|
'orphan-reparenting semantics only exercised on Linux'
|
|
)
|
|
|
|
script_path = tmp_path / '_orphan_harness.py'
|
|
script_path.write_text(_ORPHAN_HARNESS_SCRIPT)
|
|
|
|
# Offset the port so we don't race the session reg_addr with
|
|
# any concurrently-running backend test's listener.
|
|
host: str = reg_addr[0]
|
|
port: int = int(reg_addr[1]) + 17
|
|
|
|
proc: subprocess.Popen = subprocess.Popen(
|
|
[
|
|
sys.executable,
|
|
str(script_path),
|
|
'subint_forkserver',
|
|
host,
|
|
str(port),
|
|
],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
)
|
|
parent_pid: int | None = None
|
|
child_pid: int | None = None
|
|
buf: dict = {}
|
|
try:
|
|
child_pid = int(_read_marker(proc, 'CHILD_PID', 15.0, buf))
|
|
parent_pid = int(_read_marker(proc, 'PARENT_READY', 15.0, buf))
|
|
|
|
# sanity: both alive before we start killing stuff
|
|
assert _process_alive(parent_pid), (
|
|
f'harness parent pid={parent_pid} gone before '
|
|
f'SIGKILL — test premise broken'
|
|
)
|
|
assert _process_alive(child_pid), (
|
|
f'orphan-candidate child pid={child_pid} gone '
|
|
f'before test started'
|
|
)
|
|
|
|
# step 3: kill parent — no IPC cancel arrives at child.
|
|
# `proc.wait()` reaps the zombie so it truly disappears
|
|
# from the process table (otherwise `os.kill(pid, 0)`
|
|
# keeps reporting it as alive).
|
|
os.kill(parent_pid, signal.SIGKILL)
|
|
try:
|
|
proc.wait(timeout=3.0)
|
|
except subprocess.TimeoutExpired:
|
|
pytest.fail(
|
|
f'harness parent pid={parent_pid} did not die '
|
|
f'after SIGKILL — test premise broken'
|
|
)
|
|
assert _process_alive(child_pid), (
|
|
f'child pid={child_pid} died along with parent — '
|
|
f'did the parent reap it before SIGKILL took? '
|
|
f'test premise requires an orphan.'
|
|
)
|
|
|
|
# step 4+5: SIGINT the orphan, poll for exit.
|
|
os.kill(child_pid, signal.SIGINT)
|
|
timeout: float = 6.0
|
|
cleanup_deadline: float = time.monotonic() + timeout
|
|
while time.monotonic() < cleanup_deadline:
|
|
if not _process_alive(child_pid):
|
|
return # <- success path
|
|
time.sleep(0.1)
|
|
|
|
pytest.fail(
|
|
f'Orphan subactor (pid={child_pid}) did NOT exit '
|
|
f'within 10s of SIGINT under `subint_forkserver` '
|
|
f'→ trio on non-main thread did not observe the '
|
|
f'default CPython KeyboardInterrupt; backend needs '
|
|
f'explicit SIGINT plumbing.'
|
|
)
|
|
finally:
|
|
# best-effort cleanup to avoid leaking orphans across
|
|
# the test session regardless of outcome.
|
|
for pid in (parent_pid, child_pid):
|
|
if pid is None:
|
|
continue
|
|
try:
|
|
os.kill(pid, signal.SIGKILL)
|
|
except ProcessLookupError:
|
|
pass
|
|
try:
|
|
proc.kill()
|
|
except OSError:
|
|
pass
|
|
try:
|
|
proc.wait(timeout=2.0)
|
|
except subprocess.TimeoutExpired:
|
|
pass
|