Wire `subint_forkserver` as first-class backend
Promote `_subint_forkserver` from primitives-only into a
registered spawn backend: `'subint_forkserver'` is now a
`SpawnMethodKey` literal, dispatched via `_methods` to
the new `subint_forkserver_proc()` target, feature-gated
under the existing `subint`-family py3.14+ case, and
selectable via `--spawn-backend=subint_forkserver`.
Deats,
- new `subint_forkserver_proc()` spawn target in
`_subint_forkserver`:
- mirrors `trio_proc()`'s supervision model — real OS
subprocess so `Portal.cancel_actor()` + `soft_kill()`
on graceful teardown, `os.kill(SIGKILL)` on hard-reap
(no `_interpreters.destroy()` race to fuss over bc the
child lives in its own process)
- only real diff from `trio_proc` is the spawn mechanism:
fork from a main-interp worker thread via
`fork_from_worker_thread()` (off-loaded to trio's
thread pool) instead of `trio.lowlevel.open_process()`
- child-side `_child_target` closure runs
`tractor._child._actor_child_main()` with
`spawn_method='trio'` — the child is a regular trio
actor, "subint_forkserver" names how the parent
spawned, not what the child runs
- new `_ForkedProc` class — thin `trio.Process`-compatible
shim around a raw OS pid: `.poll()` via
`waitpid(WNOHANG)`, async `.wait()` off-loaded to a trio
cache thread, `.kill()` via `SIGKILL`, `.returncode`
cached for repeat calls. `.stdin`/`.stdout`/`.stderr`
are `None` (fork-w/o-exec inherits parent FDs; we don't
marshal them) which matches `soft_kill()`'s `is not None`
guards
Also, new backend-tier test
`test_subint_forkserver_spawn_basic` drives the registered
backend end-to-end via `open_root_actor` + `open_nursery` +
`run_in_actor` w/ a trivial portal-RPC round-trip. Uses a
`forkserver_spawn_method` fixture to flip
`_spawn_method`/`_ctx` for the test's duration + restore on
teardown (so other session-level tests don't observe the
global flip). Test module docstring reworked to describe
the three tiers now covered: (1) primitive-level, (2)
parent-trio-driven primitives, (3) full registered backend.
Status: still-open work (tracked on `tractor#379`) doc'd
inline in the module docstring — no cancel/hard-kill stress
coverage yet, child-side subint-hosted root runtime still
future (gated on `msgspec#563`), thread-hygiene audit
pending the same unblock.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_fork_proto
parent
5bd5f957d3
commit
43bd6a6410
|
|
@ -1,9 +1,17 @@
|
||||||
'''
|
'''
|
||||||
Integration exercises for the `tractor.spawn._subint_forkserver`
|
Integration exercises for the `tractor.spawn._subint_forkserver`
|
||||||
primitives (`fork_from_worker_thread()` + `run_subint_in_worker_thread()`)
|
submodule at three tiers:
|
||||||
driven from inside a real `trio.run()` in the parent process —
|
|
||||||
the runtime shape tractor will need when we move toward wiring
|
1. the low-level primitives
|
||||||
up a `subint_forkserver` spawn backend proper.
|
(`fork_from_worker_thread()` +
|
||||||
|
`run_subint_in_worker_thread()`) driven from inside a real
|
||||||
|
`trio.run()` in the parent process,
|
||||||
|
|
||||||
|
2. the full `subint_forkserver_proc` spawn backend wired
|
||||||
|
through tractor's normal actor-nursery + portal-RPC
|
||||||
|
machinery — i.e. `open_root_actor` + `open_nursery` +
|
||||||
|
`run_in_actor` against a subactor spawned via fork from a
|
||||||
|
main-interp worker thread.
|
||||||
|
|
||||||
Background
|
Background
|
||||||
----------
|
----------
|
||||||
|
|
@ -16,17 +24,20 @@ never entered a subint) works, and the forked child can then
|
||||||
host its own `trio.run()` inside a fresh subint.
|
host its own `trio.run()` inside a fresh subint.
|
||||||
|
|
||||||
Those smoke-test scenarios are standalone — no trio runtime
|
Those smoke-test scenarios are standalone — no trio runtime
|
||||||
in the *parent*. These tests exercise the same primitives
|
in the *parent*. Tiers (1)+(2) here cover the primitives
|
||||||
from inside `trio.run()` in the parent, proving out the
|
driven from inside `trio.run()` in the parent, and tier (3)
|
||||||
piece actually needed for a working spawn backend.
|
(the `*_spawn_basic` test) drives the registered
|
||||||
|
`subint_forkserver` spawn backend end-to-end against the
|
||||||
|
tractor runtime.
|
||||||
|
|
||||||
Gating
|
Gating
|
||||||
------
|
------
|
||||||
- py3.14+ (via `concurrent.interpreters` presence)
|
- py3.14+ (via `concurrent.interpreters` presence)
|
||||||
- no backend restriction (these tests don't use
|
- no `--spawn-backend` restriction — the backend-level test
|
||||||
`--spawn-backend` — they drive the forkserver primitives
|
flips `tractor.spawn._spawn._spawn_method` programmatically
|
||||||
directly rather than going through tractor's spawn-method
|
(via `try_set_start_method('subint_forkserver')`) and
|
||||||
registry).
|
restores it on teardown, so these tests are independent of
|
||||||
|
the session-level CLI backend choice.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
@ -36,6 +47,7 @@ import os
|
||||||
import pytest
|
import pytest
|
||||||
import trio
|
import trio
|
||||||
|
|
||||||
|
import tractor
|
||||||
from tractor.devx import dump_on_hang
|
from tractor.devx import dump_on_hang
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -50,6 +62,8 @@ from tractor.spawn._subint_forkserver import ( # noqa: E402
|
||||||
run_subint_in_worker_thread,
|
run_subint_in_worker_thread,
|
||||||
wait_child,
|
wait_child,
|
||||||
)
|
)
|
||||||
|
from tractor.spawn import _spawn as _spawn_mod # noqa: E402
|
||||||
|
from tractor.spawn._spawn import try_set_start_method # noqa: E402
|
||||||
|
|
||||||
|
|
||||||
# ----------------------------------------------------------------
|
# ----------------------------------------------------------------
|
||||||
|
|
@ -212,3 +226,104 @@ def test_fork_and_run_trio_in_child() -> None:
|
||||||
),
|
),
|
||||||
)
|
)
|
||||||
assert isinstance(pid, int) and pid > 0
|
assert isinstance(pid, int) and pid > 0
|
||||||
|
|
||||||
|
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
# tier-3 backend test: drive the registered `subint_forkserver`
|
||||||
|
# spawn backend end-to-end through tractor's actor-nursery +
|
||||||
|
# portal-RPC machinery.
|
||||||
|
# ----------------------------------------------------------------
|
||||||
|
|
||||||
|
|
||||||
|
async def _trivial_rpc() -> str:
|
||||||
|
'''
|
||||||
|
Minimal subactor-side RPC body: just return a sentinel
|
||||||
|
string the parent can assert on.
|
||||||
|
|
||||||
|
'''
|
||||||
|
return 'hello from subint-forkserver child'
|
||||||
|
|
||||||
|
|
||||||
|
async def _happy_path_forkserver(
|
||||||
|
reg_addr: tuple[str, int | str],
|
||||||
|
deadline: float,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Parent-side harness: stand up a root actor, open an actor
|
||||||
|
nursery, spawn one subactor via the currently-selected
|
||||||
|
spawn backend (which this test will have flipped to
|
||||||
|
`subint_forkserver`), run a trivial RPC through its
|
||||||
|
portal, assert the round-trip result.
|
||||||
|
|
||||||
|
'''
|
||||||
|
with trio.fail_after(deadline):
|
||||||
|
async with (
|
||||||
|
tractor.open_root_actor(
|
||||||
|
registry_addrs=[reg_addr],
|
||||||
|
),
|
||||||
|
tractor.open_nursery() as an,
|
||||||
|
):
|
||||||
|
portal: tractor.Portal = await an.run_in_actor(
|
||||||
|
_trivial_rpc,
|
||||||
|
name='subint-forkserver-child',
|
||||||
|
)
|
||||||
|
result: str = await portal.wait_for_result()
|
||||||
|
assert result == 'hello from subint-forkserver child'
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def forkserver_spawn_method():
|
||||||
|
'''
|
||||||
|
Flip `tractor.spawn._spawn._spawn_method` to
|
||||||
|
`'subint_forkserver'` for the duration of a test, then
|
||||||
|
restore whatever was in place before (usually the
|
||||||
|
session-level CLI choice, typically `'trio'`).
|
||||||
|
|
||||||
|
Without this, other tests in the same session would
|
||||||
|
observe the global flip and start spawning via fork —
|
||||||
|
which is almost certainly NOT what their assertions were
|
||||||
|
written against.
|
||||||
|
|
||||||
|
'''
|
||||||
|
prev_method: str = _spawn_mod._spawn_method
|
||||||
|
prev_ctx = _spawn_mod._ctx
|
||||||
|
try_set_start_method('subint_forkserver')
|
||||||
|
try:
|
||||||
|
yield
|
||||||
|
finally:
|
||||||
|
_spawn_mod._spawn_method = prev_method
|
||||||
|
_spawn_mod._ctx = prev_ctx
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.timeout(60, method='thread')
|
||||||
|
def test_subint_forkserver_spawn_basic(
|
||||||
|
reg_addr: tuple[str, int | str],
|
||||||
|
forkserver_spawn_method,
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Happy-path: spawn ONE subactor via the
|
||||||
|
`subint_forkserver` backend (parent-side fork from a
|
||||||
|
main-interp worker thread), do a trivial portal-RPC
|
||||||
|
round-trip, tear the nursery down cleanly.
|
||||||
|
|
||||||
|
If this passes, the "forkserver + tractor runtime" arch
|
||||||
|
is proven end-to-end: the registered
|
||||||
|
`subint_forkserver_proc` spawn target successfully
|
||||||
|
forks a child, the child runs `_actor_child_main()` +
|
||||||
|
completes IPC handshake + serves an RPC, and the parent
|
||||||
|
reaps via `_ForkedProc.wait()` without regressing any of
|
||||||
|
the normal nursery teardown invariants.
|
||||||
|
|
||||||
|
'''
|
||||||
|
deadline: float = 20.0
|
||||||
|
with dump_on_hang(
|
||||||
|
seconds=deadline,
|
||||||
|
path='/tmp/subint_forkserver_spawn_basic.dump',
|
||||||
|
):
|
||||||
|
trio.run(
|
||||||
|
partial(
|
||||||
|
_happy_path_forkserver,
|
||||||
|
reg_addr,
|
||||||
|
deadline,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -72,6 +72,13 @@ SpawnMethodKey = Literal[
|
||||||
# `ai/conc-anal/subint_fork_blocked_by_cpython_post_fork_issue.md`
|
# `ai/conc-anal/subint_fork_blocked_by_cpython_post_fork_issue.md`
|
||||||
# + issue #379 for the full analysis.
|
# + issue #379 for the full analysis.
|
||||||
'subint_fork',
|
'subint_fork',
|
||||||
|
# EXPERIMENTAL — the `subint_fork` workaround. `os.fork()`
|
||||||
|
# from a non-trio worker thread (never entered a subint)
|
||||||
|
# is CPython-legal and works cleanly; forked child runs
|
||||||
|
# `tractor._child._actor_child_main()` against a trio
|
||||||
|
# runtime, exactly like `trio_proc` but via fork instead
|
||||||
|
# of subproc-exec. See `tractor.spawn._subint_forkserver`.
|
||||||
|
'subint_forkserver',
|
||||||
]
|
]
|
||||||
_spawn_method: SpawnMethodKey = 'trio'
|
_spawn_method: SpawnMethodKey = 'trio'
|
||||||
|
|
||||||
|
|
@ -124,13 +131,14 @@ def try_set_start_method(
|
||||||
case 'trio':
|
case 'trio':
|
||||||
_ctx = None
|
_ctx = None
|
||||||
|
|
||||||
case 'subint' | 'subint_fork':
|
case 'subint' | 'subint_fork' | 'subint_forkserver':
|
||||||
# Both subint backends need no `mp.context`; both
|
# All subint-family backends need no `mp.context`;
|
||||||
# feature-gate on the py3.14 public
|
# all three feature-gate on the py3.14 public
|
||||||
# `concurrent.interpreters` wrapper (PEP 734). See
|
# `concurrent.interpreters` wrapper (PEP 734). See
|
||||||
# `tractor.spawn._subint` for the detailed
|
# `tractor.spawn._subint` for the detailed
|
||||||
# reasoning and the distinction between the two
|
# reasoning. `subint_fork` is blocked at the
|
||||||
# (`subint_fork` is WIP/experimental).
|
# CPython level (raises `NotImplementedError`);
|
||||||
|
# `subint_forkserver` is the working workaround.
|
||||||
from ._subint import _has_subints
|
from ._subint import _has_subints
|
||||||
if not _has_subints:
|
if not _has_subints:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
|
@ -469,6 +477,7 @@ from ._trio import trio_proc
|
||||||
from ._mp import mp_proc
|
from ._mp import mp_proc
|
||||||
from ._subint import subint_proc
|
from ._subint import subint_proc
|
||||||
from ._subint_fork import subint_fork_proc
|
from ._subint_fork import subint_fork_proc
|
||||||
|
from ._subint_forkserver import subint_forkserver_proc
|
||||||
|
|
||||||
|
|
||||||
# proc spawning backend target map
|
# proc spawning backend target map
|
||||||
|
|
@ -483,4 +492,8 @@ _methods: dict[SpawnMethodKey, Callable] = {
|
||||||
# clean `NotImplementedError` with pointer to the analysis,
|
# clean `NotImplementedError` with pointer to the analysis,
|
||||||
# rather than an "invalid backend" error.
|
# rather than an "invalid backend" error.
|
||||||
'subint_fork': subint_fork_proc,
|
'subint_fork': subint_fork_proc,
|
||||||
|
# WIP — fork-from-non-trio-worker-thread, works on py3.14+
|
||||||
|
# (validated via `ai/conc-anal/subint_fork_from_main_thread_smoketest.py`).
|
||||||
|
# See `tractor.spawn._subint_forkserver`.
|
||||||
|
'subint_forkserver': subint_forkserver_proc,
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -53,11 +53,27 @@ and inherited parent state.
|
||||||
|
|
||||||
Status
|
Status
|
||||||
------
|
------
|
||||||
**EXPERIMENTAL** — primitives only. Not yet wired into
|
**EXPERIMENTAL** — wired as the `'subint_forkserver'` entry
|
||||||
`tractor.spawn._spawn`'s backend registry. The next step is
|
in `tractor.spawn._spawn._methods` and selectable via
|
||||||
to drive these from a parent-side `trio.run()` and hook the
|
`try_set_start_method('subint_forkserver')` / `--spawn-backend
|
||||||
returned child pid into tractor's normal actor-nursery/IPC
|
=subint_forkserver`. Parent-side spawn, child-side runtime
|
||||||
machinery.
|
bring-up and normal portal-RPC teardown are validated by the
|
||||||
|
backend-tier test in
|
||||||
|
`tests/spawn/test_subint_forkserver.py::test_subint_forkserver_spawn_basic`.
|
||||||
|
|
||||||
|
Still-open work (tracked on tractor #379):
|
||||||
|
|
||||||
|
- no cancellation / hard-kill stress coverage yet (counterpart
|
||||||
|
to `tests/test_subint_cancellation.py` for the plain
|
||||||
|
`subint` backend),
|
||||||
|
- child-side "subint-hosted root runtime" mode (the second
|
||||||
|
half of the envisioned arch — currently the forked child
|
||||||
|
runs plain `_trio_main` via `spawn_method='trio'`; the
|
||||||
|
subint-hosted variant is still the future step gated on
|
||||||
|
msgspec PEP 684 support),
|
||||||
|
- thread-hygiene audit of the two `threading.Thread`
|
||||||
|
primitives below, gated on the same msgspec unblock
|
||||||
|
(see TODO section further down).
|
||||||
|
|
||||||
TODO — cleanup gated on msgspec PEP 684 support
|
TODO — cleanup gated on msgspec PEP 684 support
|
||||||
-----------------------------------------------
|
-----------------------------------------------
|
||||||
|
|
@ -93,16 +109,37 @@ See also
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import os
|
import os
|
||||||
import signal
|
import signal
|
||||||
|
import sys
|
||||||
import threading
|
import threading
|
||||||
|
from functools import partial
|
||||||
from typing import (
|
from typing import (
|
||||||
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
import trio
|
||||||
|
from trio import TaskStatus
|
||||||
|
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
|
from tractor.msg import (
|
||||||
|
types as msgtypes,
|
||||||
|
pretty_struct,
|
||||||
|
)
|
||||||
|
from tractor.runtime._state import current_actor
|
||||||
|
from tractor.runtime._portal import Portal
|
||||||
|
from ._spawn import (
|
||||||
|
cancel_on_completion,
|
||||||
|
soft_kill,
|
||||||
|
)
|
||||||
|
|
||||||
if TYPE_CHECKING:
|
if TYPE_CHECKING:
|
||||||
pass
|
from tractor.discovery._addr import UnwrappedAddress
|
||||||
|
from tractor.ipc import (
|
||||||
|
_server,
|
||||||
|
)
|
||||||
|
from tractor.runtime._runtime import Actor
|
||||||
|
from tractor.runtime._supervise import ActorNursery
|
||||||
|
|
||||||
|
|
||||||
log = get_logger('tractor')
|
log = get_logger('tractor')
|
||||||
|
|
@ -360,3 +397,265 @@ def run_subint_in_worker_thread(
|
||||||
)
|
)
|
||||||
if err is not None:
|
if err is not None:
|
||||||
raise err
|
raise err
|
||||||
|
|
||||||
|
|
||||||
|
class _ForkedProc:
|
||||||
|
'''
|
||||||
|
Thin `trio.Process`-compatible shim around a raw OS pid
|
||||||
|
returned by `fork_from_worker_thread()`, exposing just
|
||||||
|
enough surface for the `soft_kill()` / hard-reap pattern
|
||||||
|
borrowed from `trio_proc()`.
|
||||||
|
|
||||||
|
Unlike `trio.Process`, we have no direct handles on the
|
||||||
|
child's std-streams (fork-without-exec inherits the
|
||||||
|
parent's FDs, but we don't marshal them into this
|
||||||
|
wrapper) — `.stdin`/`.stdout`/`.stderr` are all `None`,
|
||||||
|
which matches what `soft_kill()` handles via its
|
||||||
|
`is not None` guards.
|
||||||
|
|
||||||
|
'''
|
||||||
|
def __init__(self, pid: int):
|
||||||
|
self.pid: int = pid
|
||||||
|
self._returncode: int | None = None
|
||||||
|
# `soft_kill`/`hard_kill` check these for pipe
|
||||||
|
# teardown — all None since we didn't wire up pipes
|
||||||
|
# on the fork-without-exec path.
|
||||||
|
self.stdin = None
|
||||||
|
self.stdout = None
|
||||||
|
self.stderr = None
|
||||||
|
|
||||||
|
def poll(self) -> int | None:
|
||||||
|
'''
|
||||||
|
Non-blocking liveness probe. Returns `None` if the
|
||||||
|
child is still running, else its exit code (negative
|
||||||
|
for signal-death, matching `subprocess.Popen`
|
||||||
|
convention).
|
||||||
|
|
||||||
|
'''
|
||||||
|
if self._returncode is not None:
|
||||||
|
return self._returncode
|
||||||
|
try:
|
||||||
|
waited_pid, status = os.waitpid(self.pid, os.WNOHANG)
|
||||||
|
except ChildProcessError:
|
||||||
|
# already reaped (or never existed) — treat as
|
||||||
|
# clean exit for polling purposes.
|
||||||
|
self._returncode = 0
|
||||||
|
return 0
|
||||||
|
if waited_pid == 0:
|
||||||
|
return None
|
||||||
|
self._returncode = self._parse_status(status)
|
||||||
|
return self._returncode
|
||||||
|
|
||||||
|
@property
|
||||||
|
def returncode(self) -> int | None:
|
||||||
|
return self._returncode
|
||||||
|
|
||||||
|
async def wait(self) -> int:
|
||||||
|
'''
|
||||||
|
Async blocking wait for the child's exit, off-loaded
|
||||||
|
to a trio cache thread so we don't block the event
|
||||||
|
loop on `waitpid()`. Safe to call multiple times;
|
||||||
|
subsequent calls return the cached rc without
|
||||||
|
re-issuing the syscall.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if self._returncode is not None:
|
||||||
|
return self._returncode
|
||||||
|
_, status = await trio.to_thread.run_sync(
|
||||||
|
os.waitpid,
|
||||||
|
self.pid,
|
||||||
|
0,
|
||||||
|
abandon_on_cancel=False,
|
||||||
|
)
|
||||||
|
self._returncode = self._parse_status(status)
|
||||||
|
return self._returncode
|
||||||
|
|
||||||
|
def kill(self) -> None:
|
||||||
|
'''
|
||||||
|
OS-level `SIGKILL` to the child. Swallows
|
||||||
|
`ProcessLookupError` (already dead).
|
||||||
|
|
||||||
|
'''
|
||||||
|
try:
|
||||||
|
os.kill(self.pid, signal.SIGKILL)
|
||||||
|
except ProcessLookupError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def _parse_status(self, status: int) -> int:
|
||||||
|
if os.WIFEXITED(status):
|
||||||
|
return os.WEXITSTATUS(status)
|
||||||
|
elif os.WIFSIGNALED(status):
|
||||||
|
# negative rc by `subprocess.Popen` convention
|
||||||
|
return -os.WTERMSIG(status)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def __repr__(self) -> str:
|
||||||
|
return (
|
||||||
|
f'<_ForkedProc pid={self.pid} '
|
||||||
|
f'returncode={self._returncode}>'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
async def subint_forkserver_proc(
|
||||||
|
name: str,
|
||||||
|
actor_nursery: ActorNursery,
|
||||||
|
subactor: Actor,
|
||||||
|
errors: dict[tuple[str, str], Exception],
|
||||||
|
|
||||||
|
# passed through to actor main
|
||||||
|
bind_addrs: list[UnwrappedAddress],
|
||||||
|
parent_addr: UnwrappedAddress,
|
||||||
|
_runtime_vars: dict[str, Any],
|
||||||
|
*,
|
||||||
|
infect_asyncio: bool = False,
|
||||||
|
task_status: TaskStatus[Portal] = trio.TASK_STATUS_IGNORED,
|
||||||
|
proc_kwargs: dict[str, any] = {},
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
'''
|
||||||
|
Spawn a subactor via `os.fork()` from a non-trio worker
|
||||||
|
thread (see `fork_from_worker_thread()`), with the forked
|
||||||
|
child running `tractor._child._actor_child_main()` and
|
||||||
|
connecting back via tractor's normal IPC handshake.
|
||||||
|
|
||||||
|
Supervision model mirrors `trio_proc()` — we manage a
|
||||||
|
real OS subprocess, so `Portal.cancel_actor()` +
|
||||||
|
`soft_kill()` on graceful teardown and `os.kill(SIGKILL)`
|
||||||
|
on hard-reap both apply directly (no
|
||||||
|
`_interpreters.destroy()` voodoo needed since the child
|
||||||
|
is in its own process).
|
||||||
|
|
||||||
|
The only real difference from `trio_proc` is the spawn
|
||||||
|
mechanism: fork from a known-clean main-interp worker
|
||||||
|
thread instead of `trio.lowlevel.open_process()`.
|
||||||
|
|
||||||
|
'''
|
||||||
|
if not _has_subints:
|
||||||
|
raise RuntimeError(
|
||||||
|
f'The {"subint_forkserver"!r} spawn backend '
|
||||||
|
f'requires Python 3.14+.\n'
|
||||||
|
f'Current runtime: {sys.version}'
|
||||||
|
)
|
||||||
|
|
||||||
|
uid: tuple[str, str] = subactor.aid.uid
|
||||||
|
loglevel: str | None = subactor.loglevel
|
||||||
|
|
||||||
|
# Closure captured into the fork-child's memory image.
|
||||||
|
# In the child this is the first post-fork Python code to
|
||||||
|
# run, on what was the fork-worker thread in the parent.
|
||||||
|
def _child_target() -> int:
|
||||||
|
# Lazy import so the parent doesn't pay for it on
|
||||||
|
# every spawn — it's module-level in `_child` but
|
||||||
|
# cheap enough to re-resolve here.
|
||||||
|
from tractor._child import _actor_child_main
|
||||||
|
_actor_child_main(
|
||||||
|
uid=uid,
|
||||||
|
loglevel=loglevel,
|
||||||
|
parent_addr=parent_addr,
|
||||||
|
infect_asyncio=infect_asyncio,
|
||||||
|
# NOTE, from the child-side runtime's POV it's
|
||||||
|
# a regular trio actor — it uses `_trio_main`,
|
||||||
|
# receives `SpawnSpec` over IPC, etc. The
|
||||||
|
# `subint_forkserver` name is a property of HOW
|
||||||
|
# the parent spawned, not of what the child is.
|
||||||
|
spawn_method='trio',
|
||||||
|
)
|
||||||
|
return 0
|
||||||
|
|
||||||
|
cancelled_during_spawn: bool = False
|
||||||
|
proc: _ForkedProc | None = None
|
||||||
|
ipc_server: _server.Server = actor_nursery._actor.ipc_server
|
||||||
|
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
pid: int = await trio.to_thread.run_sync(
|
||||||
|
partial(
|
||||||
|
fork_from_worker_thread,
|
||||||
|
_child_target,
|
||||||
|
thread_name=(
|
||||||
|
f'subint-forkserver[{name}]'
|
||||||
|
),
|
||||||
|
),
|
||||||
|
abandon_on_cancel=False,
|
||||||
|
)
|
||||||
|
proc = _ForkedProc(pid)
|
||||||
|
log.runtime(
|
||||||
|
f'Forked subactor via forkserver\n'
|
||||||
|
f'(>\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
|
)
|
||||||
|
|
||||||
|
event, chan = await ipc_server.wait_for_peer(uid)
|
||||||
|
|
||||||
|
except trio.Cancelled:
|
||||||
|
cancelled_during_spawn = True
|
||||||
|
raise
|
||||||
|
|
||||||
|
assert proc is not None
|
||||||
|
|
||||||
|
portal = Portal(chan)
|
||||||
|
actor_nursery._children[uid] = (
|
||||||
|
subactor,
|
||||||
|
proc,
|
||||||
|
portal,
|
||||||
|
)
|
||||||
|
|
||||||
|
sspec = msgtypes.SpawnSpec(
|
||||||
|
_parent_main_data=subactor._parent_main_data,
|
||||||
|
enable_modules=subactor.enable_modules,
|
||||||
|
reg_addrs=subactor.reg_addrs,
|
||||||
|
bind_addrs=bind_addrs,
|
||||||
|
_runtime_vars=_runtime_vars,
|
||||||
|
)
|
||||||
|
log.runtime(
|
||||||
|
f'Sending spawn spec to forkserver child\n'
|
||||||
|
f'{{}}=> {chan.aid.reprol()!r}\n'
|
||||||
|
f'\n'
|
||||||
|
f'{pretty_struct.pformat(sspec)}\n'
|
||||||
|
)
|
||||||
|
await chan.send(sspec)
|
||||||
|
|
||||||
|
curr_actor: Actor = current_actor()
|
||||||
|
curr_actor._actoruid2nursery[uid] = actor_nursery
|
||||||
|
|
||||||
|
task_status.started(portal)
|
||||||
|
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
await actor_nursery._join_procs.wait()
|
||||||
|
|
||||||
|
async with trio.open_nursery() as nursery:
|
||||||
|
if portal in actor_nursery._cancel_after_result_on_exit:
|
||||||
|
nursery.start_soon(
|
||||||
|
cancel_on_completion,
|
||||||
|
portal,
|
||||||
|
subactor,
|
||||||
|
errors,
|
||||||
|
)
|
||||||
|
|
||||||
|
# reuse `trio_proc`'s soft-kill dance — `proc`
|
||||||
|
# is our `_ForkedProc` shim which implements the
|
||||||
|
# same `.poll()` / `.wait()` / `.kill()` surface
|
||||||
|
# `soft_kill` expects.
|
||||||
|
await soft_kill(
|
||||||
|
proc,
|
||||||
|
_ForkedProc.wait,
|
||||||
|
portal,
|
||||||
|
)
|
||||||
|
nursery.cancel_scope.cancel()
|
||||||
|
|
||||||
|
finally:
|
||||||
|
# Hard reap: SIGKILL + waitpid. Cheap since we have
|
||||||
|
# the real OS pid, unlike `subint_proc` which has to
|
||||||
|
# fuss with `_interpreters.destroy()` races.
|
||||||
|
if proc is not None and proc.poll() is None:
|
||||||
|
log.cancel(
|
||||||
|
f'Hard killing forkserver subactor\n'
|
||||||
|
f'>x)\n'
|
||||||
|
f' |_{proc}\n'
|
||||||
|
)
|
||||||
|
with trio.CancelScope(shield=True):
|
||||||
|
proc.kill()
|
||||||
|
await proc.wait()
|
||||||
|
|
||||||
|
if not cancelled_during_spawn:
|
||||||
|
actor_nursery._children.pop(uid, None)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue