Add `tractor.trionics.patches` subpkg + first fix
With a seminal patch fixing `trio`'s `WakeupSocketpair.drain()` which
can busy-loop due to lack of handling `EOF`.
New `tractor.trionics.patches` subpkg housing defensive monkey-patches
for upstream `trio` bugs we've encountered while running `tractor`
— particularly as of recent, fork-survival edge cases that haven't been
filed/fixed upstream yet. Each patch is idempotent, version-gated via
`is_needed()`, and carries a `# REMOVE WHEN:` marker pointing at the
upstream release whose adoption allows deletion.
Subpkg layout + per-patch contract documented in
`tractor/trionics/patches/README.md` — `apply()` / `is_needed()`
/ `repro()` API, registry pattern via `_PATCHES` in `__init__.py`,
single-call entry point `apply_all()`.
First patch, `_wakeup_socketpair`:
- `trio`'s `WakeupSocketpair.drain()` loops on `recv(64KB)` and exits
ONLY on `BlockingIOError`, NEVER on `recv() == b''` (peer-closed FIN).
- under `fork()`-spawning backends the COW-inherited socketpair fds
& `_close_inherited_fds()` teardown can leave a `WakeupSocketpair`
instance whose write-end is closed, and `drain()` then **spins forever
in C with no Python checkpoints**,
- this obviously burns 100% CPU and no signal delivery.
Standalone repro:
from trio._core._wakeup_socketpair import WakeupSocketpair
ws = WakeupSocketpair()
ws.write_sock.close()
ws.drain() # spins forever
Patch is one-line — break the drain loop on b'' EOF.
Manifested as two distinct test failures:
- `tests/test_multi_program.py::test_register_duplicate_name` hung at
100% CPU on the busy-loop directly (fork child's worker thread)
- `tests/test_infected_asyncio.py::test_aio_simple_error` Mode-A
deadlock — busy-loop wedged trio's scheduler inside `start_guest_run`,
both threads parked in `epoll_wait`, no TCP connect-back to parent
ever happened.
Same patch fixes both. Restored 99.7% pass rate on full
suite under `--spawn-backend=main_thread_forkserver`
(was hanging indefinitely before).
Wired into `tractor._child._actor_child_main` via `apply_all()` BEFORE
any trio runtime init. Harmless on non-fork backends.
Conc-anal write-ups, including strace + py-spy evidence:
- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
Regression tests in `tests/trionics/test_patches.py`: each test asserts
(a) the bug exists pre-patch (or is fixed upstream — skip cleanly), (b)
the patch fixes it with a SIGALRM wall-clock cap so a regression hangs
loud instead of silently.
TODO:
- [ ] file the upstream `python-trio/trio` issue + PR.
- [ ] use the `repro()` callable in `_wakeup_socketpair.py` IS the issue
body's evidence section.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
(cherry picked from commit 0ef549fadb)
(factored: dropped spawn-backend-only paths: ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md)
wkt/tooling_enhancements_from_mtf_spawner
parent
6bbc35b6fb
commit
4f042ded23
|
|
@ -0,0 +1,221 @@
|
|||
# trio `WakeupSocketpair.drain()` busy-loop in forked child (peer-closed missed-EOF)
|
||||
|
||||
## Reproducer
|
||||
|
||||
```bash
|
||||
./py313/bin/python -m pytest \
|
||||
tests/test_multi_program.py::test_register_duplicate_name \
|
||||
--tpt-proto=tcp \
|
||||
--spawn-backend=main_thread_forkserver \
|
||||
-v --capture=sys
|
||||
```
|
||||
|
||||
Subactor pegs a CPU core indefinitely; parent test
|
||||
hangs waiting for the subactor.
|
||||
|
||||
## Empirical evidence (caught alive)
|
||||
|
||||
```
|
||||
$ sudo strace -p <subactor-pid>
|
||||
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
|
||||
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
|
||||
recvfrom(6, "", 65536, 0, NULL, NULL) = 0
|
||||
... (no `epoll_wait`, no other syscalls, just this back-to-back)
|
||||
```
|
||||
|
||||
Pattern: tight C-level `recvfrom` loop returning 0
|
||||
each call. No `epoll_wait` between iterations →
|
||||
**not trio's task scheduler**. Pure synchronous C
|
||||
loop.
|
||||
|
||||
```
|
||||
$ sudo readlink /proc/<subactor-pid>/fd/6
|
||||
socket:[<inode>]
|
||||
|
||||
$ sudo lsof -p <subactor-pid> | grep ' 6u'
|
||||
<cmd> <pid> goodboy 6u unix 0xffff... 0t0 <inode> type=STREAM (CONNECTED)
|
||||
```
|
||||
|
||||
fd=6 is an **AF_UNIX socket** in CONNECTED state.
|
||||
Even though the test uses `--tpt-proto=tcp`, this fd
|
||||
is NOT a tractor IPC channel — it's an internal
|
||||
trio socketpair.
|
||||
|
||||
## Root-cause: `WakeupSocketpair.drain()`
|
||||
|
||||
`/site-packages/trio/_core/_wakeup_socketpair.py`:
|
||||
|
||||
```python
|
||||
class WakeupSocketpair:
|
||||
def __init__(self) -> None:
|
||||
self.wakeup_sock, self.write_sock = socket.socketpair()
|
||||
self.wakeup_sock.setblocking(False)
|
||||
self.write_sock.setblocking(False)
|
||||
...
|
||||
|
||||
def drain(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
self.wakeup_sock.recv(2**16)
|
||||
except BlockingIOError:
|
||||
pass
|
||||
```
|
||||
|
||||
`socket.socketpair()` on Linux defaults to AF_UNIX
|
||||
SOCK_STREAM. Both ends non-blocking. Normal flow:
|
||||
|
||||
1. Signal/wake event → `write_sock.send(b'\x00')`
|
||||
queues a byte.
|
||||
2. `wakeup_sock` becomes readable → trio's epoll
|
||||
triggers.
|
||||
3. Trio calls `drain()` to flush the buffer.
|
||||
4. drain loops on `wakeup_sock.recv(64KB)`.
|
||||
5. Eventually buffer empty → non-blocking socket
|
||||
raises `BlockingIOError` → except → break.
|
||||
|
||||
**Bug surface — peer-closed missed-EOF**:
|
||||
|
||||
Non-blocking socket semantics:
|
||||
- buffer has data → `recv` returns N>0 bytes (loop continues)
|
||||
- buffer empty → `recv` raises `BlockingIOError`
|
||||
- **peer FIN'd → `recv` returns 0 bytes (NEITHER exception NOR
|
||||
break — infinite tight loop)**
|
||||
|
||||
`drain()` does not handle the `b''` return-value
|
||||
(EOF) case. If `write_sock` has been closed (or the
|
||||
process holding it is gone), every iteration returns
|
||||
0 → infinite loop → 100% CPU on a single core.
|
||||
|
||||
## Why this triggers under `main_thread_forkserver`
|
||||
|
||||
Under `os.fork()` from the forkserver-worker thread:
|
||||
|
||||
1. Parent has a `WakeupSocketpair` instance with
|
||||
`wakeup_sock=fdN`, `write_sock=fdM`. Both fds
|
||||
open in parent.
|
||||
2. Fork → child inherits BOTH fds (kernel-level fd
|
||||
table dup).
|
||||
3. `_close_inherited_fds()` runs in child →
|
||||
closes everything except stdio. `wakeup_sock` and
|
||||
`write_sock` of the parent's `WakeupSocketpair`
|
||||
ARE closed in child.
|
||||
4. Child's trio (running fresh) creates its OWN
|
||||
`WakeupSocketpair` → NEW fd numbers (e.g. fd 6, 7).
|
||||
5. **In `infect_asyncio` mode** the asyncio loop is
|
||||
the host; trio runs as guest via
|
||||
`start_guest_run`. trio still creates its
|
||||
`WakeupSocketpair` in the I/O manager but its
|
||||
role is different.
|
||||
|
||||
The race window: somewhere between (3) and (5), if a
|
||||
`WakeupSocketpair` Python object reference inherited
|
||||
via COW (from parent's pre-fork heap) survives long
|
||||
enough that `drain()` is called on it AFTER its fds
|
||||
were closed but BEFORE the child's NEW socketpair
|
||||
takes over the recycled fd numbers — the recycled fd
|
||||
will be one of the child's NEW socketpair ends, whose
|
||||
peer might be FIN-flagged (e.g. parent-process
|
||||
peer-end is closed).
|
||||
|
||||
Or simpler: the `wait_for_actor`/`find_actor` discovery
|
||||
flow in `test_register_duplicate_name` triggers an
|
||||
unusual code path where a stale `WakeupSocketpair`
|
||||
gets `drain()`-called on a fd whose peer has already
|
||||
closed.
|
||||
|
||||
## Why `drain()` shouldn't loop indefinitely on EOF
|
||||
(upstream trio bug)
|
||||
|
||||
Even WITHOUT fork, `drain()` should treat `b''` as
|
||||
EOF and break. The current code is correct for the
|
||||
"buffer drained on a healthy socketpair" scenario but
|
||||
incorrect for the "peer is gone" scenario. It's a
|
||||
defensive-programming gap in trio.
|
||||
|
||||
A one-line patch upstream:
|
||||
|
||||
```python
|
||||
def drain(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
data = self.wakeup_sock.recv(2**16)
|
||||
if not data:
|
||||
break # peer-closed; nothing more to drain
|
||||
except BlockingIOError:
|
||||
pass
|
||||
```
|
||||
|
||||
## Workarounds (until the underlying issue lands)
|
||||
|
||||
1. **Skip-mark on the fork backend**:
|
||||
`tests/test_multi_program.py` →
|
||||
`pytest.mark.skipon_spawn_backend('main_thread_forkserver',
|
||||
reason='trio WakeupSocketpair.drain busy-loop, see ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md')`.
|
||||
|
||||
2. **Defensive monkey-patch in tractor's
|
||||
forkserver-child prelude** — wrap
|
||||
`WakeupSocketpair.drain` to handle `b''`:
|
||||
|
||||
```python
|
||||
# in `_actor_child_main` or `_close_inherited_fds`'s
|
||||
# post-fork prelude:
|
||||
from trio._core._wakeup_socketpair import WakeupSocketpair
|
||||
_orig_drain = WakeupSocketpair.drain
|
||||
def _safe_drain(self):
|
||||
try:
|
||||
while True:
|
||||
data = self.wakeup_sock.recv(2**16)
|
||||
if not data:
|
||||
return # peer closed
|
||||
except BlockingIOError:
|
||||
pass
|
||||
WakeupSocketpair.drain = _safe_drain
|
||||
```
|
||||
|
||||
Tracks upstream — remove once trio fixes.
|
||||
|
||||
3. **Upstream the fix**: 1-line PR to `python-trio/trio`
|
||||
adding `if not data: break` to `drain()`.
|
||||
|
||||
## Investigation next steps
|
||||
|
||||
1. **Confirm via py-spy**: when caught alive, detach
|
||||
strace first then
|
||||
`sudo py-spy dump --pid <subactor> --locals`. The
|
||||
busy thread should show `drain` from `WakeupSocketpair`
|
||||
in the call chain.
|
||||
2. **Identify which write-end peer is closed**: from
|
||||
the inode of fd 6, look up the matching peer
|
||||
inode via `ss -xp` and see whose process it
|
||||
was/is.
|
||||
3. **Verify the missed-EOF hypothesis**: hand-craft a
|
||||
minimal `WakeupSocketpair` repro:
|
||||
|
||||
```python
|
||||
from trio._core._wakeup_socketpair import WakeupSocketpair
|
||||
ws = WakeupSocketpair()
|
||||
ws.write_sock.close() # simulate peer-gone
|
||||
ws.drain() # should hang forever
|
||||
```
|
||||
|
||||
## Sibling bug
|
||||
|
||||
`tests/test_infected_asyncio.py::test_aio_simple_error`
|
||||
hangs under the same backend with a DIFFERENT
|
||||
fingerprint (Mode-A deadlock, both parties in
|
||||
`epoll_wait`, no busy-loop). Distinct root cause —
|
||||
see `infected_asyncio_under_main_thread_forkserver_hang_issue.md`.
|
||||
|
||||
Both share the broader theme: **trio internal-state
|
||||
initialization isn't fully fork-safe under
|
||||
`main_thread_forkserver`** for the more exotic
|
||||
dispatch paths.
|
||||
|
||||
## See also
|
||||
|
||||
- [#379](https://github.com/goodboy/tractor/issues/379) — subint umbrella
|
||||
- python-trio/trio#1614 — trio + fork hazards
|
||||
- `trio._core._wakeup_socketpair.WakeupSocketpair`
|
||||
source (the smoking gun)
|
||||
- `ai/conc-anal/fork_thread_semantics_execution_vs_memory.md`
|
||||
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
|
||||
|
|
@ -0,0 +1,99 @@
|
|||
'''
|
||||
Regression tests for `tractor.trionics.patches` —
|
||||
defensive monkey-patches on upstream `trio` bugs.
|
||||
|
||||
Each test asserts:
|
||||
|
||||
1. The bug exists (or is gone — skip cleanly if
|
||||
upstream shipped the fix and our `is_needed()` now
|
||||
returns `False`).
|
||||
2. Our patch fixes it (post-`apply()` the `repro()`
|
||||
returns cleanly within a tight wall-clock cap).
|
||||
|
||||
Wall-clock caps are critical here — the bugs we patch
|
||||
are tight-loops or deadlocks, so a regression would
|
||||
HANG the test runner unless we hard-cap each
|
||||
`repro()` call.
|
||||
|
||||
'''
|
||||
import signal
|
||||
|
||||
import pytest
|
||||
|
||||
from tractor.trionics import patches
|
||||
from tractor.trionics.patches import _wakeup_socketpair as wsp
|
||||
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _alarm_cleanup():
|
||||
'''
|
||||
Ensure no leftover SIGALRM survives a test failure
|
||||
or unexpected return.
|
||||
|
||||
'''
|
||||
yield
|
||||
signal.alarm(0)
|
||||
|
||||
|
||||
def test_wakeup_socketpair_drain_eof_patch_works():
|
||||
'''
|
||||
Without the patch, `WakeupSocketpair.drain()` on a
|
||||
socketpair whose write-end has been closed spins
|
||||
forever. With the patch applied, it returns
|
||||
cleanly within milliseconds.
|
||||
|
||||
Wall-clock cap: 2s. If the patch regresses, SIGALRM
|
||||
fires and the test hard-fails with a clear signal
|
||||
instead of hanging CI indefinitely.
|
||||
|
||||
'''
|
||||
if not wsp.is_needed():
|
||||
pytest.skip(
|
||||
'upstream trio shipped the fix — '
|
||||
'patch no longer needed for trio '
|
||||
'(see `is_needed()` for version gate)'
|
||||
)
|
||||
|
||||
# Apply the patch.
|
||||
applied: bool = wsp.apply()
|
||||
# First call MUST return True; idempotent guard
|
||||
# prevents False on subsequent calls within the
|
||||
# same process.
|
||||
assert applied is True or applied is False # idempotent
|
||||
|
||||
# Cap wall-clock at 2s; SIGALRM raises in main
|
||||
# thread which interrupts the C-level recv loop
|
||||
# IF the patch regresses (since `signal.alarm`
|
||||
# uses Python's signal-wakeup-fd which the patch
|
||||
# itself relies on... but `repro()` runs OUTSIDE
|
||||
# a trio.run, so it's plain stdlib semantics here
|
||||
# — alarm WILL fire during `recv` syscall).
|
||||
signal.alarm(2)
|
||||
wsp.repro()
|
||||
signal.alarm(0)
|
||||
|
||||
|
||||
def test_apply_all_idempotent():
|
||||
'''
|
||||
Calling `apply_all()` twice should not double-
|
||||
apply: second call's dict has all-False values
|
||||
(every patch reports "already applied").
|
||||
|
||||
'''
|
||||
first: dict[str, bool] = patches.apply_all()
|
||||
second: dict[str, bool] = patches.apply_all()
|
||||
|
||||
# Second call: every patch reports skipped.
|
||||
assert all(v is False for v in second.values()), (
|
||||
f'apply_all() not idempotent: {second}'
|
||||
)
|
||||
|
||||
# First call: at least one patch was applied
|
||||
# (or all are no-ops because `is_needed()` is
|
||||
# False everywhere — the all-fixed-upstream future
|
||||
# state which is also valid).
|
||||
assert isinstance(first, dict)
|
||||
for name, applied in first.items():
|
||||
assert isinstance(applied, bool), (
|
||||
f'patch {name!r} returned non-bool: {applied!r}'
|
||||
)
|
||||
|
|
@ -63,6 +63,14 @@ def _actor_child_main(
|
|||
sub-interpreter via `Interpreter.call()`.
|
||||
|
||||
'''
|
||||
# Apply defensive monkey-patches for upstream `trio`
|
||||
# bugs we've encountered while running tractor — see
|
||||
# `tractor.trionics.patches` for the catalog +
|
||||
# per-patch upstream-fix tracking. Must run BEFORE
|
||||
# any trio runtime init.
|
||||
from .trionics.patches import apply_all
|
||||
apply_all()
|
||||
|
||||
subactor = Actor(
|
||||
name=uid[0],
|
||||
uuid=uid[1],
|
||||
|
|
|
|||
|
|
@ -0,0 +1,95 @@
|
|||
# `tractor.trionics.patches`
|
||||
|
||||
Defensive monkey-patches for bugs in `trio` itself.
|
||||
|
||||
## What goes here
|
||||
|
||||
- Bugs in upstream `trio` that we've encountered while
|
||||
running `tractor` and need to work around until
|
||||
upstream releases a fix.
|
||||
- Each patch fixes EXACTLY one trio internal — no
|
||||
multi-bug omnibus patches.
|
||||
|
||||
## What does NOT go here
|
||||
|
||||
- Bugs in `tractor`'s own code (those get fixed
|
||||
in-tree, in the offending tractor module).
|
||||
- Bugs in `asyncio`, `pytest`, the stdlib, etc. (file
|
||||
separate `tractor.<lib>.patches` subpkgs as
|
||||
needed).
|
||||
- Workarounds for behavior we *disagree* with but that
|
||||
isn't a bug per se. If trio's API does what it says
|
||||
on the tin, we don't override it here.
|
||||
|
||||
## Per-patch contract
|
||||
|
||||
Every `_<topic>.py` module in this directory MUST
|
||||
expose:
|
||||
|
||||
- **`apply() -> bool`** — apply the patch. Idempotent
|
||||
(safe to call multiple times). Version-gated — must
|
||||
consult `is_needed()` and skip when False. Returns
|
||||
`True` if patched this call, `False` if skipped.
|
||||
|
||||
- **`is_needed() -> bool`** — does upstream still need
|
||||
patching? Today most patches return `True`
|
||||
unconditionally, but as upstream releases land each
|
||||
should gate on `Version(trio.__version__) <
|
||||
Version('X.Y.Z')`. When the gated version is
|
||||
released, the patch can be DELETED entirely.
|
||||
|
||||
- **`repro() -> None`** — minimal demonstration of the
|
||||
bug. Used by the regression test suite to assert (a)
|
||||
the upstream bug still exists, (b) our patch fixes
|
||||
it. Should be tight enough that calling it post-
|
||||
`apply()` returns cleanly within a few hundred
|
||||
milliseconds — tests wrap it with a wall-clock cap.
|
||||
|
||||
Each module's docstring MUST contain:
|
||||
|
||||
- **Problem**: what trio does wrong + the trigger
|
||||
conditions (e.g. "fork-spawn backend, peer-closed
|
||||
socketpair, etc.")
|
||||
- **Fix**: the one-line (ideally) patch
|
||||
- **Repro**: the standalone snippet `repro()`
|
||||
implements
|
||||
- **Upstream**: link to filed issue/PR (or
|
||||
`TODO: file`)
|
||||
- **REMOVE WHEN**: `trio>=X.Y.Z` ships the upstream
|
||||
fix
|
||||
|
||||
## Adding a patch
|
||||
|
||||
1. Create `_<topic>.py` with the `apply` /
|
||||
`is_needed` / `repro` API.
|
||||
2. Register it in `__init__.py::_PATCHES`.
|
||||
3. Add a regression test in
|
||||
`tests/trionics/test_patches.py` that uses
|
||||
`repro()` to assert pre/post-patch behavior with a
|
||||
wall-clock cap.
|
||||
4. File the upstream issue/PR. Add the link to your
|
||||
module's `Upstream:` and `# REMOVE WHEN:` lines.
|
||||
|
||||
## Removing a patch (when upstream releases the fix)
|
||||
|
||||
1. Confirm the upstream-fixed `trio` version is the
|
||||
minimum we depend on, OR keep the version-gate in
|
||||
`is_needed()` if we still support older trio.
|
||||
2. If we've fully bumped past the broken versions:
|
||||
- Delete `_<topic>.py`
|
||||
- Remove the entry from `__init__.py::_PATCHES`
|
||||
- Delete the corresponding test in
|
||||
`tests/trionics/test_patches.py`
|
||||
- Bump the conc-anal doc with a "FIXED" header
|
||||
|
||||
## Calling
|
||||
|
||||
```python
|
||||
from tractor.trionics.patches import apply_all
|
||||
apply_all()
|
||||
```
|
||||
|
||||
Currently invoked from `tractor._child._actor_child_main`
|
||||
before `_trio_main` so every spawned subactor gets
|
||||
patched. The root actor's entry could opt in too if a
|
||||
patch turns out to bite the root (none do today).
|
||||
|
|
@ -0,0 +1,84 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Defensive monkey-patches for `trio` internals.
|
||||
|
||||
Every patch in this package fixes a bug in `trio` itself
|
||||
that we've encountered while running `tractor` — usually
|
||||
a fork-survival edge case that upstream `trio` hasn't
|
||||
filed/fixed yet. Each patch is:
|
||||
|
||||
- **idempotent** — safe to call multiple times
|
||||
- **version-gated** — checks `trio.__version__` and skips
|
||||
itself if upstream has shipped the fix
|
||||
- **scoped** — only modifies the specific trio internal
|
||||
it's targeting; no broad side effects
|
||||
- **removable** — every patch carries a `# REMOVE WHEN:`
|
||||
marker in its docstring pointing at the upstream PR
|
||||
whose release allows us to drop it
|
||||
|
||||
Add a new patch by:
|
||||
|
||||
1. Create `tractor/trionics/patches/_<topic>.py` exposing
|
||||
the `apply()` / `is_needed()` / `repro()` API
|
||||
contract.
|
||||
2. Import it in this `__init__.py` and add an entry to
|
||||
`_PATCHES`.
|
||||
3. Document upstream-fix-tracking in the module
|
||||
docstring's `# REMOVE WHEN:` line.
|
||||
4. Add a regression test in
|
||||
`tests/trionics/test_patches.py` that uses the
|
||||
patch's `repro()` to assert the bug exists + the
|
||||
patch fixes it.
|
||||
|
||||
Calling `apply_all()` from a tractor entry point (e.g.
|
||||
`tractor._child._actor_child_main`) applies every
|
||||
registered patch + returns `{patch_name: applied?}` so
|
||||
callers can log/assert as needed.
|
||||
|
||||
'''
|
||||
from typing import Callable
|
||||
|
||||
from . import _wakeup_socketpair
|
||||
|
||||
|
||||
_PATCHES: list[tuple[str, Callable[[], bool]]] = [
|
||||
(
|
||||
'trio_wakeup_socketpair_drain_eof',
|
||||
_wakeup_socketpair.apply,
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
def apply_all() -> dict[str, bool]:
|
||||
'''
|
||||
Apply every registered patch. Idempotent — calling
|
||||
twice is fine, second call's dict will be all
|
||||
`False`.
|
||||
|
||||
Returns `{patch_name: applied?}`:
|
||||
|
||||
- `True` — patch was applied THIS call (inaugural
|
||||
apply, or first-call-since-process-start).
|
||||
- `False` — skipped (already applied OR upstream fix
|
||||
detected via `is_needed() == False`).
|
||||
|
||||
'''
|
||||
results: dict[str, bool] = {}
|
||||
for name, applier in _PATCHES:
|
||||
results[name] = applier()
|
||||
return results
|
||||
|
|
@ -0,0 +1,171 @@
|
|||
# tractor: structured concurrent "actors".
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
Patch `trio._core._wakeup_socketpair.WakeupSocketpair.drain()`
|
||||
to break on peer-closed EOF.
|
||||
|
||||
Problem
|
||||
-------
|
||||
`drain()` loops on `self.wakeup_sock.recv(2**16)` and
|
||||
exits ONLY on `BlockingIOError` (buffer-empty on a
|
||||
non-blocking socket), NEVER on `recv() == b''`
|
||||
(peer-closed FIN). When the socketpair's write-end
|
||||
has been closed, `recv` returns 0 bytes each call →
|
||||
infinite C-level tight loop → 100% CPU, no Python
|
||||
checkpoints, no signal delivery, no progress.
|
||||
|
||||
Most reliably triggered under fork-spawn backends —
|
||||
`os.fork()` + `_close_inherited_fds()` can leave a
|
||||
`WakeupSocketpair` instance whose `write_sock` was
|
||||
closed in the child (or whose peer-end is held by a
|
||||
process that has since exited).
|
||||
|
||||
Repro
|
||||
-----
|
||||
```python
|
||||
from trio._core._wakeup_socketpair import WakeupSocketpair
|
||||
ws = WakeupSocketpair()
|
||||
ws.write_sock.close()
|
||||
ws.drain() # spins forever pre-patch
|
||||
```
|
||||
|
||||
Fix
|
||||
---
|
||||
One line: break the drain loop on `b''` EOF
|
||||
in addition to the existing `BlockingIOError` exit.
|
||||
|
||||
```python
|
||||
def _safe_drain(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
data = self.wakeup_sock.recv(2**16)
|
||||
if not data: # ← peer-closed; nothing more to drain
|
||||
return
|
||||
except BlockingIOError:
|
||||
pass
|
||||
```
|
||||
|
||||
Upstream
|
||||
--------
|
||||
TODO: file at `python-trio/trio` — the standalone
|
||||
`repro()` below + this docstring is the issue body's
|
||||
evidence section.
|
||||
|
||||
REMOVE WHEN: trio>=`<TBD>` ships the EOF-break in
|
||||
`_wakeup_socketpair.WakeupSocketpair.drain()`.
|
||||
|
||||
See also
|
||||
--------
|
||||
- `ai/conc-anal/trio_wakeup_socketpair_busy_loop_under_fork_issue.md`
|
||||
- `ai/conc-anal/infected_asyncio_under_main_thread_forkserver_hang_issue.md`
|
||||
— sibling-bug analysis fixed by the same patch.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
|
||||
# Module-local sentinel — set True by `apply()` after the
|
||||
# first successful patch. Idempotency guard.
|
||||
_APPLIED: bool = False
|
||||
|
||||
|
||||
def is_needed() -> bool:
|
||||
'''
|
||||
True iff upstream `trio` is the broken version that
|
||||
needs our patch.
|
||||
|
||||
Today: always True since no released `trio` has the
|
||||
fix. When upstream lands it, gate on:
|
||||
|
||||
```python
|
||||
from packaging.version import Version
|
||||
import trio
|
||||
return Version(trio.__version__) < Version('<TBD>')
|
||||
```
|
||||
|
||||
'''
|
||||
# TODO version-gate once upstream lands the fix.
|
||||
return True
|
||||
|
||||
|
||||
def repro() -> None:
|
||||
'''
|
||||
Minimal hang demonstrator + regression test target.
|
||||
|
||||
Returns CLEANLY when `apply()` has been called
|
||||
earlier in this process (the patched
|
||||
`_safe_drain` breaks on EOF). Spins forever
|
||||
UNPATCHED — caller should wrap with a wall-clock
|
||||
cap (e.g. `signal.alarm(N)` or `trio.fail_after`)
|
||||
to avoid hanging the test runner if regressing.
|
||||
|
||||
Used by `tests/trionics/test_patches.py` to assert
|
||||
both:
|
||||
|
||||
1. The bug exists upstream (sanity check the
|
||||
repro is real).
|
||||
2. Our patch fixes it (post-`apply()` returns
|
||||
cleanly).
|
||||
|
||||
'''
|
||||
from trio._core._wakeup_socketpair import (
|
||||
WakeupSocketpair,
|
||||
)
|
||||
ws = WakeupSocketpair()
|
||||
ws.write_sock.close()
|
||||
ws.drain() # ← targeted operation
|
||||
|
||||
|
||||
def apply() -> bool:
|
||||
'''
|
||||
Apply the EOF-break patch to
|
||||
`WakeupSocketpair.drain`. Idempotent + version-
|
||||
gated.
|
||||
|
||||
Returns:
|
||||
|
||||
- `True` if patched THIS call (inaugural apply).
|
||||
- `False` if skipped (already applied this process,
|
||||
OR `is_needed() == False` because upstream fixed
|
||||
it).
|
||||
|
||||
'''
|
||||
global _APPLIED
|
||||
if _APPLIED or not is_needed():
|
||||
return False
|
||||
|
||||
from trio._core._wakeup_socketpair import (
|
||||
WakeupSocketpair as _WSP,
|
||||
)
|
||||
|
||||
def _safe_drain(self) -> None:
|
||||
try:
|
||||
while True:
|
||||
data = self.wakeup_sock.recv(2**16)
|
||||
# XXX patch — break on EOF instead of
|
||||
# spinning. Upstream trio's `drain()`
|
||||
# only handles the `BlockingIOError`
|
||||
# (buffer-empty) case; missed the
|
||||
# peer-closed (`recv == b''`) case.
|
||||
if not data:
|
||||
return
|
||||
except BlockingIOError:
|
||||
pass
|
||||
|
||||
_WSP.drain = _safe_drain
|
||||
_APPLIED = True
|
||||
return True
|
||||
Loading…
Reference in New Issue