Use `pidfd` for cancellable `_ForkedProc.wait`
Two coordinated improvements to the `subint_forkserver` backend:
1. Replace `trio.to_thread.run_sync(os.waitpid, ...,
abandon_on_cancel=False)` in `_ForkedProc.wait()`
with `trio.lowlevel.wait_readable(pidfd)`. The
prior version blocked a trio cache thread on a
sync syscall — outer cancel scopes couldn't
unwedge it when something downstream got stuck.
Same pattern `trio.Process.wait()` and
`proc_waiter` (the mp backend) already use.
2. Drop the `@pytest.mark.xfail(strict=True)` from
`test_orphaned_subactor_sigint_cleanup_DRAFT` —
the test now PASSES after 0cd0b633 (fork-child
FD scrub). Same root cause as the nested-cancel
hang: inherited IPC/trio FDs were poisoning the
child's event loop. Closing them lets SIGINT
propagation work as designed.
Deats,
- `_ForkedProc.__init__` opens a pidfd via
`os.pidfd_open(pid)` (Linux 5.3+, Python 3.9+)
- `wait()` parks on `trio.lowlevel.wait_readable()`,
then non-blocking `waitpid(WNOHANG)` to collect
the exit status (correct since the pidfd signal
IS the child-exit notification)
- `ChildProcessError` swallow handles the rare race
where someone else reaps first
- pidfd closed after `wait()` completes (one-shot
semantics) + `__del__` belt-and-braces for
unexpected-teardown paths
- test docstring's `@xfail` block replaced with a
`# NOTE` comment explaining the historical
context + cross-ref to the conc-anal doc; test
remains in place as a regression guard
The two changes are interdependent — the
cancellable `wait()` matters for the same nested-
cancel scenarios the FD scrub fixes, since the
original deadlock had trio cache workers wedged in
`os.waitpid` swallowing the outer cancel.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
subint_forkserver_backend
parent
9993db0193
commit
c20b05e181
|
|
@ -446,22 +446,15 @@ def _process_alive(pid: int) -> bool:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.xfail(
|
# NOTE: was previously `@pytest.mark.xfail(strict=True, ...)`
|
||||||
strict=True,
|
# for the orphan-SIGINT hang documented in
|
||||||
reason=(
|
# `ai/conc-anal/subint_forkserver_orphan_sigint_hang_issue.md`
|
||||||
'subint_forkserver orphan-child SIGINT hang: trio\'s '
|
# — now passes after the fork-child FD-hygiene fix in
|
||||||
'event loop stays wedged in `epoll_wait` despite the '
|
# `tractor.spawn._subint_forkserver._close_inherited_fds()`:
|
||||||
'SIGINT handler being correctly installed and the '
|
# closing all inherited FDs (including the parent's IPC
|
||||||
'signal being delivered at the kernel level. NOT a '
|
# listener + trio-epoll + wakeup-pipe FDs) lets the child's
|
||||||
'"handler missing on non-main thread" issue — post-'
|
# trio event loop respond cleanly to external SIGINT.
|
||||||
'fork the worker IS `threading.main_thread()` and '
|
# Leaving the test in place as a regression guard.
|
||||||
'trio\'s `KIManager` handler is confirmed installed. '
|
|
||||||
'Full analysis + ruled-out hypotheses + fix directions '
|
|
||||||
'in `ai/conc-anal/'
|
|
||||||
'subint_forkserver_orphan_sigint_hang_issue.md`. '
|
|
||||||
'Flip this mark (or drop it) once the gap is closed.'
|
|
||||||
),
|
|
||||||
)
|
|
||||||
@pytest.mark.timeout(
|
@pytest.mark.timeout(
|
||||||
30,
|
30,
|
||||||
method='thread',
|
method='thread',
|
||||||
|
|
|
||||||
|
|
@ -526,6 +526,18 @@ class _ForkedProc:
|
||||||
self.stdin = None
|
self.stdin = None
|
||||||
self.stdout = None
|
self.stdout = None
|
||||||
self.stderr = None
|
self.stderr = None
|
||||||
|
# pidfd (Linux 5.3+, Python 3.9+) — a file descriptor
|
||||||
|
# referencing this child process which becomes readable
|
||||||
|
# once the child exits. Enables a fully trio-cancellable
|
||||||
|
# wait via `trio.lowlevel.wait_readable()` — same
|
||||||
|
# pattern `trio.Process.wait()` uses under the hood, and
|
||||||
|
# the same pattern `multiprocessing.Process.sentinel`
|
||||||
|
# uses for `tractor.spawn._spawn.proc_waiter()`. Without
|
||||||
|
# this, waiting via `trio.to_thread.run_sync(os.waitpid,
|
||||||
|
# ...)` blocks a cache thread on a sync syscall that is
|
||||||
|
# NOT trio-cancellable, which prevents outer cancel
|
||||||
|
# scopes from unwedging a stuck-child cancel cascade.
|
||||||
|
self._pidfd: int = os.pidfd_open(pid)
|
||||||
|
|
||||||
def poll(self) -> int | None:
|
def poll(self) -> int | None:
|
||||||
'''
|
'''
|
||||||
|
|
@ -555,22 +567,40 @@ class _ForkedProc:
|
||||||
|
|
||||||
async def wait(self) -> int:
|
async def wait(self) -> int:
|
||||||
'''
|
'''
|
||||||
Async blocking wait for the child's exit, off-loaded
|
Async, fully-trio-cancellable wait for the child's
|
||||||
to a trio cache thread so we don't block the event
|
exit. Uses `trio.lowlevel.wait_readable()` on the
|
||||||
loop on `waitpid()`. Safe to call multiple times;
|
`pidfd` sentinel — same pattern as `trio.Process.wait`
|
||||||
subsequent calls return the cached rc without
|
and `tractor.spawn._spawn.proc_waiter` (mp backend).
|
||||||
re-issuing the syscall.
|
|
||||||
|
Safe to call multiple times; subsequent calls return
|
||||||
|
the cached rc without re-issuing the syscall.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
if self._returncode is not None:
|
if self._returncode is not None:
|
||||||
return self._returncode
|
return self._returncode
|
||||||
_, status = await trio.to_thread.run_sync(
|
# Park until the pidfd becomes readable — the OS
|
||||||
os.waitpid,
|
# signals this exactly once on child exit. Cancellable
|
||||||
self.pid,
|
# via any outer trio cancel scope (this was the key
|
||||||
0,
|
# fix vs. the prior `to_thread.run_sync(os.waitpid,
|
||||||
abandon_on_cancel=False,
|
# abandon_on_cancel=False)` which blocked a thread on
|
||||||
)
|
# a sync syscall and swallowed cancels).
|
||||||
|
await trio.lowlevel.wait_readable(self._pidfd)
|
||||||
|
# pidfd signaled → reap non-blocking to collect the
|
||||||
|
# exit status. `WNOHANG` here is correct: by the time
|
||||||
|
# the pidfd is readable, `waitpid()` won't block.
|
||||||
|
try:
|
||||||
|
_, status = os.waitpid(self.pid, os.WNOHANG)
|
||||||
|
except ChildProcessError:
|
||||||
|
# already reaped by something else
|
||||||
|
status = 0
|
||||||
self._returncode = self._parse_status(status)
|
self._returncode = self._parse_status(status)
|
||||||
|
# pidfd is one-shot; close it so we don't leak fds
|
||||||
|
# across many spawns.
|
||||||
|
try:
|
||||||
|
os.close(self._pidfd)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
self._pidfd = -1
|
||||||
return self._returncode
|
return self._returncode
|
||||||
|
|
||||||
def kill(self) -> None:
|
def kill(self) -> None:
|
||||||
|
|
@ -584,6 +614,16 @@ class _ForkedProc:
|
||||||
except ProcessLookupError:
|
except ProcessLookupError:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
def __del__(self) -> None:
|
||||||
|
# belt-and-braces: close the pidfd if `wait()` wasn't
|
||||||
|
# called (e.g. unexpected teardown path).
|
||||||
|
fd: int = getattr(self, '_pidfd', -1)
|
||||||
|
if fd >= 0:
|
||||||
|
try:
|
||||||
|
os.close(fd)
|
||||||
|
except OSError:
|
||||||
|
pass
|
||||||
|
|
||||||
def _parse_status(self, status: int) -> int:
|
def _parse_status(self, status: int) -> int:
|
||||||
if os.WIFEXITED(status):
|
if os.WIFEXITED(status):
|
||||||
return os.WEXITSTATUS(status)
|
return os.WEXITSTATUS(status)
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue