Fix subint destroy race via dedicated OS thread
`trio.to_thread.run_sync(_interpreters.exec, ...)` runs `exec()` on a cached worker thread — and when that thread is returned to the cache after the subint's `trio.run()` exits, CPython still keeps the subint's tstate attached to the (now idle) worker. Result: the teardown `_interpreters.destroy(interp_id)` in the `finally` block can block the parent's trio loop indefinitely, waiting for a tstate release that only happens when the worker either picks up a new job or exits. Manifested as intermittent mid-suite hangs under `--spawn-backend=subint` — caught by a `faulthandler.dump_traceback_later()` showing the main thread stuck in `_interpreters.destroy()` at `_subint.py:293` with only an idle trio-cache worker as the other live thread. Deats, - drive the subint on a plain `threading.Thread` (not `trio.to_thread`) so the OS thread truly exits after `_interpreters.exec()` returns, releasing tstate and unblocking destroy - signal `subint_exited.set()` back to the parent trio loop from the driver thread via `trio.from_thread.run_sync(..., trio_token=...)` — capture the token at `subint_proc` entry - swallow `trio.RunFinishedError` in that signal path for the case where parent trio has already exited (proc teardown) - in the teardown `finally`, off-load the sync `driver_thread.join()` to `trio.to_thread.run_sync` (cache thread w/ no subint tstate → safe) so we actually wait for the driver to exit before `_interpreters.destroy()` (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codesubint_forkserver_backend
parent
8a8d01e076
commit
31cbd11a5b
|
|
@ -36,7 +36,7 @@ introspectable) but `subint_proc()` raises.
|
||||||
'''
|
'''
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
import sys
|
import sys
|
||||||
from functools import partial
|
import threading
|
||||||
from typing import (
|
from typing import (
|
||||||
Any,
|
Any,
|
||||||
TYPE_CHECKING,
|
TYPE_CHECKING,
|
||||||
|
|
@ -188,28 +188,47 @@ async def subint_proc(
|
||||||
subint_exited = trio.Event()
|
subint_exited = trio.Event()
|
||||||
ipc_server: _server.Server = actor_nursery._actor.ipc_server
|
ipc_server: _server.Server = actor_nursery._actor.ipc_server
|
||||||
|
|
||||||
async def _drive_subint() -> None:
|
# Capture a trio token so the driver thread can signal
|
||||||
|
# `subint_exited.set()` back into the parent trio loop.
|
||||||
|
trio_token = trio.lowlevel.current_trio_token()
|
||||||
|
|
||||||
|
def _subint_target() -> None:
|
||||||
'''
|
'''
|
||||||
Block a worker OS-thread on `_interpreters.exec()` for
|
Dedicated OS-thread target: runs `_interpreters.exec()`
|
||||||
the lifetime of the sub-actor. When the subint's inner
|
once and exits.
|
||||||
`trio.run()` exits, `exec()` returns and the thread
|
|
||||||
naturally joins.
|
We intentionally use a plain `threading.Thread` here
|
||||||
|
rather than `trio.to_thread.run_sync()` because trio's
|
||||||
|
thread cache would *recycle* the same OS thread for
|
||||||
|
subsequent jobs — leaving CPython's subinterpreter
|
||||||
|
tstate attached to that cached worker and blocking
|
||||||
|
`_interpreters.destroy()` in the teardown block below.
|
||||||
|
A dedicated thread truly exits after `exec()` returns,
|
||||||
|
releasing the tstate so destroy can proceed.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
try:
|
try:
|
||||||
await trio.to_thread.run_sync(
|
_interpreters.exec(interp_id, bootstrap)
|
||||||
_interpreters.exec,
|
|
||||||
interp_id,
|
|
||||||
bootstrap,
|
|
||||||
abandon_on_cancel=False,
|
|
||||||
)
|
|
||||||
finally:
|
finally:
|
||||||
subint_exited.set()
|
try:
|
||||||
|
trio.from_thread.run_sync(
|
||||||
|
subint_exited.set,
|
||||||
|
trio_token=trio_token,
|
||||||
|
)
|
||||||
|
except trio.RunFinishedError:
|
||||||
|
# parent trio loop has already exited (proc
|
||||||
|
# teardown); nothing to signal.
|
||||||
|
pass
|
||||||
|
|
||||||
|
driver_thread = threading.Thread(
|
||||||
|
target=_subint_target,
|
||||||
|
name=f'subint-driver[{interp_id}]',
|
||||||
|
daemon=False,
|
||||||
|
)
|
||||||
|
|
||||||
try:
|
try:
|
||||||
try:
|
try:
|
||||||
async with trio.open_nursery() as thread_n:
|
driver_thread.start()
|
||||||
thread_n.start_soon(_drive_subint)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
event, chan = await ipc_server.wait_for_peer(uid)
|
event, chan = await ipc_server.wait_for_peer(uid)
|
||||||
|
|
@ -259,7 +278,7 @@ async def subint_proc(
|
||||||
# Soft-kill analog: wait for the subint to exit
|
# Soft-kill analog: wait for the subint to exit
|
||||||
# naturally; on cancel, send a graceful cancel
|
# naturally; on cancel, send a graceful cancel
|
||||||
# via the IPC portal and then wait for the
|
# via the IPC portal and then wait for the
|
||||||
# driver thread to finish so `interp.close()`
|
# driver thread to finish so `_interpreters.destroy()`
|
||||||
# won't race with a running interpreter.
|
# won't race with a running interpreter.
|
||||||
try:
|
try:
|
||||||
await subint_exited.wait()
|
await subint_exited.wait()
|
||||||
|
|
@ -285,10 +304,17 @@ async def subint_proc(
|
||||||
lifecycle_n.cancel_scope.cancel()
|
lifecycle_n.cancel_scope.cancel()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# The driver thread has exited (either natural subint
|
# Ensure the driver thread is *fully* joined before
|
||||||
# completion or post-cancel teardown) so the subint is
|
# destroying the subint. `subint_exited.set()` fires
|
||||||
# no longer running — safe to destroy.
|
# from inside the thread but returns to trio before
|
||||||
|
# the thread's bootstrap cleanup finishes; calling
|
||||||
|
# `destroy()` too eagerly can race with tstate
|
||||||
|
# teardown. Off-load the blocking `.join()` to a
|
||||||
|
# cache thread (which carries no subint tstate of
|
||||||
|
# its own, so no cache conflict).
|
||||||
with trio.CancelScope(shield=True):
|
with trio.CancelScope(shield=True):
|
||||||
|
if driver_thread.is_alive():
|
||||||
|
await trio.to_thread.run_sync(driver_thread.join)
|
||||||
try:
|
try:
|
||||||
_interpreters.destroy(interp_id)
|
_interpreters.destroy(interp_id)
|
||||||
log.runtime(
|
log.runtime(
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue