Add DRAFT `subint_forkserver` orphan-SIGINT test

Tier-4 test `test_orphaned_subactor_sigint_cleanup_DRAFT`
documents an empirical SIGINT-delivery gap in the
`subint_forkserver` backend: when the parent dies via
`SIGKILL` (no IPC `Portal.cancel_actor()` possible) and
`SIGINT` is sent to the orphan child, the child DOES NOT
unwind — CPython's default `KeyboardInterrupt` is delivered
to `threading.main_thread()`, whose tstate is dead in the
post-fork child bc fork inherited the worker thread, not
main. Trio running on the fork-inherited worker thread
therefore never observes the signal. Marked
`xfail(strict=True)` so the mark flips to XPASS→fail once
the backend grows explicit SIGINT plumbing.

Deats,
- harness runs the failure-mode sequence out-of-process:
  1. harness subprocess runs a fresh Python script
     that calls `try_set_start_method('subint_forkserver')`
     then opens a root actor + one `sleep_forever` subactor
  2. parse `PARENT_READY=<pid>` + `CHILD_PID=<pid>` markers
     off harness `stdout` to confirm IPC handshake
     completed
  3. `SIGKILL` the parent, `proc.wait()` to reap the
     zombie (otherwise `os.kill(pid, 0)` keeps reporting
     it alive)
  4. assert the child survived the parent-reap (i.e. was
     actually orphaned, not reaped too) before moving on
  5. `SIGINT` the orphan child, poll `os.kill(child_pid, 0)`
     every 100ms for up to 10s
- supporting helpers: `_read_marker()` with per-proc
  bytes-buffer to carry partial lines across calls,
  `_process_alive()` liveness probe via `kill(pid, 0)`
- Linux-only via `platform.system() != 'Linux'` skip —
  orphan-reparenting semantics don't generalize to
  other platforms
- port offset (`reg_addr[1] + 17`) so the harness listener
  doesn't race concurrently-running backend tests
- best-effort `finally:` cleanup: `SIGKILL` any still-alive
  pids + `proc.kill()` + bounded `proc.wait()` to avoid
  leaking orphans across the session

Also, tier-4 header comment documents the cross-backend
generalization path: applicable to any multi-process
backend (`trio`, `mp_spawn`, `mp_forkserver`,
`subint_forkserver`), NOT to plain `subint` (in-process
subints have no orphan OS-child). Move path: lift
harness into `tests/_orphan_harness.py`, parametrize on
session `_spawn_method`, add
`skipif _spawn_method == 'subint'`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver
Gud Boi 2026-04-22 19:39:41 -04:00
parent 27cc02e83d
commit 253e7cbd1c
1 changed files with 256 additions and 0 deletions

View File

