Lift fork prims into `_subint_forkserver` mod
The smoketest (prior commit) empirically validated the
"fork-from-main-interp-worker-thread" arch on py3.14. Promote
the validated primitives out of the `ai/conc-anal/` smoketest
into `tractor.spawn._subint_forkserver` so they can eventually
be wired into a real "subint forkserver" spawn backend.
Deats,
- new module `tractor/spawn/_subint_forkserver.py` (337 LOC):
- `fork_from_worker_thread(child_target, thread_name)` —
spawn a main-interp `threading.Thread`, call `os.fork()`
from it, shuttle the child pid back to main via a pipe
- `run_trio_in_subint(bootstrap, ...)` — post-fork helper:
create a fresh subint + drive `_interpreters.exec()` on
a dedicated worker thread running the `bootstrap` str
(typically imports `trio`, defines an async entry, calls
`trio.run()`)
- `wait_child(pid, expect_exit_ok)` — `os.waitpid()` +
pass/fail classification reusable from harness AND the
eventual real spawn path
- feature-gated py3.14+ via the public
`concurrent.interpreters` presence check; matches the gate
in `tractor.spawn._subint`
- module docstring doc's the CPython-block context
(cross-refs `_subint_fork` stub + the two `conc-anal/`
docs) and status: EXPERIMENTAL, not yet registered in
`_spawn._methods`
Also, refactor the smoketest
`ai/conc-anal/subint_fork_from_main_thread_smoketest.py` to
import the primitives from the new module rather than inline
its own copies. Keeps the smoketest and the tractor-side
impl in sync as the forkserver design evolves; the smoketest
remains a zero-`tractor`-runtime CPython-level check
(imports ONLY the three primitives, no runtime bring-up).
Status: next step is to drive these from a parent-side
`trio.run()` and hook the returned child pid into the normal
actor-nursery/IPC flow — then register `subint_forkserver`
as a `SpawnMethodKey` in `_spawn.py`.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver_backend
parent
de4f470b6c
commit
82332fbceb
|
|
@ -62,11 +62,9 @@ Usage
|
|||
from __future__ import annotations
|
||||
import argparse
|
||||
import os
|
||||
import signal
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
from typing import Callable
|
||||
|
||||
|
||||
# Hard-require py3.14 for the public `concurrent.interpreters`
|
||||
|
|
@ -84,8 +82,22 @@ except ImportError:
|
|||
sys.exit(2)
|
||||
|
||||
|
||||
# The actual primitives this script exercises live in
|
||||
# `tractor.spawn._subint_forkserver` — we re-import them here
|
||||
# rather than inlining so the module and the validation stay
|
||||
# in sync. (Early versions of this file had them inline for
|
||||
# the "zero tractor imports" isolation guarantee; now that
|
||||
# CPython-level feasibility is confirmed, the validated
|
||||
# primitives have moved into tractor proper.)
|
||||
from tractor.spawn._subint_forkserver import (
|
||||
fork_from_worker_thread,
|
||||
run_trio_in_subint,
|
||||
wait_child,
|
||||
)
|
||||
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
# small observability helpers
|
||||
# small observability helpers (test-harness only)
|
||||
# ----------------------------------------------------------------
|
||||
|
||||
|
||||
|
|
@ -94,49 +106,24 @@ def _banner(title: str) -> None:
|
|||
print(f'\n{line}\n{title}\n{line}', flush=True)
|
||||
|
||||
|
||||
def _wait_child(
|
||||
pid: int,
|
||||
*,
|
||||
def _report(
|
||||
label: str,
|
||||
*,
|
||||
ok: bool,
|
||||
status_str: str,
|
||||
expect_exit_ok: bool,
|
||||
) -> bool:
|
||||
'''
|
||||
Await a forked child's exit status and render pass/fail.
|
||||
|
||||
`expect_exit_ok=True` means we expect a normal exit (code
|
||||
0 via WEXITSTATUS). `expect_exit_ok=False` means we expect
|
||||
an abnormal death (WIFSIGNALED or nonzero WEXITSTATUS) —
|
||||
used for the `control_*` scenario where CPython is
|
||||
supposed to abort the child.
|
||||
|
||||
'''
|
||||
_, status = os.waitpid(pid, 0)
|
||||
exited_normally = os.WIFEXITED(status) and os.WEXITSTATUS(status) == 0
|
||||
signaled = os.WIFSIGNALED(status)
|
||||
sig = os.WTERMSIG(status) if signaled else None
|
||||
rc = os.WEXITSTATUS(status) if os.WIFEXITED(status) else None
|
||||
|
||||
if expect_exit_ok:
|
||||
ok = exited_normally
|
||||
expected_str = 'normal exit (rc=0)'
|
||||
else:
|
||||
ok = not exited_normally
|
||||
expected_str = (
|
||||
'abnormal death (signal or nonzero exit)'
|
||||
)
|
||||
|
||||
verdict = 'PASS' if ok else 'FAIL'
|
||||
status_str = (
|
||||
f'signal={signal.Signals(sig).name}'
|
||||
if signaled
|
||||
else f'rc={rc}'
|
||||
) -> None:
|
||||
verdict: str = 'PASS' if ok else 'FAIL'
|
||||
expected_str: str = (
|
||||
'normal exit (rc=0)'
|
||||
if expect_exit_ok
|
||||
else 'abnormal death (signal or nonzero exit)'
|
||||
)
|
||||
print(
|
||||
f'[{verdict}] {label}: '
|
||||
f'expected {expected_str}; observed {status_str}',
|
||||
flush=True,
|
||||
)
|
||||
return ok
|
||||
|
||||
|
||||
# ----------------------------------------------------------------
|
||||
|
|
@ -256,74 +243,34 @@ def scenario_main_thread_fork() -> int:
|
|||
# ----------------------------------------------------------------
|
||||
|
||||
|
||||
def _fork_from_worker_thread(
|
||||
child_target: Callable[[], int] | None = None,
|
||||
label: str = 'worker_thread_fork',
|
||||
def _run_worker_thread_fork_scenario(
|
||||
label: str,
|
||||
*,
|
||||
child_target=None,
|
||||
) -> int:
|
||||
'''
|
||||
Fork from a main-interp worker thread (not a subint).
|
||||
Returns the child's exit code observed by the parent.
|
||||
|
||||
`child_target` is called IN THE CHILD before `os._exit`.
|
||||
If omitted, the child just `_exit(0)`s immediately.
|
||||
|
||||
`label` is used in the pass/fail banner so reuse of this
|
||||
helper across scenarios reports the scenario name, not
|
||||
just the underlying fork-mechanism name.
|
||||
Thin wrapper: delegate the actual fork to the
|
||||
`tractor.spawn._subint_forkserver` primitive, then wait
|
||||
on the child and render a pass/fail banner.
|
||||
|
||||
'''
|
||||
# Use a simple pipe to shuttle the child PID back to main.
|
||||
rfd, wfd = os.pipe()
|
||||
|
||||
def _worker() -> None:
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
# CHILD: close parent's pipe ends, do work, exit.
|
||||
os.close(rfd)
|
||||
os.close(wfd)
|
||||
rc = 0
|
||||
if child_target is not None:
|
||||
try:
|
||||
rc = child_target() or 0
|
||||
except BaseException as err:
|
||||
print(
|
||||
f' CHILD: child_target raised: '
|
||||
f'{type(err).__name__}: {err}',
|
||||
file=sys.stderr, flush=True,
|
||||
)
|
||||
rc = 2
|
||||
os._exit(rc)
|
||||
else:
|
||||
# PARENT (still in worker thread): send pid to
|
||||
# main thread via the pipe.
|
||||
os.write(wfd, pid.to_bytes(8, 'little'))
|
||||
|
||||
t = threading.Thread(
|
||||
target=_worker,
|
||||
name=f'worker-fork-thread[{label}]',
|
||||
daemon=False,
|
||||
)
|
||||
t.start()
|
||||
t.join(timeout=10.0)
|
||||
if t.is_alive():
|
||||
print(
|
||||
f'[FAIL] {label}: worker-thread fork driver '
|
||||
f'did not return in 10s',
|
||||
flush=True,
|
||||
try:
|
||||
pid: int = fork_from_worker_thread(
|
||||
child_target=child_target,
|
||||
thread_name=f'worker-fork-thread[{label}]',
|
||||
)
|
||||
except RuntimeError as err:
|
||||
print(f'[FAIL] {label}: {err}', flush=True)
|
||||
return 1
|
||||
|
||||
pid_bytes = os.read(rfd, 8)
|
||||
os.close(rfd)
|
||||
os.close(wfd)
|
||||
pid = int.from_bytes(pid_bytes, 'little')
|
||||
print(f' forked child pid={pid}', flush=True)
|
||||
|
||||
return 0 if _wait_child(
|
||||
pid,
|
||||
label=label,
|
||||
ok, status_str = wait_child(pid, expect_exit_ok=True)
|
||||
_report(
|
||||
label,
|
||||
ok=ok,
|
||||
status_str=status_str,
|
||||
expect_exit_ok=True,
|
||||
) else 1
|
||||
)
|
||||
return 0 if ok else 1
|
||||
|
||||
|
||||
def scenario_worker_thread_fork() -> int:
|
||||
|
|
@ -332,9 +279,8 @@ def scenario_worker_thread_fork() -> int:
|
|||
'(expected: child exits normally — this is the one '
|
||||
'that matters)'
|
||||
)
|
||||
return _fork_from_worker_thread(
|
||||
child_target=None,
|
||||
label='worker_thread_fork',
|
||||
return _run_worker_thread_fork_scenario(
|
||||
'worker_thread_fork',
|
||||
)
|
||||
|
||||
|
||||
|
|
@ -343,52 +289,39 @@ def scenario_worker_thread_fork() -> int:
|
|||
# ----------------------------------------------------------------
|
||||
|
||||
|
||||
_CHILD_TRIO_BOOTSTRAP: str = (
|
||||
'import trio\n'
|
||||
'async def _main():\n'
|
||||
' await trio.sleep(0.05)\n'
|
||||
' return 42\n'
|
||||
'result = trio.run(_main)\n'
|
||||
'assert result == 42, f"trio.run returned {result}"\n'
|
||||
'print(" CHILD subint: trio.run OK, result=42", '
|
||||
'flush=True)\n'
|
||||
)
|
||||
|
||||
|
||||
def _child_trio_in_subint() -> int:
|
||||
'''
|
||||
CHILD-side: from fork-thread (main-interp), create a fresh
|
||||
subint and run `trio.run()` in it on a dedicated worker
|
||||
thread. Returns 0 on success.
|
||||
CHILD-side `child_target`: drive a trivial `trio.run()`
|
||||
inside a fresh legacy-config subint on a worker thread,
|
||||
using the `tractor.spawn._subint_forkserver.run_trio_in_subint`
|
||||
primitive. Returns 0 on success.
|
||||
|
||||
'''
|
||||
child_interp = _interpreters.create('legacy')
|
||||
subint_bootstrap = (
|
||||
'import trio\n'
|
||||
'async def _main():\n'
|
||||
' await trio.sleep(0.05)\n'
|
||||
' return 42\n'
|
||||
'result = trio.run(_main)\n'
|
||||
'assert result == 42, f"trio.run returned {result}"\n'
|
||||
'print(" CHILD subint: trio.run OK, result=42", '
|
||||
'flush=True)\n'
|
||||
)
|
||||
err = None
|
||||
|
||||
def _drive() -> None:
|
||||
nonlocal err
|
||||
try:
|
||||
_interpreters.exec(child_interp, subint_bootstrap)
|
||||
except BaseException as e:
|
||||
err = e
|
||||
|
||||
t = threading.Thread(
|
||||
target=_drive,
|
||||
name='child-subint-trio-thread',
|
||||
daemon=False,
|
||||
)
|
||||
t.start()
|
||||
t.join(timeout=10.0)
|
||||
|
||||
try:
|
||||
_interpreters.destroy(child_interp)
|
||||
except _interpreters.InterpreterError:
|
||||
pass
|
||||
|
||||
if t.is_alive():
|
||||
run_trio_in_subint(
|
||||
_CHILD_TRIO_BOOTSTRAP,
|
||||
thread_name='child-subint-trio-thread',
|
||||
)
|
||||
except RuntimeError as err:
|
||||
print(
|
||||
' CHILD: subint trio thread did not return in 10s',
|
||||
f' CHILD: run_trio_in_subint timed out / thread '
|
||||
f'never returned: {err}',
|
||||
flush=True,
|
||||
)
|
||||
return 3
|
||||
if err is not None:
|
||||
except BaseException as err:
|
||||
print(
|
||||
f' CHILD: subint bootstrap raised: '
|
||||
f'{type(err).__name__}: {err}',
|
||||
|
|
@ -403,9 +336,9 @@ def scenario_full_architecture() -> int:
|
|||
'[arch-full] worker-thread fork + child runs trio in a '
|
||||
'subint (end-to-end proposed arch)'
|
||||
)
|
||||
return _fork_from_worker_thread(
|
||||
return _run_worker_thread_fork_scenario(
|
||||
'full_architecture',
|
||||
child_target=_child_trio_in_subint,
|
||||
label='full_architecture',
|
||||
)
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,337 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Forkserver-style `os.fork()` primitives for the `subint`-hosted
|
||||
actor model.
|
||||
|
||||
Background
|
||||
----------
|
||||
CPython refuses `os.fork()` from a non-main sub-interpreter:
|
||||
`PyOS_AfterFork_Child()` →
|
||||
`_PyInterpreterState_DeleteExceptMain()` gates on the calling
|
||||
thread's tstate belonging to the main interpreter and aborts
|
||||
the forked child otherwise. The full walkthrough (with source
|
||||
refs) lives in
|
||||
`ai/conc-anal/subint_fork_blocked_by_cpython_post_fork_issue.md`.
|
||||
|
||||
However `os.fork()` from a regular `threading.Thread` attached
|
||||
to the *main* interpreter — i.e. a worker thread that has
|
||||
never entered a subint — works cleanly. Empirically validated
|
||||
across four scenarios by
|
||||
`ai/conc-anal/subint_fork_from_main_thread_smoketest.py` on
|
||||
py3.14.
|
||||
|
||||
This submodule lifts the validated primitives out of the
|
||||
smoke-test and into tractor proper, so they can eventually be
|
||||
wired into a real "subint forkserver" spawn backend — where:
|
||||
|
||||
- A dedicated main-interp worker thread owns all `os.fork()`
|
||||
calls (never enters a subint).
|
||||
- The tractor parent-actor's `trio.run()` lives in a
|
||||
sub-interpreter on a different worker thread.
|
||||
- When a spawn is requested, the trio-task signals the
|
||||
forkserver thread; the forkserver forks; child re-enters
|
||||
the same pattern (trio in a subint + forkserver on main).
|
||||
|
||||
This mirrors the stdlib `multiprocessing.forkserver` design
|
||||
but keeps the forkserver in-process for faster spawn latency
|
||||
and inherited parent state.
|
||||
|
||||
Status
|
||||
------
|
||||
**EXPERIMENTAL** — primitives only. Not yet wired into
|
||||
`tractor.spawn._spawn`'s backend registry. The next step is
|
||||
to drive these from a parent-side `trio.run()` and hook the
|
||||
returned child pid into tractor's normal actor-nursery/IPC
|
||||
machinery.
|
||||
|
||||
See also
|
||||
--------
|
||||
- `tractor.spawn._subint_fork` — the stub for the
|
||||
fork-from-subint strategy that DIDN'T work (kept as
|
||||
in-tree documentation of the attempt + CPython-level
|
||||
block).
|
||||
- `ai/conc-anal/subint_fork_blocked_by_cpython_post_fork_issue.md`
|
||||
— the CPython source walkthrough.
|
||||
- `ai/conc-anal/subint_fork_from_main_thread_smoketest.py`
|
||||
— the standalone feasibility check (now delegates to
|
||||
this module for the primitives it exercises).
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
import os
|
||||
import signal
|
||||
import threading
|
||||
from typing import (
|
||||
Callable,
|
||||
TYPE_CHECKING,
|
||||
)
|
||||
|
||||
from tractor.log import get_logger
|
||||
|
||||
if TYPE_CHECKING:
|
||||
pass
|
||||
|
||||
|
||||
log = get_logger('tractor')
|
||||
|
||||
|
||||
# Feature-gate: py3.14+ via the public `concurrent.interpreters`
|
||||
# wrapper. Matches the gate in `tractor.spawn._subint` —
|
||||
# see that module's docstring for why we require the public
|
||||
# API's presence even though we reach into the private
|
||||
# `_interpreters` C module for actual calls.
|
||||
try:
|
||||
from concurrent import interpreters as _public_interpreters # noqa: F401 # type: ignore
|
||||
import _interpreters # type: ignore
|
||||
_has_subints: bool = True
|
||||
except ImportError:
|
||||
_interpreters = None # type: ignore
|
||||
_has_subints: bool = False
|
||||
|
||||
|
||||
def _format_child_exit(
|
||||
status: int,
|
||||
) -> str:
|
||||
'''
|
||||
Render `os.waitpid()`-returned status as a short human
|
||||
string (`'rc=0'` / `'signal=SIGABRT'` / etc.) for log
|
||||
output.
|
||||
|
||||
'''
|
||||
if os.WIFEXITED(status):
|
||||
return f'rc={os.WEXITSTATUS(status)}'
|
||||
elif os.WIFSIGNALED(status):
|
||||
sig: int = os.WTERMSIG(status)
|
||||
return f'signal={signal.Signals(sig).name}'
|
||||
else:
|
||||
return f'raw_status={status}'
|
||||
|
||||
|
||||
def wait_child(
|
||||
pid: int,
|
||||
*,
|
||||
expect_exit_ok: bool = True,
|
||||
) -> tuple[bool, str]:
|
||||
'''
|
||||
`os.waitpid()` + classify the child's exit as
|
||||
expected-or-not.
|
||||
|
||||
`expect_exit_ok=True` → expect clean `rc=0`. `False` →
|
||||
expect abnormal death (any signal or nonzero rc). Used
|
||||
by the control-case smoke-test scenario where CPython
|
||||
is meant to abort the child.
|
||||
|
||||
Returns `(ok, status_str)` — `ok` reflects whether the
|
||||
observed outcome matches `expect_exit_ok`, `status_str`
|
||||
is a short render of the actual status.
|
||||
|
||||
'''
|
||||
_, status = os.waitpid(pid, 0)
|
||||
exited_normally: bool = (
|
||||
os.WIFEXITED(status)
|
||||
and
|
||||
os.WEXITSTATUS(status) == 0
|
||||
)
|
||||
ok: bool = (
|
||||
exited_normally
|
||||
if expect_exit_ok
|
||||
else not exited_normally
|
||||
)
|
||||
return ok, _format_child_exit(status)
|
||||
|
||||
|
||||
def fork_from_worker_thread(
|
||||
child_target: Callable[[], int] | None = None,
|
||||
*,
|
||||
thread_name: str = 'subint-forkserver',
|
||||
join_timeout: float = 10.0,
|
||||
|
||||
) -> int:
|
||||
'''
|
||||
`os.fork()` from a main-interp worker thread; return the
|
||||
forked child's pid.
|
||||
|
||||
The calling context **must** be the main interpreter
|
||||
(not a subinterpreter) — that's the whole point of this
|
||||
primitive. A regular `threading.Thread(target=...)`
|
||||
spawned from main-interp code satisfies this
|
||||
automatically because Python attaches the thread's
|
||||
tstate to the *calling* interpreter, and our main
|
||||
thread's calling interp is always main.
|
||||
|
||||
If `child_target` is provided, it runs IN the forked
|
||||
child process before `os._exit` is called. The callable
|
||||
should return an int used as the child's exit rc. If
|
||||
`child_target` is None, the child `_exit(0)`s immediately
|
||||
(useful for the baseline sanity case).
|
||||
|
||||
On the PARENT side, this function drives the worker
|
||||
thread to completion (`fork()` returns near-instantly;
|
||||
the thread is expected to exit promptly) and then
|
||||
returns the forked child's pid. Raises `RuntimeError`
|
||||
if the worker thread fails to return within
|
||||
`join_timeout` seconds — that'd be an unexpected CPython
|
||||
pathology.
|
||||
|
||||
'''
|
||||
if not _has_subints:
|
||||
raise RuntimeError(
|
||||
'subint-forkserver primitives require Python '
|
||||
'3.14+ (public `concurrent.interpreters` module '
|
||||
'not present on this runtime).'
|
||||
)
|
||||
|
||||
# Use a pipe to shuttle the forked child's pid from the
|
||||
# worker thread back to the caller.
|
||||
rfd, wfd = os.pipe()
|
||||
|
||||
def _worker() -> None:
|
||||
'''
|
||||
Runs on the forkserver worker thread. Forks; child
|
||||
runs `child_target` (if any) and exits; parent side
|
||||
writes the child pid to the pipe so the main-thread
|
||||
caller can retrieve it.
|
||||
|
||||
'''
|
||||
pid: int = os.fork()
|
||||
if pid == 0:
|
||||
# CHILD: close the pid-pipe ends (we don't use
|
||||
# them here), run the user callable if any, exit.
|
||||
os.close(rfd)
|
||||
os.close(wfd)
|
||||
rc: int = 0
|
||||
if child_target is not None:
|
||||
try:
|
||||
rc = child_target() or 0
|
||||
except BaseException as err:
|
||||
log.error(
|
||||
f'subint-forkserver child_target '
|
||||
f'raised:\n'
|
||||
f'|_{type(err).__name__}: {err}'
|
||||
)
|
||||
rc = 2
|
||||
os._exit(rc)
|
||||
else:
|
||||
# PARENT (still inside the worker thread):
|
||||
# hand the child pid back to main via pipe.
|
||||
os.write(wfd, pid.to_bytes(8, 'little'))
|
||||
|
||||
worker: threading.Thread = threading.Thread(
|
||||
target=_worker,
|
||||
name=thread_name,
|
||||
daemon=False,
|
||||
)
|
||||
worker.start()
|
||||
worker.join(timeout=join_timeout)
|
||||
if worker.is_alive():
|
||||
# Pipe cleanup best-effort before bail.
|
||||
try:
|
||||
os.close(rfd)
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.close(wfd)
|
||||
except OSError:
|
||||
pass
|
||||
raise RuntimeError(
|
||||
f'subint-forkserver worker thread '
|
||||
f'{thread_name!r} did not return within '
|
||||
f'{join_timeout}s — this is unexpected since '
|
||||
f'`os.fork()` should return near-instantly on '
|
||||
f'the parent side.'
|
||||
)
|
||||
|
||||
pid_bytes: bytes = os.read(rfd, 8)
|
||||
os.close(rfd)
|
||||
os.close(wfd)
|
||||
pid: int = int.from_bytes(pid_bytes, 'little')
|
||||
log.runtime(
|
||||
f'subint-forkserver forked child\n'
|
||||
f'(>\n'
|
||||
f' |_pid={pid}\n'
|
||||
)
|
||||
return pid
|
||||
|
||||
|
||||
def run_trio_in_subint(
|
||||
bootstrap: str,
|
||||
*,
|
||||
thread_name: str = 'subint-trio',
|
||||
join_timeout: float = 10.0,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Helper for use inside a forked child: create a fresh
|
||||
legacy-config sub-interpreter and drive the given
|
||||
`bootstrap` code string through `_interpreters.exec()`
|
||||
on a dedicated worker thread.
|
||||
|
||||
Typical `bootstrap` content imports `trio`, defines an
|
||||
async entry, calls `trio.run()`. See
|
||||
`tractor.spawn._subint.subint_proc` for the matching
|
||||
pattern tractor uses at the sub-actor level.
|
||||
|
||||
Destroys the subint after the thread joins.
|
||||
|
||||
'''
|
||||
if not _has_subints:
|
||||
raise RuntimeError(
|
||||
'subint-forkserver primitives require Python '
|
||||
'3.14+.'
|
||||
)
|
||||
|
||||
interp_id: int = _interpreters.create('legacy')
|
||||
log.runtime(
|
||||
f'Created child-side subint for trio.run()\n'
|
||||
f'(>\n'
|
||||
f' |_interp_id={interp_id}\n'
|
||||
)
|
||||
|
||||
err: BaseException | None = None
|
||||
|
||||
def _drive() -> None:
|
||||
nonlocal err
|
||||
try:
|
||||
_interpreters.exec(interp_id, bootstrap)
|
||||
except BaseException as e:
|
||||
err = e
|
||||
|
||||
worker: threading.Thread = threading.Thread(
|
||||
target=_drive,
|
||||
name=thread_name,
|
||||
daemon=False,
|
||||
)
|
||||
worker.start()
|
||||
worker.join(timeout=join_timeout)
|
||||
|
||||
try:
|
||||
_interpreters.destroy(interp_id)
|
||||
except _interpreters.InterpreterError as e:
|
||||
log.warning(
|
||||
f'Could not destroy child-side subint '
|
||||
f'{interp_id}: {e}'
|
||||
)
|
||||
|
||||
if worker.is_alive():
|
||||
raise RuntimeError(
|
||||
f'child-side subint trio-driver thread '
|
||||
f'{thread_name!r} did not return within '
|
||||
f'{join_timeout}s.'
|
||||
)
|
||||
if err is not None:
|
||||
raise err
|
||||
Loading…
Reference in New Issue