From 26914fde753d357920a6366c7e8ec15fcdbc0323 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 22 Apr 2026 18:49:23 -0400 Subject: [PATCH] Wire `subint_forkserver` as first-class backend MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Promote `_subint_forkserver` from primitives-only into a registered spawn backend: `'subint_forkserver'` is now a `SpawnMethodKey` literal, dispatched via `_methods` to the new `subint_forkserver_proc()` target, feature-gated under the existing `subint`-family py3.14+ case, and selectable via `--spawn-backend=subint_forkserver`. Deats, - new `subint_forkserver_proc()` spawn target in `_subint_forkserver`: - mirrors `trio_proc()`'s supervision model — real OS subprocess so `Portal.cancel_actor()` + `soft_kill()` on graceful teardown, `os.kill(SIGKILL)` on hard-reap (no `_interpreters.destroy()` race to fuss over bc the child lives in its own process) - only real diff from `trio_proc` is the spawn mechanism: fork from a main-interp worker thread via `fork_from_worker_thread()` (off-loaded to trio's thread pool) instead of `trio.lowlevel.open_process()` - child-side `_child_target` closure runs `tractor._child._actor_child_main()` with `spawn_method='trio'` — the child is a regular trio actor, "subint_forkserver" names how the parent spawned, not what the child runs - new `_ForkedProc` class — thin `trio.Process`-compatible shim around a raw OS pid: `.poll()` via `waitpid(WNOHANG)`, async `.wait()` off-loaded to a trio cache thread, `.kill()` via `SIGKILL`, `.returncode` cached for repeat calls. `.stdin`/`.stdout`/`.stderr` are `None` (fork-w/o-exec inherits parent FDs; we don't marshal them) which matches `soft_kill()`'s `is not None` guards Also, new backend-tier test `test_subint_forkserver_spawn_basic` drives the registered backend end-to-end via `open_root_actor` + `open_nursery` + `run_in_actor` w/ a trivial portal-RPC round-trip. Uses a `forkserver_spawn_method` fixture to flip `_spawn_method`/`_ctx` for the test's duration + restore on teardown (so other session-level tests don't observe the global flip). Test module docstring reworked to describe the three tiers now covered: (1) primitive-level, (2) parent-trio-driven primitives, (3) full registered backend. Status: still-open work (tracked on `tractor#379`) doc'd inline in the module docstring — no cancel/hard-kill stress coverage yet, child-side subint-hosted root runtime still future (gated on `msgspec#563`), thread-hygiene audit pending the same unblock. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/spawn/test_subint_forkserver.py | 137 +++++++++++- tractor/spawn/_spawn.py | 23 +- tractor/spawn/_subint_forkserver.py | 311 +++++++++++++++++++++++++- 3 files changed, 449 insertions(+), 22 deletions(-) diff --git a/tests/spawn/test_subint_forkserver.py b/tests/spawn/test_subint_forkserver.py index 87c497b6..bb601cfe 100644 --- a/tests/spawn/test_subint_forkserver.py +++ b/tests/spawn/test_subint_forkserver.py @@ -1,9 +1,17 @@ ''' Integration exercises for the `tractor.spawn._subint_forkserver` -primitives (`fork_from_worker_thread()` + `run_subint_in_worker_thread()`) -driven from inside a real `trio.run()` in the parent process — -the runtime shape tractor will need when we move toward wiring -up a `subint_forkserver` spawn backend proper. +submodule at three tiers: + +1. the low-level primitives + (`fork_from_worker_thread()` + + `run_subint_in_worker_thread()`) driven from inside a real + `trio.run()` in the parent process, + +2. the full `subint_forkserver_proc` spawn backend wired + through tractor's normal actor-nursery + portal-RPC + machinery — i.e. `open_root_actor` + `open_nursery` + + `run_in_actor` against a subactor spawned via fork from a + main-interp worker thread. Background ---------- @@ -16,17 +24,20 @@ never entered a subint) works, and the forked child can then host its own `trio.run()` inside a fresh subint. Those smoke-test scenarios are standalone — no trio runtime -in the *parent*. These tests exercise the same primitives -from inside `trio.run()` in the parent, proving out the -piece actually needed for a working spawn backend. +in the *parent*. Tiers (1)+(2) here cover the primitives +driven from inside `trio.run()` in the parent, and tier (3) +(the `*_spawn_basic` test) drives the registered +`subint_forkserver` spawn backend end-to-end against the +tractor runtime. Gating ------ - py3.14+ (via `concurrent.interpreters` presence) -- no backend restriction (these tests don't use - `--spawn-backend` — they drive the forkserver primitives - directly rather than going through tractor's spawn-method - registry). +- no `--spawn-backend` restriction — the backend-level test + flips `tractor.spawn._spawn._spawn_method` programmatically + (via `try_set_start_method('subint_forkserver')`) and + restores it on teardown, so these tests are independent of + the session-level CLI backend choice. ''' from __future__ import annotations @@ -36,6 +47,7 @@ import os import pytest import trio +import tractor from tractor.devx import dump_on_hang @@ -50,6 +62,8 @@ from tractor.spawn._subint_forkserver import ( # noqa: E402 run_subint_in_worker_thread, wait_child, ) +from tractor.spawn import _spawn as _spawn_mod # noqa: E402 +from tractor.spawn._spawn import try_set_start_method # noqa: E402 # ---------------------------------------------------------------- @@ -212,3 +226,104 @@ def test_fork_and_run_trio_in_child() -> None: ), ) assert isinstance(pid, int) and pid > 0 + + +# ---------------------------------------------------------------- +# tier-3 backend test: drive the registered `subint_forkserver` +# spawn backend end-to-end through tractor's actor-nursery + +# portal-RPC machinery. +# ---------------------------------------------------------------- + + +async def _trivial_rpc() -> str: + ''' + Minimal subactor-side RPC body: just return a sentinel + string the parent can assert on. + + ''' + return 'hello from subint-forkserver child' + + +async def _happy_path_forkserver( + reg_addr: tuple[str, int | str], + deadline: float, +) -> None: + ''' + Parent-side harness: stand up a root actor, open an actor + nursery, spawn one subactor via the currently-selected + spawn backend (which this test will have flipped to + `subint_forkserver`), run a trivial RPC through its + portal, assert the round-trip result. + + ''' + with trio.fail_after(deadline): + async with ( + tractor.open_root_actor( + registry_addrs=[reg_addr], + ), + tractor.open_nursery() as an, + ): + portal: tractor.Portal = await an.run_in_actor( + _trivial_rpc, + name='subint-forkserver-child', + ) + result: str = await portal.wait_for_result() + assert result == 'hello from subint-forkserver child' + + +@pytest.fixture +def forkserver_spawn_method(): + ''' + Flip `tractor.spawn._spawn._spawn_method` to + `'subint_forkserver'` for the duration of a test, then + restore whatever was in place before (usually the + session-level CLI choice, typically `'trio'`). + + Without this, other tests in the same session would + observe the global flip and start spawning via fork — + which is almost certainly NOT what their assertions were + written against. + + ''' + prev_method: str = _spawn_mod._spawn_method + prev_ctx = _spawn_mod._ctx + try_set_start_method('subint_forkserver') + try: + yield + finally: + _spawn_mod._spawn_method = prev_method + _spawn_mod._ctx = prev_ctx + + +@pytest.mark.timeout(60, method='thread') +def test_subint_forkserver_spawn_basic( + reg_addr: tuple[str, int | str], + forkserver_spawn_method, +) -> None: + ''' + Happy-path: spawn ONE subactor via the + `subint_forkserver` backend (parent-side fork from a + main-interp worker thread), do a trivial portal-RPC + round-trip, tear the nursery down cleanly. + + If this passes, the "forkserver + tractor runtime" arch + is proven end-to-end: the registered + `subint_forkserver_proc` spawn target successfully + forks a child, the child runs `_actor_child_main()` + + completes IPC handshake + serves an RPC, and the parent + reaps via `_ForkedProc.wait()` without regressing any of + the normal nursery teardown invariants. + + ''' + deadline: float = 20.0 + with dump_on_hang( + seconds=deadline, + path='/tmp/subint_forkserver_spawn_basic.dump', + ): + trio.run( + partial( + _happy_path_forkserver, + reg_addr, + deadline, + ), + ) diff --git a/tractor/spawn/_spawn.py b/tractor/spawn/_spawn.py index 937d8c95..14b1aafb 100644 --- a/tractor/spawn/_spawn.py +++ b/tractor/spawn/_spawn.py @@ -72,6 +72,13 @@ SpawnMethodKey = Literal[ # `ai/conc-anal/subint_fork_blocked_by_cpython_post_fork_issue.md` # + issue #379 for the full analysis. 'subint_fork', + # EXPERIMENTAL — the `subint_fork` workaround. `os.fork()` + # from a non-trio worker thread (never entered a subint) + # is CPython-legal and works cleanly; forked child runs + # `tractor._child._actor_child_main()` against a trio + # runtime, exactly like `trio_proc` but via fork instead + # of subproc-exec. See `tractor.spawn._subint_forkserver`. + 'subint_forkserver', ] _spawn_method: SpawnMethodKey = 'trio' @@ -124,13 +131,14 @@ def try_set_start_method( case 'trio': _ctx = None - case 'subint' | 'subint_fork': - # Both subint backends need no `mp.context`; both - # feature-gate on the py3.14 public + case 'subint' | 'subint_fork' | 'subint_forkserver': + # All subint-family backends need no `mp.context`; + # all three feature-gate on the py3.14 public # `concurrent.interpreters` wrapper (PEP 734). See # `tractor.spawn._subint` for the detailed - # reasoning and the distinction between the two - # (`subint_fork` is WIP/experimental). + # reasoning. `subint_fork` is blocked at the + # CPython level (raises `NotImplementedError`); + # `subint_forkserver` is the working workaround. from ._subint import _has_subints if not _has_subints: raise RuntimeError( @@ -469,6 +477,7 @@ from ._trio import trio_proc from ._mp import mp_proc from ._subint import subint_proc from ._subint_fork import subint_fork_proc +from ._subint_forkserver import subint_forkserver_proc # proc spawning backend target map @@ -483,4 +492,8 @@ _methods: dict[SpawnMethodKey, Callable] = { # clean `NotImplementedError` with pointer to the analysis, # rather than an "invalid backend" error. 'subint_fork': subint_fork_proc, + # WIP — fork-from-non-trio-worker-thread, works on py3.14+ + # (validated via `ai/conc-anal/subint_fork_from_main_thread_smoketest.py`). + # See `tractor.spawn._subint_forkserver`. + 'subint_forkserver': subint_forkserver_proc, } diff --git a/tractor/spawn/_subint_forkserver.py b/tractor/spawn/_subint_forkserver.py index 49d1a294..23322084 100644 --- a/tractor/spawn/_subint_forkserver.py +++ b/tractor/spawn/_subint_forkserver.py @@ -53,11 +53,27 @@ and inherited parent state. Status ------ -**EXPERIMENTAL** — primitives only. Not yet wired into -`tractor.spawn._spawn`'s backend registry. The next step is -to drive these from a parent-side `trio.run()` and hook the -returned child pid into tractor's normal actor-nursery/IPC -machinery. +**EXPERIMENTAL** — wired as the `'subint_forkserver'` entry +in `tractor.spawn._spawn._methods` and selectable via +`try_set_start_method('subint_forkserver')` / `--spawn-backend +=subint_forkserver`. Parent-side spawn, child-side runtime +bring-up and normal portal-RPC teardown are validated by the +backend-tier test in +`tests/spawn/test_subint_forkserver.py::test_subint_forkserver_spawn_basic`. + +Still-open work (tracked on tractor #379): + +- no cancellation / hard-kill stress coverage yet (counterpart + to `tests/test_subint_cancellation.py` for the plain + `subint` backend), +- child-side "subint-hosted root runtime" mode (the second + half of the envisioned arch — currently the forked child + runs plain `_trio_main` via `spawn_method='trio'`; the + subint-hosted variant is still the future step gated on + msgspec PEP 684 support), +- thread-hygiene audit of the two `threading.Thread` + primitives below, gated on the same msgspec unblock + (see TODO section further down). TODO — cleanup gated on msgspec PEP 684 support ----------------------------------------------- @@ -93,16 +109,37 @@ See also from __future__ import annotations import os import signal +import sys import threading +from functools import partial from typing import ( + Any, Callable, TYPE_CHECKING, ) +import trio +from trio import TaskStatus + from tractor.log import get_logger +from tractor.msg import ( + types as msgtypes, + pretty_struct, +) +from tractor.runtime._state import current_actor +from tractor.runtime._portal import Portal +from ._spawn import ( + cancel_on_completion, + soft_kill, +) if TYPE_CHECKING: - pass + from tractor.discovery._addr import UnwrappedAddress + from tractor.ipc import ( + _server, + ) + from tractor.runtime._runtime import Actor + from tractor.runtime._supervise import ActorNursery log = get_logger('tractor') @@ -360,3 +397,265 @@ def run_subint_in_worker_thread( ) if err is not None: raise err + + +class _ForkedProc: + ''' + Thin `trio.Process`-compatible shim around a raw OS pid + returned by `fork_from_worker_thread()`, exposing just + enough surface for the `soft_kill()` / hard-reap pattern + borrowed from `trio_proc()`. + + Unlike `trio.Process`, we have no direct handles on the + child's std-streams (fork-without-exec inherits the + parent's FDs, but we don't marshal them into this + wrapper) — `.stdin`/`.stdout`/`.stderr` are all `None`, + which matches what `soft_kill()` handles via its + `is not None` guards. + + ''' + def __init__(self, pid: int): + self.pid: int = pid + self._returncode: int | None = None + # `soft_kill`/`hard_kill` check these for pipe + # teardown — all None since we didn't wire up pipes + # on the fork-without-exec path. + self.stdin = None + self.stdout = None + self.stderr = None + + def poll(self) -> int | None: + ''' + Non-blocking liveness probe. Returns `None` if the + child is still running, else its exit code (negative + for signal-death, matching `subprocess.Popen` + convention). + + ''' + if self._returncode is not None: + return self._returncode + try: + waited_pid, status = os.waitpid(self.pid, os.WNOHANG) + except ChildProcessError: + # already reaped (or never existed) — treat as + # clean exit for polling purposes. + self._returncode = 0 + return 0 + if waited_pid == 0: + return None + self._returncode = self._parse_status(status) + return self._returncode + + @property + def returncode(self) -> int | None: + return self._returncode + + async def wait(self) -> int: + ''' + Async blocking wait for the child's exit, off-loaded + to a trio cache thread so we don't block the event + loop on `waitpid()`. Safe to call multiple times; + subsequent calls return the cached rc without + re-issuing the syscall. + + ''' + if self._returncode is not None: + return self._returncode + _, status = await trio.to_thread.run_sync( + os.waitpid, + self.pid, + 0, + abandon_on_cancel=False, + ) + self._returncode = self._parse_status(status) + return self._returncode + + def kill(self) -> None: + ''' + OS-level `SIGKILL` to the child. Swallows + `ProcessLookupError` (already dead). + + ''' + try: + os.kill(self.pid, signal.SIGKILL) + except ProcessLookupError: + pass + + def _parse_status(self, status: int) -> int: + if os.WIFEXITED(status): + return os.WEXITSTATUS(status) + elif os.WIFSIGNALED(status): + # negative rc by `subprocess.Popen` convention + return -os.WTERMSIG(status) + return 0 + + def __repr__(self) -> str: + return ( + f'<_ForkedProc pid={self.pid} ' + f'returncode={self._returncode}>' + ) + + +async def subint_forkserver_proc( + name: str, + actor_nursery: ActorNursery, + subactor: Actor, + errors: dict[tuple[str, str], Exception], + + # passed through to actor main + bind_addrs: list[UnwrappedAddress], + parent_addr: UnwrappedAddress, + _runtime_vars: dict[str, Any], + *, + infect_asyncio: bool = False, + task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED, + proc_kwargs: dict[str, any] = {}, + +) -> None: + ''' + Spawn a subactor via `os.fork()` from a non-trio worker + thread (see `fork_from_worker_thread()`), with the forked + child running `tractor._child._actor_child_main()` and + connecting back via tractor's normal IPC handshake. + + Supervision model mirrors `trio_proc()` — we manage a + real OS subprocess, so `Portal.cancel_actor()` + + `soft_kill()` on graceful teardown and `os.kill(SIGKILL)` + on hard-reap both apply directly (no + `_interpreters.destroy()` voodoo needed since the child + is in its own process). + + The only real difference from `trio_proc` is the spawn + mechanism: fork from a known-clean main-interp worker + thread instead of `trio.lowlevel.open_process()`. + + ''' + if not _has_subints: + raise RuntimeError( + f'The {"subint_forkserver"!r} spawn backend ' + f'requires Python 3.14+.\n' + f'Current runtime: {sys.version}' + ) + + uid: tuple[str, str] = subactor.aid.uid + loglevel: str | None = subactor.loglevel + + # Closure captured into the fork-child's memory image. + # In the child this is the first post-fork Python code to + # run, on what was the fork-worker thread in the parent. + def _child_target() -> int: + # Lazy import so the parent doesn't pay for it on + # every spawn — it's module-level in `_child` but + # cheap enough to re-resolve here. + from tractor._child import _actor_child_main + _actor_child_main( + uid=uid, + loglevel=loglevel, + parent_addr=parent_addr, + infect_asyncio=infect_asyncio, + # NOTE, from the child-side runtime's POV it's + # a regular trio actor — it uses `_trio_main`, + # receives `SpawnSpec` over IPC, etc. The + # `subint_forkserver` name is a property of HOW + # the parent spawned, not of what the child is. + spawn_method='trio', + ) + return 0 + + cancelled_during_spawn: bool = False + proc: _ForkedProc | None = None + ipc_server: _server.Server = actor_nursery._actor.ipc_server + + try: + try: + pid: int = await trio.to_thread.run_sync( + partial( + fork_from_worker_thread, + _child_target, + thread_name=( + f'subint-forkserver[{name}]' + ), + ), + abandon_on_cancel=False, + ) + proc = _ForkedProc(pid) + log.runtime( + f'Forked subactor via forkserver\n' + f'(>\n' + f' |_{proc}\n' + ) + + event, chan = await ipc_server.wait_for_peer(uid) + + except trio.Cancelled: + cancelled_during_spawn = True + raise + + assert proc is not None + + portal = Portal(chan) + actor_nursery._children[uid] = ( + subactor, + proc, + portal, + ) + + sspec = msgtypes.SpawnSpec( + _parent_main_data=subactor._parent_main_data, + enable_modules=subactor.enable_modules, + reg_addrs=subactor.reg_addrs, + bind_addrs=bind_addrs, + _runtime_vars=_runtime_vars, + ) + log.runtime( + f'Sending spawn spec to forkserver child\n' + f'{{}}=> {chan.aid.reprol()!r}\n' + f'\n' + f'{pretty_struct.pformat(sspec)}\n' + ) + await chan.send(sspec) + + curr_actor: Actor = current_actor() + curr_actor._actoruid2nursery[uid] = actor_nursery + + task_status.started(portal) + + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as nursery: + if portal in actor_nursery._cancel_after_result_on_exit: + nursery.start_soon( + cancel_on_completion, + portal, + subactor, + errors, + ) + + # reuse `trio_proc`'s soft-kill dance — `proc` + # is our `_ForkedProc` shim which implements the + # same `.poll()` / `.wait()` / `.kill()` surface + # `soft_kill` expects. + await soft_kill( + proc, + _ForkedProc.wait, + portal, + ) + nursery.cancel_scope.cancel() + + finally: + # Hard reap: SIGKILL + waitpid. Cheap since we have + # the real OS pid, unlike `subint_proc` which has to + # fuss with `_interpreters.destroy()` races. + if proc is not None and proc.poll() is None: + log.cancel( + f'Hard killing forkserver subactor\n' + f'>x)\n' + f' |_{proc}\n' + ) + with trio.CancelScope(shield=True): + proc.kill() + await proc.wait() + + if not cancelled_during_spawn: + actor_nursery._children.pop(uid, None)