@ -43,6 +43,12 @@ Gating
from __future__ import annotations
from functools import partial
import os
import platform
import select
import signal
import subprocess
import sys
import time
import pytest
import trio
@ -327,3 +333,253 @@ def test_subint_forkserver_spawn_basic(
deadline,
),
)
# ----------------------------------------------------------------
# tier-4 DRAFT: orphaned-subactor SIGINT survivability
#
# Motivating question: with `subint_forkserver`, the child's
# `trio.run()` lives on the fork-inherited worker thread which
# is NOT `threading.main_thread()` — so trio cannot install its
# `signal.set_wakeup_fd`-based SIGINT handler. If the parent
# goes away via `SIGKILL` (no IPC `Portal.cancel_actor()`
# possible), does SIGINT on the orphan child cleanly tear it
# down via CPython's default `KeyboardInterrupt` delivery, or
# does it hang?
#
# Working hypothesis (unverified pre-this-test): post-fork the
# child is effectively single-threaded (only the fork-worker
# tstate survived), so SIGINT → default handler → raises
# `KeyboardInterrupt` on the only thread — which happens to be
# the one driving trio's event loop — so trio observes it at
# the next checkpoint. If so, we're "fine" on this backend
# despite the missing trio SIGINT handler.
#
# Cross-backend generalization (decide after this passes):
# - applicable to any backend whose subactors are separate OS
# processes: `trio`, `mp_spawn`, `mp_forkserver`,
# `subint_forkserver`.
# - NOT applicable to plain `subint` (subactors are in-process
# subinterpreters, no orphan child process to SIGINT).
# - move path: lift the harness script into
# `tests/_orphan_harness.py`, parametrize on the session's
# `_spawn_method`, add `skipif _spawn_method == 'subint'`.
# ----------------------------------------------------------------
_ORPHAN_HARNESS_SCRIPT: str = '''
import os
import sys
import trio
import tractor
from tractor.spawn._spawn import try_set_start_method
async def _sleep_forever() -> None:
print(f"CHILD_PID={os.getpid()}", flush=True)
await trio.sleep_forever()
async def _main(reg_addr):
async with (
tractor.open_root_actor(registry_addrs=[reg_addr]),
tractor.open_nursery() as an,
):
portal = await an.run_in_actor(
_sleep_forever,
name="orphan-test-child",
)
print(f"PARENT_READY={os.getpid()}", flush=True)
await trio.sleep_forever()
if __name__ == "__main__":
backend = sys.argv[1]
host = sys.argv[2]
port = int(sys.argv[3])
try_set_start_method(backend)
trio.run(_main, (host, port))
'''
def _read_marker(
proc: subprocess.Popen,
marker: str,
timeout: float,
_buf: dict,
) -> str:
'''
Block until `<marker>=<value>\\n` appears on `proc.stdout`
and return `<value>`. Uses a per-proc byte buffer (`_buf`)
to carry partial lines across calls.
'''
deadline: float = time.monotonic() + timeout
remainder: bytes = _buf.get('remainder', b'')
prefix: bytes = f'{marker}='.encode()
while time.monotonic() < deadline:
# drain any complete lines already buffered
while b'\n' in remainder:
line, remainder = remainder.split(b'\n', 1)
if line.startswith(prefix):
_buf['remainder'] = remainder
return line[len(prefix):].decode().strip()
ready, _, _ = select.select([proc.stdout], [], [], 0.2)
if not ready:
continue
chunk: bytes = os.read(proc.stdout.fileno(), 4096)
if not chunk:
break
remainder += chunk
_buf['remainder'] = remainder
raise TimeoutError(
f'Never observed marker {marker!r} on harness stdout '
f'within {timeout}s'
)
def _process_alive(pid: int) -> bool:
'''Liveness probe for a pid we do NOT parent (post-orphan).'''
try:
os.kill(pid, 0)
return True
except ProcessLookupError:
return False
@pytest.mark.xfail(
strict=True,
reason=(
'subint_forkserver orphan-child SIGINT gap: trio on the '
'fork-inherited non-main thread has no SIGINT wakeup-fd '
'handler installed, and CPython\'s default '
'KeyboardInterrupt delivery does NOT reach the trio '
'loop on this thread post-fork. Fix path TBD — see '
'TODO in `tractor.spawn._subint_forkserver`. Flip this '
'mark (or drop it) once the gap is closed.'
),
)
@pytest.mark.timeout(60, method='thread')
def test_orphaned_subactor_sigint_cleanup_DRAFT(
reg_addr: tuple[str, int | str],
tmp_path,
) -> None:
'''
DRAFT orphaned-subactor SIGINT survivability under the
`subint_forkserver` backend.
Sequence:
1. Spawn a harness subprocess that brings up a root
actor + one `sleep_forever` subactor via
`subint_forkserver`.
2. Read the harness's stdout for `PARENT_READY=<pid>`
and `CHILD_PID=<pid>` markers (confirms the
parentchild IPC handshake completed).
3. `SIGKILL` the parent (no IPC cancel possible the
whole point of this test).
4. `SIGINT` the orphan child.
5. Poll `os.kill(child_pid, 0)` for up to 10s assert
the child exits.
Empirical result (2026-04): currently **FAILS** the
"post-fork single-thread → default KeyboardInterrupt lands
on trio's thread" hypothesis from the class-A/B
conc-anal discussion turned out to be wrong. SIGINT on the
orphan child doesn't unwind the trio loop. Most likely
CPython delivers `KeyboardInterrupt` specifically to
`threading.main_thread()`, whose tstate is dead in the
post-fork child (fork inherited the worker thread, not the
original main thread). Marked `xfail(strict=True)` so the
mark flips to XPASSfail once the gap is closed and we'll
know to drop the mark.
'''
if platform.system() != 'Linux':
pytest.skip(
'orphan-reparenting semantics only exercised on Linux'
)
script_path = tmp_path / '_orphan_harness.py'
script_path.write_text(_ORPHAN_HARNESS_SCRIPT)
# Offset the port so we don't race the session reg_addr with
# any concurrently-running backend test's listener.
host: str = reg_addr[0]
port: int = int(reg_addr[1]) + 17
proc: subprocess.Popen = subprocess.Popen(
[
sys.executable,
str(script_path),
'subint_forkserver',
host,
str(port),
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
parent_pid: int | None = None
child_pid: int | None = None
buf: dict = {}
try:
child_pid = int(_read_marker(proc, 'CHILD_PID', 15.0, buf))
parent_pid = int(_read_marker(proc, 'PARENT_READY', 15.0, buf))
# sanity: both alive before we start killing stuff
assert _process_alive(parent_pid), (
f'harness parent pid={parent_pid} gone before '
f'SIGKILL — test premise broken'
)
assert _process_alive(child_pid), (
f'orphan-candidate child pid={child_pid} gone '
f'before test started'
)
# step 3: kill parent — no IPC cancel arrives at child.
# `proc.wait()` reaps the zombie so it truly disappears
# from the process table (otherwise `os.kill(pid, 0)`
# keeps reporting it as alive).
os.kill(parent_pid, signal.SIGKILL)
try:
proc.wait(timeout=3.0)
except subprocess.TimeoutExpired:
pytest.fail(
f'harness parent pid={parent_pid} did not die '
f'after SIGKILL — test premise broken'
)
assert _process_alive(child_pid), (
f'child pid={child_pid} died along with parent — '
f'did the parent reap it before SIGKILL took? '
f'test premise requires an orphan.'
)
# step 4+5: SIGINT the orphan, poll for exit.
os.kill(child_pid, signal.SIGINT)
cleanup_deadline: float = time.monotonic() + 10.0
while time.monotonic() < cleanup_deadline:
if not _process_alive(child_pid):
return # <- success path
time.sleep(0.1)
pytest.fail(
f'Orphan subactor (pid={child_pid}) did NOT exit '
f'within 10s of SIGINT under `subint_forkserver` '
f'→ trio on non-main thread did not observe the '
f'default CPython KeyboardInterrupt; backend needs '
f'explicit SIGINT plumbing.'
)
finally:
# best-effort cleanup to avoid leaking orphans across
# the test session regardless of outcome.
for pid in (parent_pid, child_pid):
if pid is None:
continue
try:
os.kill(pid, signal.SIGKILL)
except ProcessLookupError:
pass
try:
proc.kill()
except OSError:
pass
try:
proc.wait(timeout=2.0)
except subprocess.TimeoutExpired:
pass