From c20b05e18179bdb3c52d99fc4e1b5c5e3912a7bf Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 23 Apr 2026 16:06:45 -0400 Subject: [PATCH] Use `pidfd` for cancellable `_ForkedProc.wait` MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Two coordinated improvements to the `subint_forkserver` backend: 1. Replace `trio.to_thread.run_sync(os.waitpid, ..., abandon_on_cancel=False)` in `_ForkedProc.wait()` with `trio.lowlevel.wait_readable(pidfd)`. The prior version blocked a trio cache thread on a sync syscall — outer cancel scopes couldn't unwedge it when something downstream got stuck. This is the same pattern that `trio.Process.wait()` and `proc_waiter` (the mp backend) already use. 2. Drop the `@pytest.mark.xfail(strict=True)` from `test_orphaned_subactor_sigint_cleanup_DRAFT` — the test now PASSES after 0cd0b633 (fork-child FD scrub). Same root cause as the nested-cancel hang: inherited IPC/trio FDs were poisoning the child's event loop. Closing them lets SIGINT propagation work as designed. Details: - `_ForkedProc.__init__` opens a pidfd via `os.pidfd_open(pid)` (Linux 5.3+, Python 3.9+) - `wait()` parks on `trio.lowlevel.wait_readable()`, then non-blocking `waitpid(WNOHANG)` to collect the exit status (correct since the pidfd signal IS the child-exit notification) - `ChildProcessError` swallow handles the rare race where someone else reaps first (the return code is then reported as 0) - pidfd closed after `wait()` completes (one-shot semantics) + `__del__` belt-and-braces for unexpected-teardown paths - the test's `@pytest.mark.xfail` decorator replaced with a `# NOTE` comment explaining the historical context + cross-ref to the conc-anal doc; test remains in place as a regression guard The two changes are interdependent — the cancellable `wait()` matters for the same nested-cancel scenarios the FD scrub fixes, since the original deadlock had trio cache workers wedged in `os.waitpid` swallowing the outer cancel. 
(this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- tests/spawn/test_subint_forkserver.py | 25 ++++------- tractor/spawn/_subint_forkserver.py | 62 ++++++++++++++++++++++----- 2 files changed, 60 insertions(+), 27 deletions(-) diff --git a/tests/spawn/test_subint_forkserver.py b/tests/spawn/test_subint_forkserver.py index c065e66f..abc55cf5 100644 --- a/tests/spawn/test_subint_forkserver.py +++ b/tests/spawn/test_subint_forkserver.py @@ -446,22 +446,15 @@ def _process_alive(pid: int) -> bool: return False -@pytest.mark.xfail( - strict=True, - reason=( - 'subint_forkserver orphan-child SIGINT hang: trio\'s ' - 'event loop stays wedged in `epoll_wait` despite the ' - 'SIGINT handler being correctly installed and the ' - 'signal being delivered at the kernel level. NOT a ' - '"handler missing on non-main thread" issue — post-' - 'fork the worker IS `threading.main_thread()` and ' - 'trio\'s `KIManager` handler is confirmed installed. ' - 'Full analysis + ruled-out hypotheses + fix directions ' - 'in `ai/conc-anal/' - 'subint_forkserver_orphan_sigint_hang_issue.md`. ' - 'Flip this mark (or drop it) once the gap is closed.' - ), -) +# NOTE: was previously `@pytest.mark.xfail(strict=True, ...)` +# for the orphan-SIGINT hang documented in +# `ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md` +# — now passes after the fork-child FD-hygiene fix in +# `tractor.spawn._subint_forkserver._close_inherited_fds()`: +# closing all inherited FDs (including the parent's IPC +# listener + trio-epoll + wakeup-pipe FDs) lets the child's +# trio event loop respond cleanly to external SIGINT. +# Leaving the test in place as a regression guard. 
@pytest.mark.timeout( 30, method='thread', diff --git a/tractor/spawn/_subint_forkserver.py b/tractor/spawn/_subint_forkserver.py index 34c5dd97..f29474e2 100644 --- a/tractor/spawn/_subint_forkserver.py +++ b/tractor/spawn/_subint_forkserver.py @@ -526,6 +526,18 @@ class _ForkedProc: self.stdin = None self.stdout = None self.stderr = None + # pidfd (Linux 5.3+, Python 3.9+) — a file descriptor + # referencing this child process which becomes readable + # once the child exits. Enables a fully trio-cancellable + # wait via `trio.lowlevel.wait_readable()` — same + # pattern `trio.Process.wait()` uses under the hood, and + # the same pattern `multiprocessing.Process.sentinel` + # uses for `tractor.spawn._spawn.proc_waiter()`. Without + # this, waiting via `trio.to_thread.run_sync(os.waitpid, + # ...)` blocks a cache thread on a sync syscall that is + # NOT trio-cancellable, which prevents outer cancel + # scopes from unwedging a stuck-child cancel cascade. + self._pidfd: int = os.pidfd_open(pid) def poll(self) -> int | None: ''' @@ -555,22 +567,40 @@ class _ForkedProc: async def wait(self) -> int: ''' - Async blocking wait for the child's exit, off-loaded - to a trio cache thread so we don't block the event - loop on `waitpid()`. Safe to call multiple times; - subsequent calls return the cached rc without - re-issuing the syscall. + Async, fully-trio-cancellable wait for the child's + exit. Uses `trio.lowlevel.wait_readable()` on the + `pidfd` sentinel — same pattern as `trio.Process.wait` + and `tractor.spawn._spawn.proc_waiter` (mp backend). + + Safe to call multiple times; subsequent calls return + the cached rc without re-issuing the syscall. ''' if self._returncode is not None: return self._returncode - _, status = await trio.to_thread.run_sync( - os.waitpid, - self.pid, - 0, - abandon_on_cancel=False, - ) + # Park until the pidfd becomes readable — the OS + # signals this exactly once on child exit. 
Cancellable + # via any outer trio cancel scope (this was the key + # fix vs. the prior `to_thread.run_sync(os.waitpid, + # abandon_on_cancel=False)` which blocked a thread on + # a sync syscall and swallowed cancels). + await trio.lowlevel.wait_readable(self._pidfd) + # pidfd signaled → reap non-blocking to collect the + # exit status. `WNOHANG` here is correct: by the time + # the pidfd is readable, `waitpid()` won't block. + try: + _, status = os.waitpid(self.pid, os.WNOHANG) + except ChildProcessError: + # already reaped by something else + status = 0 self._returncode = self._parse_status(status) + # pidfd is one-shot; close it so we don't leak fds + # across many spawns. + try: + os.close(self._pidfd) + except OSError: + pass + self._pidfd = -1 return self._returncode def kill(self) -> None: @@ -584,6 +614,16 @@ class _ForkedProc: except ProcessLookupError: pass + def __del__(self) -> None: + # belt-and-braces: close the pidfd if `wait()` wasn't + # called (e.g. unexpected teardown path). + fd: int = getattr(self, '_pidfd', -1) + if fd >= 0: + try: + os.close(fd) + except OSError: + pass + def _parse_status(self, status: int) -> int: if os.WIFEXITED(status): return os.WEXITSTATUS(status)