From 3869a9b46836d8ead5ca52dfa56bd4b6704a1346 Mon Sep 17 00:00:00 2001
From: goodboy
Date: Sat, 18 Apr 2026 00:21:49 -0400
Subject: [PATCH] Fix subint destroy race via dedicated OS thread
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

`trio.to_thread.run_sync(_interpreters.exec, ...)` runs `exec()` on a
cached worker thread — and when that thread is returned to the cache
after the subint's `trio.run()` exits, CPython still keeps the subint's
tstate attached to the (now idle) worker.

Result: the teardown `_interpreters.destroy(interp_id)` in the
`finally` block can block the parent's trio loop indefinitely, waiting
for a tstate release that only happens when the worker either picks up
a new job or exits.

This manifested as intermittent mid-suite hangs under
`--spawn-backend=subint` — caught by a
`faulthandler.dump_traceback_later()` dump showing the main thread
stuck in `_interpreters.destroy()` at `_subint.py:293` with only an
idle trio-cache worker as the other live thread.

Deats, - drive the subint on a plain `threading.Thread` (not `trio.to_thread`) so the OS thread truly exits after `_interpreters.exec()` returns, releasing tstate and unblocking destroy - signal `subint_exited.set()` back to the parent trio loop from the driver thread via `trio.from_thread.run_sync(..., trio_token=...)` — capture the token at `subint_proc` entry - swallow `trio.RunFinishedError` in that signal path for the case where parent trio has already exited (proc teardown) - in the teardown `finally`, off-load the sync `driver_thread.join()` to `trio.to_thread.run_sync` (cache thread w/ no subint tstate → safe) so we actually wait for the driver to exit before `_interpreters.destroy()` (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tractor/spawn/_subint.py | 198 ++++++++++++++++++++++----------------- 1 file changed, 112 insertions(+), 86 deletions(-) diff --git a/tractor/spawn/_subint.py b/tractor/spawn/_subint.py index a4eb9a70..4b4afa5f 100644 --- a/tractor/spawn/_subint.py +++ b/tractor/spawn/_subint.py @@ -36,7 +36,7 @@ introspectable) but `subint_proc()` raises. ''' from __future__ import annotations import sys -from functools import partial +import threading from typing import ( Any, TYPE_CHECKING, @@ -188,107 +188,133 @@ async def subint_proc( subint_exited = trio.Event() ipc_server: _server.Server = actor_nursery._actor.ipc_server - async def _drive_subint() -> None: + # Capture a trio token so the driver thread can signal + # `subint_exited.set()` back into the parent trio loop. + trio_token = trio.lowlevel.current_trio_token() + + def _subint_target() -> None: ''' - Block a worker OS-thread on `_interpreters.exec()` for - the lifetime of the sub-actor. When the subint's inner - `trio.run()` exits, `exec()` returns and the thread - naturally joins. + Dedicated OS-thread target: runs `_interpreters.exec()` + once and exits. 
+ + We intentionally use a plain `threading.Thread` here + rather than `trio.to_thread.run_sync()` because trio's + thread cache would *recycle* the same OS thread for + subsequent jobs — leaving CPython's subinterpreter + tstate attached to that cached worker and blocking + `_interpreters.destroy()` in the teardown block below. + A dedicated thread truly exits after `exec()` returns, + releasing the tstate so destroy can proceed. ''' try: - await trio.to_thread.run_sync( - _interpreters.exec, - interp_id, - bootstrap, - abandon_on_cancel=False, - ) + _interpreters.exec(interp_id, bootstrap) finally: - subint_exited.set() + try: + trio.from_thread.run_sync( + subint_exited.set, + trio_token=trio_token, + ) + except trio.RunFinishedError: + # parent trio loop has already exited (proc + # teardown); nothing to signal. + pass + + driver_thread = threading.Thread( + target=_subint_target, + name=f'subint-driver[{interp_id}]', + daemon=False, + ) try: try: - async with trio.open_nursery() as thread_n: - thread_n.start_soon(_drive_subint) + driver_thread.start() + try: + event, chan = await ipc_server.wait_for_peer(uid) + except trio.Cancelled: + cancelled_during_spawn = True + raise + + portal = Portal(chan) + actor_nursery._children[uid] = ( + subactor, + interp_id, # proxy for the normal `proc` slot + portal, + ) + + sspec = msgtypes.SpawnSpec( + _parent_main_data=subactor._parent_main_data, + enable_modules=subactor.enable_modules, + reg_addrs=subactor.reg_addrs, + bind_addrs=bind_addrs, + _runtime_vars=_runtime_vars, + ) + log.runtime( + f'Sending spawn spec to subint child\n' + f'{{}}=> {chan.aid.reprol()!r}\n' + f'\n' + f'{pretty_struct.pformat(sspec)}\n' + ) + await chan.send(sspec) + + curr_actor: Actor = current_actor() + curr_actor._actoruid2nursery[uid] = actor_nursery + + task_status.started(portal) + + with trio.CancelScope(shield=True): + await actor_nursery._join_procs.wait() + + async with trio.open_nursery() as lifecycle_n: + if portal in 
actor_nursery._cancel_after_result_on_exit: + lifecycle_n.start_soon( + cancel_on_completion, + portal, + subactor, + errors, + ) + + # Soft-kill analog: wait for the subint to exit + # naturally; on cancel, send a graceful cancel + # via the IPC portal and then wait for the + # driver thread to finish so `_interpreters.destroy()` + # won't race with a running interpreter. try: - event, chan = await ipc_server.wait_for_peer(uid) + await subint_exited.wait() except trio.Cancelled: - cancelled_during_spawn = True - raise - - portal = Portal(chan) - actor_nursery._children[uid] = ( - subactor, - interp_id, # proxy for the normal `proc` slot - portal, - ) - - sspec = msgtypes.SpawnSpec( - _parent_main_data=subactor._parent_main_data, - enable_modules=subactor.enable_modules, - reg_addrs=subactor.reg_addrs, - bind_addrs=bind_addrs, - _runtime_vars=_runtime_vars, - ) - log.runtime( - f'Sending spawn spec to subint child\n' - f'{{}}=> {chan.aid.reprol()!r}\n' - f'\n' - f'{pretty_struct.pformat(sspec)}\n' - ) - await chan.send(sspec) - - curr_actor: Actor = current_actor() - curr_actor._actoruid2nursery[uid] = actor_nursery - - task_status.started(portal) - - with trio.CancelScope(shield=True): - await actor_nursery._join_procs.wait() - - async with trio.open_nursery() as lifecycle_n: - if portal in actor_nursery._cancel_after_result_on_exit: - lifecycle_n.start_soon( - cancel_on_completion, - portal, - subactor, - errors, + with trio.CancelScope(shield=True): + log.cancel( + f'Soft-killing subint sub-actor\n' + f'c)=> {chan.aid.reprol()}\n' + f' |_interp_id={interp_id}\n' ) - - # Soft-kill analog: wait for the subint to exit - # naturally; on cancel, send a graceful cancel - # via the IPC portal and then wait for the - # driver thread to finish so `interp.close()` - # won't race with a running interpreter. 
- try: + try: + await portal.cancel_actor() + except ( + trio.BrokenResourceError, + trio.ClosedResourceError, + ): + # channel already down — subint will + # exit on its own timeline + pass await subint_exited.wait() - except trio.Cancelled: - with trio.CancelScope(shield=True): - log.cancel( - f'Soft-killing subint sub-actor\n' - f'c)=> {chan.aid.reprol()}\n' - f' |_interp_id={interp_id}\n' - ) - try: - await portal.cancel_actor() - except ( - trio.BrokenResourceError, - trio.ClosedResourceError, - ): - # channel already down — subint will - # exit on its own timeline - pass - await subint_exited.wait() - raise - finally: - lifecycle_n.cancel_scope.cancel() + raise + finally: + lifecycle_n.cancel_scope.cancel() finally: - # The driver thread has exited (either natural subint - # completion or post-cancel teardown) so the subint is - # no longer running — safe to destroy. + # Ensure the driver thread is *fully* joined before + # destroying the subint. `subint_exited.set()` fires + # from inside the thread but returns to trio before + # the thread's bootstrap cleanup finishes; calling + # `destroy()` too eagerly can race with tstate + # teardown. Off-load the blocking `.join()` to a + # cache thread (which carries no subint tstate of + # its own, so no cache conflict). with trio.CancelScope(shield=True): + if driver_thread.is_alive(): + await trio.to_thread.run_sync(driver_thread.join) try: _interpreters.destroy(interp_id) log.runtime(