diff --git a/ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md b/ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md new file mode 100644 index 00000000..213841e9 --- /dev/null +++ b/ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md @@ -0,0 +1,221 @@ +# trio `WakeupSocketpair.drain()` busy-loop in forked child (peer-closed missed-EOF) + +## Reproducer + +```bash +./py313/bin/python -m pytest \ + tests/test_multi_program.py::test_register_duplicate_name \ + --tpt-proto=tcp \ + --spawn-backend=main_thread_forkserver \ + -v --capture=sys +``` + +Subactor pegs a CPU core indefinitely; parent test +hangs waiting for the subactor. + +## Empirical evidence (caught alive) + +``` +$ sudo strace -p +recvfrom(6, "", 65536, 0, NULL, NULL) = 0 +recvfrom(6, "", 65536, 0, NULL, NULL) = 0 +recvfrom(6, "", 65536, 0, NULL, NULL) = 0 +... (no `epoll_wait`, no other syscalls, just this back-to-back) +``` + +Pattern: tight C-level `recvfrom` loop returning 0 +each call. No `epoll_wait` between iterations → +**not trio's task scheduler**. Pure synchronous C +loop. + +``` +$ sudo readlink /proc//fd/6 +socket:[] + +$ sudo lsof -p | grep ' 6u' + goodboy 6u unix 0xffff... 0t0 type=STREAM (CONNECTED) +``` + +fd=6 is an **AF_UNIX socket** in CONNECTED state. +Even though the test uses `--tpt-proto=tcp`, this fd +is NOT a tractor IPC channel — it's an internal +trio socketpair. + +## Root-cause: `WakeupSocketpair.drain()` + +`/site-packages/trio/_core/_wakeup_socketpair.py`: + +```python +class WakeupSocketpair: + def __init__(self) -> None: + self.wakeup_sock, self.write_sock = socket.socketpair() + self.wakeup_sock.setblocking(False) + self.write_sock.setblocking(False) + ... + + def drain(self) -> None: + try: + while True: + self.wakeup_sock.recv(2**16) + except BlockingIOError: + pass +``` + +`socket.socketpair()` on Linux defaults to AF_UNIX +SOCK_STREAM. Both ends non-blocking. Normal flow: + +1. 
Signal/wake event → `write_sock.send(b'\x00')` + queues a byte. +2. `wakeup_sock` becomes readable → trio's epoll + triggers. +3. Trio calls `drain()` to flush the buffer. +4. drain loops on `wakeup_sock.recv(64KB)`. +5. Eventually buffer empty → non-blocking socket + raises `BlockingIOError` → except → break. + +**Bug surface — peer-closed missed-EOF**: + +Non-blocking socket semantics: +- buffer has data → `recv` returns N>0 bytes (loop continues) +- buffer empty → `recv` raises `BlockingIOError` +- **peer FIN'd → `recv` returns 0 bytes (NEITHER exception NOR + break — infinite tight loop)** + +`drain()` does not handle the `b''` return-value +(EOF) case. If `write_sock` has been closed (or the +process holding it is gone), every iteration returns +0 → infinite loop → 100% CPU on a single core. + +## Why this triggers under `main_thread_forkserver` + +Under `os.fork()` from the forkserver-worker thread: + +1. Parent has a `WakeupSocketpair` instance with + `wakeup_sock=fdN`, `write_sock=fdM`. Both fds + open in parent. +2. Fork → child inherits BOTH fds (kernel-level fd + table dup). +3. `_close_inherited_fds()` runs in child → + closes everything except stdio. `wakeup_sock` and + `write_sock` of the parent's `WakeupSocketpair` + ARE closed in child. +4. Child's trio (running fresh) creates its OWN + `WakeupSocketpair` → NEW fd numbers (e.g. fd 6, 7). +5. **In `infect_asyncio` mode** the asyncio loop is + the host; trio runs as guest via + `start_guest_run`. trio still creates its + `WakeupSocketpair` in the I/O manager but its + role is different. + +The race window: somewhere between (3) and (5), if a +`WakeupSocketpair` Python object reference inherited +via COW (from parent's pre-fork heap) survives long +enough that `drain()` is called on it AFTER its fds +were closed but BEFORE the child's NEW socketpair +takes over the recycled fd numbers — the recycled fd +will be one of the child's NEW socketpair ends, whose +peer might be FIN-flagged (e.g. 
parent-process +peer-end is closed). + +Or simpler: the `wait_for_actor`/`find_actor` discovery +flow in `test_register_duplicate_name` triggers an +unusual code path where a stale `WakeupSocketpair` +gets `drain()`-called on a fd whose peer has already +closed. + +## Why `drain()` shouldn't loop indefinitely on EOF +(upstream trio bug) + +Even WITHOUT fork, `drain()` should treat `b''` as +EOF and break. The current code is correct for the +"buffer drained on a healthy socketpair" scenario but +incorrect for the "peer is gone" scenario. It's a +defensive-programming gap in trio. + +A one-line patch upstream: + +```python +def drain(self) -> None: + try: + while True: + data = self.wakeup_sock.recv(2**16) + if not data: + break # peer-closed; nothing more to drain + except BlockingIOError: + pass +``` + +## Workarounds (until the underlying issue lands) + +1. **Skip-mark on the fork backend**: + `tests/test_multi_program.py` → + `pytest.mark.skipon_spawn_backend('main_thread_forkserver', + reason='trio WakeupSocketpair.drain busy-loop, see ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md')`. + +2. **Defensive monkey-patch in tractor's + forkserver-child prelude** — wrap + `WakeupSocketpair.drain` to handle `b''`: + + ```python + # in `_actor_child_main` or `_close_inherited_fds`'s + # post-fork prelude: + from trio._core._wakeup_socketpair import WakeupSocketpair + _orig_drain = WakeupSocketpair.drain + def _safe_drain(self): + try: + while True: + data = self.wakeup_sock.recv(2**16) + if not data: + return # peer closed + except BlockingIOError: + pass + WakeupSocketpair.drain = _safe_drain + ``` + + Tracks upstream — remove once trio fixes. + +3. **Upstream the fix**: 1-line PR to `python-trio/trio` + adding `if not data: break` to `drain()`. + +## Investigation next steps + +1. **Confirm via py-spy**: when caught alive, detach + strace first then + `sudo py-spy dump --pid --locals`. 
The + busy thread should show `drain` from `WakeupSocketpair` + in the call chain. +2. **Identify which write-end peer is closed**: from + the inode of fd 6, look up the matching peer + inode via `ss -xp` and see whose process it + was/is. +3. **Verify the missed-EOF hypothesis**: hand-craft a + minimal `WakeupSocketpair` repro: + + ```python + from trio._core._wakeup_socketpair import WakeupSocketpair + ws = WakeupSocketpair() + ws.write_sock.close() # simulate peer-gone + ws.drain() # should hang forever + ``` + +## Sibling bug + +`tests/test_infected_asyncio.py::test_aio_simple_error` +hangs under the same backend with a DIFFERENT +fingerprint (Mode-A deadlock, both parties in +`epoll_wait`, no busy-loop). Distinct root cause — +see `infected_asyncio_under_main_thread_forkserver_hang_issue.md`. + +Both share the broader theme: **trio internal-state +initialization isn't fully fork-safe under +`main_thread_forkserver`** for the more exotic +dispatch paths. + +## See also + +- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella +- python-trio/trio#1614 — trio + fork hazards +- `trio._core._wakeup_socketpair.WakeupSocketpair` + source (the smoking gun) +- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md` +- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md` diff --git a/tests/trionics/__init__.py b/tests/trionics/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/trionics/test_patches.py b/tests/trionics/test_patches.py new file mode 100644 index 00000000..9f2b942f --- /dev/null +++ b/tests/trionics/test_patches.py @@ -0,0 +1,99 @@ +''' +Regression tests for `tractor.trionics.patches` — +defensive monkey-patches on upstream `trio` bugs. + +Each test asserts: + +1. The bug exists (or is gone — skip cleanly if + upstream shipped the fix and our `is_needed()` now + returns `False`). +2. Our patch fixes it (post-`apply()` the `repro()` + returns cleanly within a tight wall-clock cap). 
+ +Wall-clock caps are critical here: the bugs we patch +are tight-loops or deadlocks, so a regression would +HANG the test runner unless we hard-cap each +`repro()` call. + +''' +import signal + +import pytest + +from tractor.trionics import patches +from tractor.trionics.patches import _wakeup_socketpair as wsp + + +@pytest.fixture(autouse=True) +def _alarm_cleanup(): + ''' + Ensure no leftover SIGALRM survives a test failure + or unexpected return. + + ''' + yield + signal.alarm(0) + + +def test_wakeup_socketpair_drain_eof_patch_works(): + ''' + Without the patch, `WakeupSocketpair.drain()` on a + socketpair whose write-end has been closed spins + forever. With the patch applied, it returns + cleanly within milliseconds. + + Wall-clock cap: 2s. If the patch regresses, SIGALRM + fires and the test hard-fails with a clear signal + instead of hanging CI indefinitely. + + ''' + if not wsp.is_needed(): + pytest.skip( + 'upstream trio shipped the fix; ' + 'patch no longer needed for trio ' + '(see `is_needed()` for version gate)' + ) + + # Apply the patch. + applied: bool = wsp.apply() + # `apply()` returns True only on the first call in + # this process; the idempotency guard makes later + # calls return False. An earlier call (e.g. from + # another test) may already have applied the patch, + # so only the return type is checked here. + assert isinstance(applied, bool) + + # Cap wall-clock at 2s. `repro()` runs OUTSIDE a + # `trio.run`, so plain stdlib signal semantics + # apply: no SIGALRM handler is installed here, so + # if the patch regresses the default action + # terminates the process instead of letting the + # `recv` loop spin forever. + signal.alarm(2) + wsp.repro() + signal.alarm(0) + + +def test_apply_all_idempotent(): + ''' + Calling `apply_all()` twice should not double- + apply: second call's dict has all-False values + (every patch reports "already applied"). 
+ + ''' + first: dict[str, bool] = patches.apply_all() + second: dict[str, bool] = patches.apply_all() + + # Second call: every patch reports skipped. + assert all(v is False for v in second.values()), ( + f'apply_all() not idempotent: {second}' + ) + + # First call: at least one patch was applied + # (or all are no-ops because `is_needed()` is + # False everywhere — the all-fixed-upstream future + # state which is also valid). + assert isinstance(first, dict) + for name, applied in first.items(): + assert isinstance(applied, bool), ( + f'patch {name!r} returned non-bool: {applied!r}' + ) diff --git a/tractor/_child.py b/tractor/_child.py index 727a5054..a79ea005 100644 --- a/tractor/_child.py +++ b/tractor/_child.py @@ -63,6 +63,14 @@ def _actor_child_main( sub-interpreter via `Interpreter.call()`. ''' + # Apply defensive monkey-patches for upstream `trio` + # bugs we've encountered while running tractor — see + # `tractor.trionics.patches` for the catalog + + # per-patch upstream-fix tracking. Must run BEFORE + # any trio runtime init. + from .trionics.patches import apply_all + apply_all() + subactor = Actor( name=uid[0], uuid=uid[1], diff --git a/tractor/trionics/patches/README.md b/tractor/trionics/patches/README.md new file mode 100644 index 00000000..c03845f3 --- /dev/null +++ b/tractor/trionics/patches/README.md @@ -0,0 +1,95 @@ +# `tractor.trionics.patches` + +Defensive monkey-patches for bugs in `trio` itself. + +## What goes here + +- Bugs in upstream `trio` that we've encountered while + running `tractor` and need to work around until + upstream releases a fix. +- Each patch fixes EXACTLY one trio internal — no + multi-bug omnibus patches. + +## What does NOT go here + +- Bugs in `tractor`'s own code (those get fixed + in-tree, in the offending tractor module). +- Bugs in `asyncio`, `pytest`, the stdlib, etc. (file + separate `tractor..patches` subpkgs as + needed). +- Workarounds for behavior we *disagree* with but that + isn't a bug per se. 
If trio's API does what it says + on the tin, we don't override it here. + +## Per-patch contract + +Every `_.py` module in this directory MUST +expose: + +- **`apply() -> bool`** — apply the patch. Idempotent + (safe to call multiple times). Version-gated — must + consult `is_needed()` and skip when False. Returns + `True` if patched this call, `False` if skipped. + +- **`is_needed() -> bool`** — does upstream still need + patching? Today most patches return `True` + unconditionally, but as upstream releases land each + should gate on `Version(trio.__version__) < + Version('X.Y.Z')`. When the gated version is + released, the patch can be DELETED entirely. + +- **`repro() -> None`** — minimal demonstration of the + bug. Used by the regression test suite to assert (a) + the upstream bug still exists, (b) our patch fixes + it. Should be tight enough that calling it post- + `apply()` returns cleanly within a few hundred + milliseconds — tests wrap it with a wall-clock cap. + +Each module's docstring MUST contain: + +- **Problem**: what trio does wrong + the trigger + conditions (e.g. "fork-spawn backend, peer-closed + socketpair, etc.") +- **Fix**: the one-line (ideally) patch +- **Repro**: the standalone snippet `repro()` + implements +- **Upstream**: link to filed issue/PR (or + `TODO: file`) +- **REMOVE WHEN**: `trio>=X.Y.Z` ships the upstream + fix + +## Adding a patch + +1. Create `_.py` with the `apply` / + `is_needed` / `repro` API. +2. Register it in `__init__.py::_PATCHES`. +3. Add a regression test in + `tests/trionics/test_patches.py` that uses + `repro()` to assert pre/post-patch behavior with a + wall-clock cap. +4. File the upstream issue/PR. Add the link to your + module's `Upstream:` and `# REMOVE WHEN:` lines. + +## Removing a patch (when upstream releases the fix) + +1. Confirm the upstream-fixed `trio` version is the + minimum we depend on, OR keep the version-gate in + `is_needed()` if we still support older trio. +2. 
If we've fully bumped past the broken versions: + - Delete `_.py` + - Remove the entry from `__init__.py::_PATCHES` + - Delete the corresponding test in + `tests/trionics/test_patches.py` + - Bump the conc-anal doc with a "FIXED" header + +## Calling + +```python +from tractor.trionics.patches import apply_all +apply_all() +``` + +Currently invoked from `tractor._child._actor_child_main` +before `_trio_main` so every spawned subactor gets +patched. The root actor's entry could opt in too if a +patch turns out to bite the root (none do today). diff --git a/tractor/trionics/patches/__init__.py b/tractor/trionics/patches/__init__.py new file mode 100644 index 00000000..5d2cdfb3 --- /dev/null +++ b/tractor/trionics/patches/__init__.py @@ -0,0 +1,84 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Defensive monkey-patches for `trio` internals. + +Every patch in this package fixes a bug in `trio` itself +that we've encountered while running `tractor` — usually +a fork-survival edge case that upstream `trio` hasn't +filed/fixed yet. 
Each patch is: + +- **idempotent** — safe to call multiple times +- **version-gated** — checks `trio.__version__` and skips + itself if upstream has shipped the fix +- **scoped** — only modifies the specific trio internal + it's targeting; no broad side effects +- **removable** — every patch carries a `# REMOVE WHEN:` + marker in its docstring pointing at the upstream PR + whose release allows us to drop it + +Add a new patch by: + +1. Create `tractor/trionics/patches/_.py` exposing + the `apply()` / `is_needed()` / `repro()` API + contract. +2. Import it in this `__init__.py` and add an entry to + `_PATCHES`. +3. Document upstream-fix-tracking in the module + docstring's `# REMOVE WHEN:` line. +4. Add a regression test in + `tests/trionics/test_patches.py` that uses the + patch's `repro()` to assert the bug exists + the + patch fixes it. + +Calling `apply_all()` from a tractor entry point (e.g. +`tractor._child._actor_child_main`) applies every +registered patch + returns `{patch_name: applied?}` so +callers can log/assert as needed. + +''' +from typing import Callable + +from . import _wakeup_socketpair + + +_PATCHES: list[tuple[str, Callable[[], bool]]] = [ + ( + 'trio_wakeup_socketpair_drain_eof', + _wakeup_socketpair.apply, + ), +] + + +def apply_all() -> dict[str, bool]: + ''' + Apply every registered patch. Idempotent — calling + twice is fine, second call's dict will be all + `False`. + + Returns `{patch_name: applied?}`: + + - `True` — patch was applied THIS call (inaugural + apply, or first-call-since-process-start). + - `False` — skipped (already applied OR upstream fix + detected via `is_needed() == False`). 
+ + ''' + results: dict[str, bool] = {} + for name, applier in _PATCHES: + results[name] = applier() + return results diff --git a/tractor/trionics/patches/_wakeup_socketpair.py b/tractor/trionics/patches/_wakeup_socketpair.py new file mode 100644 index 00000000..6939bdcd --- /dev/null +++ b/tractor/trionics/patches/_wakeup_socketpair.py @@ -0,0 +1,171 @@ +# tractor: structured concurrent "actors". +# Copyright 2018-eternity Tyler Goodlet. + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Patch `trio._core._wakeup_socketpair.WakeupSocketpair.drain()` +to break on peer-closed EOF. + +Problem +------- +`drain()` loops on `self.wakeup_sock.recv(2**16)` and +exits ONLY on `BlockingIOError` (buffer-empty on a +non-blocking socket), NEVER on `recv() == b''` +(peer-closed FIN). When the socketpair's write-end +has been closed, `recv` returns 0 bytes each call → +infinite C-level tight loop → 100% CPU, no Python +checkpoints, no signal delivery, no progress. + +Most reliably triggered under fork-spawn backends — +`os.fork()` + `_close_inherited_fds()` can leave a +`WakeupSocketpair` instance whose `write_sock` was +closed in the child (or whose peer-end is held by a +process that has since exited). 
+ +Repro +----- +```python +from trio._core._wakeup_socketpair import WakeupSocketpair +ws = WakeupSocketpair() +ws.write_sock.close() +ws.drain() # spins forever pre-patch +``` + +Fix +--- +One line: break the drain loop on `b''` EOF +in addition to the existing `BlockingIOError` exit. + +```python +def _safe_drain(self) -> None: + try: + while True: + data = self.wakeup_sock.recv(2**16) + if not data: # ← peer-closed; nothing more to drain + return + except BlockingIOError: + pass +``` + +Upstream +-------- +TODO: file at `python-trio/trio` — the standalone +`repro()` below + this docstring is the issue body's +evidence section. + +REMOVE WHEN: trio>=`` ships the EOF-break in +`_wakeup_socketpair.WakeupSocketpair.drain()`. + +See also +-------- +- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md` +- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md` + — sibling-bug analysis fixed by the same patch. + +''' +from __future__ import annotations + + +# Module-local sentinel — set True by `apply()` after the +# first successful patch. Idempotency guard. +_APPLIED: bool = False + + +def is_needed() -> bool: + ''' + True iff upstream `trio` is the broken version that + needs our patch. + + Today: always True since no released `trio` has the + fix. When upstream lands it, gate on: + + ```python + from packaging.version import Version + import trio + return Version(trio.__version__) < Version('') + ``` + + ''' + # TODO version-gate once upstream lands the fix. + return True + + +def repro() -> None: + ''' + Minimal hang demonstrator + regression test target. + + Returns CLEANLY when `apply()` has been called + earlier in this process (the patched + `_safe_drain` breaks on EOF). Spins forever + UNPATCHED — caller should wrap with a wall-clock + cap (e.g. `signal.alarm(N)` or `trio.fail_after`) + to avoid hanging the test runner if regressing. + + Used by `tests/trionics/test_patches.py` to assert + both: + + 1. 
The bug exists upstream (sanity check the + repro is real). + 2. Our patch fixes it (post-`apply()` returns + cleanly). + + ''' + from trio._core._wakeup_socketpair import ( + WakeupSocketpair, + ) + ws = WakeupSocketpair() + ws.write_sock.close() + ws.drain() # ← targeted operation + + +def apply() -> bool: + ''' + Apply the EOF-break patch to + `WakeupSocketpair.drain`. Idempotent + version- + gated. + + Returns: + + - `True` if patched THIS call (inaugural apply). + - `False` if skipped (already applied this process, + OR `is_needed() == False` because upstream fixed + it). + + ''' + global _APPLIED + if _APPLIED or not is_needed(): + return False + + from trio._core._wakeup_socketpair import ( + WakeupSocketpair as _WSP, + ) + + def _safe_drain(self) -> None: + try: + while True: + data = self.wakeup_sock.recv(2**16) + # XXX patch — break on EOF instead of + # spinning. Upstream trio's `drain()` + # only handles the `BlockingIOError` + # (buffer-empty) case; missed the + # peer-closed (`recv == b''`) case. + if not data: + return + except BlockingIOError: + pass + + _WSP.drain = _safe_drain + _APPLIED = True + return True
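The empty-buffer versus peer-closed distinction that `_safe_drain` relies on can be checked without `trio` at all. The following is a minimal sketch using only the stdlib `socket` module, assuming a platform where `socket.socketpair()` is available; the helper names `drain_until_empty_or_eof` and `demo` are hypothetical and are not part of `tractor` or `trio`.

```python
import socket


def drain_until_empty_or_eof(sock: socket.socket) -> str:
    """Drain a non-blocking socket and report why the loop stopped."""
    try:
        while True:
            data = sock.recv(2**16)
            if not data:
                # `recv` returned b'': the peer closed its end (EOF).
                # Without this check the loop would never exit.
                return 'eof'
    except BlockingIOError:
        # Buffer is empty but the peer is still open.
        return 'empty'


def demo() -> tuple[str, str]:
    # Hypothetical stand-in for trio's wakeup socketpair.
    read_end, write_end = socket.socketpair()
    read_end.setblocking(False)
    write_end.setblocking(False)

    # Healthy pair: one queued byte, then the buffer runs dry.
    write_end.send(b'\x00')
    healthy = drain_until_empty_or_eof(read_end)

    # Peer gone: every `recv` now returns b'' immediately.
    write_end.close()
    closed = drain_until_empty_or_eof(read_end)

    read_end.close()
    return healthy, closed


if __name__ == '__main__':
    print(demo())
```

Dropping the `if not data` branch from the helper reproduces the spin described in the patch docstring, so any such experiment should be wrapped in a wall-clock cap as the regression test does.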