Compare commits
No commits in common. "0952b33a9e6d06ec8f68340a014c4ae90c44d214" and "0b8033fdaaee8cd6f2439767a3c4f76daa0acebf" have entirely different histories.
0952b33a9e
...
0b8033fdaa
|
|
@ -1,102 +0,0 @@
|
|||
# `trio` 0.29 -> 0.33 slows the depth=3 cancel-cascade
|
||||
|
||||
## Symptom
|
||||
|
||||
After locking to `trio==0.33.0` (commit `c7741bba`, was
|
||||
`0.29.0`), this test reliably trips its `fail_after`
|
||||
deadline on the **`trio`** backend:
|
||||
|
||||
```
|
||||
FAILED tests/test_cancellation.py::test_nested_multierrors[start_method=trio-depth=3]
|
||||
- AssertionError: assert False
|
||||
where False = isinstance(
|
||||
Cancelled(source='deadline', source_task=None, reason=None),
|
||||
tractor.RemoteActorError,
|
||||
)
|
||||
```
|
||||
|
||||
A `fail_after_w_trace` hang-snapshot is captured for the
|
||||
test each run (deadline-injected `Cancelled` wrapped into
|
||||
the actor-nursery `BaseExceptionGroup`).
|
||||
|
||||
## Root cause (immediate)
|
||||
|
||||
The test budgets `fail_after(6)` for the `trio` backend.
|
||||
That 6s was chosen (commit `32955db0`, while `trio==0.29`)
|
||||
with the assertion that trio finishes "well under" 6s.
|
||||
The `trio` 0.29 -> 0.33 bump slowed the depth=3 cascade
|
||||
past that budget, so the 6s deadline now fires mid-cascade.
|
||||
|
||||
trio 0.33 added **cancel-reason tracking** — every
|
||||
`Cancelled` now carries `(source=, reason=, source_task=)`.
|
||||
The injected exc is `Cancelled(source='deadline')`, i.e.
|
||||
trio itself naming our `fail_after(6)` scope as the cancel
|
||||
origin. When that `Cancelled` collapses one branch of the
|
||||
nursery BEG, the test's `isinstance(subexc,
|
||||
RemoteActorError)` assertion fails. The healthy outcome is
|
||||
`BEG = [RemoteActorError, RemoteActorError]`; the
|
||||
`Cancelled` is purely an artifact of the deadline cutting
|
||||
the cascade short.
|
||||
|
||||
## Measurements (standalone, this machine)
|
||||
|
||||
```
|
||||
depth=1 trio ~3.15s PASS (keeps 6s budget)
|
||||
depth=3 trio ~6.8-8.2s FAIL @ 6s (now bumped to 12s)
|
||||
```
|
||||
|
||||
depth=1 still fits comfortably; only depth=3 (deeper
|
||||
recursive spawn-and-error tree => more actors to reap)
|
||||
exceeds the old budget. The ~2s/depth-level cost looks
|
||||
like serialized per-actor reap / `terminate_after` waits.
|
||||
|
||||
## Mitigation applied
|
||||
|
||||
`test_nested_multierrors` now splits the `trio` budget:
|
||||
|
||||
```python
|
||||
case ('trio', 1):
|
||||
timeout = 6
|
||||
case ('trio', 3):
|
||||
timeout = 12 # was 6; see this doc
|
||||
```
|
||||
|
||||
This stops the deadline from firing so the cascade
|
||||
completes naturally to `[RAE, RAE]`.
|
||||
|
||||
## Also affected — same root cause, different test
|
||||
|
||||
`test_echoserver_detailed_mechanics[trio-raise_error=KeyboardInterrupt]`
|
||||
(`tests/test_infected_asyncio.py`) tripped the *same*
|
||||
slowdown via its much tighter `trio` budget of `1s`. The
|
||||
single-aio-subactor teardown now takes ~1s, so the `1s`
|
||||
`fail_after` raced the deadline (PASS at 0.99s / FAIL at
|
||||
1.03s across back-to-back standalone runs). On a deadline-
|
||||
fire the injected `Cancelled(source='deadline')` wraps the
|
||||
mid-stream `KeyboardInterrupt` into a `BaseExceptionGroup`,
|
||||
which is NOT a `KeyboardInterrupt` so the bare
|
||||
`pytest.raises(KeyboardInterrupt)` fails. (The sibling
|
||||
`raise_error=Exception` variant only "passes" by accident:
|
||||
an `ExceptionGroup` *is-a* `Exception`, so its
|
||||
`pytest.raises(Exception)` still matches even when wrapped.)
|
||||
|
||||
Mitigation: bump that `trio` budget `1 -> 4s` (matching the
|
||||
forking-spawner case). Without a deadline-fire the KBI
|
||||
propagates bare and the assertion passes.
|
||||
|
||||
## Open follow-up (the actual regression)
|
||||
|
||||
The budget bump is a band-aid — the underlying question is
|
||||
**why** the depth=3 `trio` cancel-cascade went from <6s to
|
||||
~7-8s across `trio` 0.29 -> 0.33. Candidate avenues:
|
||||
|
||||
- which scope owns the per-actor `terminate_after` wait,
|
||||
and are the tree's reaps concurrent or serialized?
|
||||
- did trio 0.33's abort/reschedule or cancel-reason
|
||||
bookkeeping change checkpoint timing on the cancel path?
|
||||
|
||||
If/when the cascade speeds back up under-budget, depth=3
|
||||
will start completing well under 12s — at which point the
|
||||
budget can be tightened back toward 6s as a regression
|
||||
tripwire. Related (different backend, same cascade class):
|
||||
`cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`.
|
||||
|
|
@ -1,146 +0,0 @@
|
|||
---
|
||||
model: claude-opus-4-7[1m]
|
||||
service: claude
|
||||
session: trio-0.33-subproc-supervisor-retroactive
|
||||
timestamp: 2026-06-01T23:14:29Z
|
||||
git_ref: 0e3e008b
|
||||
scope: code
|
||||
substantive: true
|
||||
raw_file: 20260601T231429Z_0e3e008b_prompt_io.raw.md
|
||||
---
|
||||
|
||||
## Prompt
|
||||
|
||||
**RETROACTIVE LOG** — original session prompts not
|
||||
preserved; reconstructed from the staged work product.
|
||||
|
||||
The work designs a `trio.Nursery.start()`-style wrapper
|
||||
around `trio.run_process()` for SC-friendly subprocess
|
||||
supervision. From the resulting code shape, the
|
||||
prompting intent was:
|
||||
|
||||
1. Surface rc!=0 `CalledProcessError` DETERMINISTICALLY,
|
||||
without the nursery-eg-wrapping that complicates
|
||||
`collapse_eg()` usage and races the relay reader on
|
||||
trio's `check=True`-driven cancel cascade.
|
||||
2. ALWAYS isolate the parent controlling-tty so a
|
||||
spawned child can't emit terminal control-seqs onto
|
||||
the launching tty (clobbering scrollback). Default
|
||||
`stdin=DEVNULL`; default `stdout=DEVNULL` unless
|
||||
explicitly relayed/overridden; distinguish "caller
|
||||
passed nothing" from "caller passed `None` for
|
||||
inherit".
|
||||
3. Optional live per-line relay of child std-streams to
|
||||
the `tractor` log — STREAMED (not
|
||||
buffered-until-exit) so long-lived daemon output is
|
||||
visible during the run. Pick a custom log level that
|
||||
shows at usual `info`/`devx` console levels but is
|
||||
separately filterable.
|
||||
4. Concurrent pipe-drain reader MANDATORY when piping
|
||||
without `capture_*` — without it the child blocks on
|
||||
`write()` once the OS pipe buffer fills (~64KiB),
|
||||
causing deadlocks on output bursts.
|
||||
5. Non-blocking `tn.start()` semantics: hand the live
|
||||
`trio.Process` to the parent immediately;
|
||||
supervise/relay run to completion in the supervisor
|
||||
coro.
|
||||
6. Hermetic `trio`-only unit tests (no actor-runtime)
|
||||
covering each of: per-line relay, tty isolation,
|
||||
no-deadlock on >64KiB unnewlined output, CPE
|
||||
rebuild w/ stderr relay, CPE rebuild on the silent
|
||||
drain+capture path.
|
||||
|
||||
## Response summary
|
||||
|
||||
Adds `tractor/trionics/_subproc.py` (296 LOC) +
|
||||
`tests/trionics/test_subproc.py` (230 LOC) + a
|
||||
re-export in `tractor/trionics/__init__.py`.
|
||||
|
||||
**`supervise_run_process()`** (public, re-exported)
|
||||
- `check=False` is forced to `trio.run_process`; the
|
||||
rc-check runs in the supervisor coro AFTER `own_tn`
|
||||
unwinds (both the child AND the relay readers have
|
||||
hit EOF + fully drained). A BARE
|
||||
`subprocess.CalledProcessError` is rebuilt + raised
|
||||
from there, with `.stderr` bytes passed in the
|
||||
constructor AND attached as an `add_note()`'d
|
||||
`|_.stderr:` block for legible teardown logs.
|
||||
- `stdin=DEVNULL` always. `stdout` default chosen via a
|
||||
`_UNSET` sentinel: `relay_stdout=True` → PIPE,
|
||||
explicit `stdout=...` → as given, else `DEVNULL`.
|
||||
`stderr` defaults to PIPE whenever we relay OR need
|
||||
the CPE note (when `check=True`), else `DEVNULL`.
|
||||
- `relay_level='io'` (custom level 21; sorts just
|
||||
above stdlib `INFO`=20 so it shows at usual
|
||||
`info`/`devx` levels and stays separately
|
||||
filterable). `runtime`=15 would silently filter at
|
||||
default levels, so it's rejected as a default.
|
||||
- `task_status.started(trio_proc)` delivers the live
|
||||
process immediately. The internal `own_tn`
|
||||
supervises `trio.run_process` + any relay readers to
|
||||
completion.
|
||||
- `**run_process_kwargs` forward verbatim;
|
||||
`stdin/stdout/stderr/check` are MANAGED keys
|
||||
(override on conflict).
|
||||
- Crash-handling deliberately NOT baked in — compose
|
||||
`maybe_open_crash_handler()` on top at the call-site.
|
||||
|
||||
**`_relay_stream_lines()`** (internal helper)
|
||||
- Three modes (combinable): `emit`-only (live per-line
|
||||
relay), `accum`-only (silent drain+capture for a CPE
|
||||
note), or both (live relay AND capture).
|
||||
- Per-line split handles cross-chunk residuals via a
|
||||
rolling `residual` bytes buffer; flushes any trailing
|
||||
un-newline-term'd line at EOF.
|
||||
- `async with stream:` ensures aclose at EOF/cancel
|
||||
(mirrors trio's internal `_subprocess` drain idiom).
|
||||
|
||||
**`_add_stderr_note()`** (internal helper)
|
||||
- `add_note()`s a `textwrap.indent(...)`'d
|
||||
`|_.stderr:` block onto a `CalledProcessError` for
|
||||
teardown logs.
|
||||
|
||||
**Tests** (5 hermetic, trio-only) — `_capture_relay`
|
||||
fixture monkeypatches `_subproc.log.<level>` to a list:
|
||||
- `test_stdout_relayed_per_line`: per-line stdout
|
||||
relay carries each `line=N` to the records.
|
||||
- `test_parent_tty_isolated`: `readlink /proc/self/fd/0`
|
||||
and `fd/1` from the child show `pipe:` (fd1) +
|
||||
`/dev/null` (fd0); NO `/dev/pts/*`.
|
||||
- `test_no_deadlock_on_big_unnewlined_output`: 200KiB
|
||||
of `x` with no newlines completes inside
|
||||
`fail_after(2)` — exercises the concurrent drain.
|
||||
- `test_stderr_relay_and_cpe_rebuild`: rc=3 with
|
||||
`relay_stderr=True` raises bare CPE
|
||||
(via `collapse_eg()`) with `b'boom' in cpe.stderr`,
|
||||
the note attached, AND per-line live relay.
|
||||
- `test_nonrelay_cpe_note`: rc=7 with no relay still
|
||||
produces CPE with `.stderr` + note via the silent
|
||||
drain+capture path.
|
||||
|
||||
## Files changed
|
||||
|
||||
- `tractor/trionics/_subproc.py` — NEW. Public
|
||||
`supervise_run_process()` + helpers
|
||||
`_relay_stream_lines()` / `_add_stderr_note()` + the
|
||||
`_UNSET` sentinel.
|
||||
- `tests/trionics/test_subproc.py` — NEW. 5 hermetic
|
||||
trio-only tests + `_capture_relay` monkeypatch
|
||||
fixture.
|
||||
- `tractor/trionics/__init__.py` — re-export
|
||||
`supervise_run_process`.
|
||||
|
||||
## Human edits
|
||||
|
||||
**RETROACTIVE**: this log is being written from the
|
||||
staged diff, not from a live session. The code as
|
||||
staged is the canonical artifact; any human edits the
|
||||
user made during the originating design session are
|
||||
already integrated and cannot be separated post-hoc.
|
||||
The `.raw.md` sibling is a diff-pointer placeholder,
|
||||
NOT a pre-edit transcript.
|
||||
|
||||
Future prompt-io entries for in-flight work should be
|
||||
written DURING the design session per the skill
|
||||
contract so the pre-edit `.raw.md` captures the
|
||||
unedited model output for genuine provenance.
|
||||
|
|
@ -1,106 +0,0 @@
|
|||
---
|
||||
model: claude-opus-4-7[1m]
|
||||
service: claude
|
||||
timestamp: 2026-06-01T23:14:29Z
|
||||
git_ref: 0e3e008b
|
||||
diff_cmd: git diff HEAD~1..HEAD
|
||||
---
|
||||
|
||||
# RETROACTIVE — original model output not preserved
|
||||
|
||||
This `.raw.md` would normally contain the verbatim
|
||||
pre-human-edit response from the design session that
|
||||
produced the staged `_subproc.py` module + tests. That
|
||||
session's transcript is not available, so this file
|
||||
serves as a diff-pointer placeholder + transparency
|
||||
note.
|
||||
|
||||
## Authoritative artifact
|
||||
|
||||
The committed code IS the artifact of record. Once the
|
||||
companion commit lands, the unified diff is:
|
||||
|
||||
> `git diff HEAD~1..HEAD -- tractor/trionics/_subproc.py`
|
||||
> `git diff HEAD~1..HEAD -- tests/trionics/test_subproc.py`
|
||||
> `git diff HEAD~1..HEAD -- tractor/trionics/__init__.py`
|
||||
|
||||
Before committing, substitute `--cached` for the
|
||||
pre-commit form.
|
||||
|
||||
## What is NOT here
|
||||
|
||||
Because this is retroactive:
|
||||
- No verbatim chain-of-thought / discussion prose from
|
||||
the design session.
|
||||
- No rejected alternatives the model considered before
|
||||
arriving at the final shape (e.g. whether the
|
||||
rc-check should live inside `own_tn` vs after it; the
|
||||
`_UNSET` sentinel vs a `None`-means-DEVNULL
|
||||
convention; `io` vs `info` as the default relay
|
||||
level).
|
||||
- No pre-edit code blocks as the model first emitted
|
||||
them, separable from any user cleanup applied before
|
||||
the diff was staged.
|
||||
|
||||
## Inferred design choices visible in the final code
|
||||
|
||||
(Documented here because they're the kind of decision
|
||||
detail an unedited raw transcript would have captured.)
|
||||
|
||||
1. **Post-drain rc-check in the supervisor coro body,
|
||||
AFTER `own_tn.__aexit__`.** Placing the
|
||||
`CalledProcessError` raise here (not inside
|
||||
`own_tn`) means the EG-unwrap happens at the OUTER
|
||||
`tn.start()` boundary — callers do `collapse_eg()`
|
||||
if they want bare. Doing the raise INSIDE `own_tn`
|
||||
would cancel the still-draining relay reader
|
||||
mid-flight and lose stderr lines.
|
||||
|
||||
2. **`_UNSET` sentinel for `stdout`.** A plain default
|
||||
of `None` couldn't distinguish "use the safe
|
||||
`DEVNULL` default" from "caller explicitly passed
|
||||
`None` (inherit, presumably knowingly)". The
|
||||
sentinel keeps the SAFE default while letting power
|
||||
users opt into inherit.
|
||||
|
||||
3. **`relay_level='io'` (custom level 21).** Chosen to
|
||||
sort just above stdlib `INFO`=20 so a default
|
||||
`--ll info` shows the relay, but it remains a
|
||||
distinct level so users can filter
|
||||
`tractor.trionics:io` separately. Picking
|
||||
`runtime`=15 would have made the relay invisible at
|
||||
default verbosity (a footgun for daemon supervisors
|
||||
whose whole point is "I want to see this output").
|
||||
|
||||
4. **Reader is MANDATORY, not opt-in cosmetic.** With
|
||||
`stdout=PIPE` / `stderr=PIPE` we OWN the drain
|
||||
responsibility — there's no `trio.capture_*` running
|
||||
under the hood here. The ~64KiB OS pipe buffer
|
||||
means a child writing more than that without us
|
||||
reading hangs at `write()` — a deadlock that won't
|
||||
show up in small-output tests, which is why the
|
||||
200KiB-no-newline test is in the suite.
|
||||
|
||||
5. **`task_status.started(trio_proc)` BEFORE the
|
||||
`own_tn` exits.** Without this, `tn.start()` would
|
||||
block until the child exits — losing the "start a
|
||||
long-lived daemon and continue with parent work"
|
||||
use case. With it, the parent gets the live process
|
||||
handle immediately and the supervise+relay tasks
|
||||
run in the supervisor coro until the child exits.
|
||||
|
||||
6. **`__notes__` via `add_note()` for the CPE
|
||||
`.stderr`.** The `.stderr` attribute is what
|
||||
`subprocess` callers expect; the `add_note()` is
|
||||
what trio's exception-rendering shows. Both wired so
|
||||
programmatic AND human consumers see the stderr at
|
||||
teardown.
|
||||
|
||||
## Honesty statement
|
||||
|
||||
This file's content is RECONSTRUCTED from the staged
|
||||
code, not extracted from a verbatim model transcript.
|
||||
The prompt-io skill's intent is for the `.raw.md` to
|
||||
be a pre-edit fossil; that's not possible here. Future
|
||||
work should write the prompt-io entry DURING the
|
||||
design session.
|
||||
|
|
@ -105,7 +105,7 @@ testing = [
|
|||
repl = [
|
||||
"pyperclip>=1.9.0",
|
||||
"prompt-toolkit>=3.0.50",
|
||||
"xonsh>=0.23.8",
|
||||
"xonsh>=0.23.0",
|
||||
"psutil>=7.0.0",
|
||||
]
|
||||
lint = [
|
||||
|
|
|
|||
|
|
@ -5,7 +5,6 @@
|
|||
from __future__ import annotations
|
||||
import platform
|
||||
import os
|
||||
import re
|
||||
import signal
|
||||
import time
|
||||
from typing import (
|
||||
|
|
@ -295,26 +294,6 @@ def expect(
|
|||
PROMPT = r"\(Pdb\+\)"
|
||||
|
||||
|
||||
# Strip terminal color / ANSI-VT100 escape sequences so
|
||||
# substring matching against REPL + traceback output stays
|
||||
# robust to color leakage — Python 3.13's colored tracebacks,
|
||||
# `pdbp`'s pygments highlighting, etc. — even when
|
||||
# `PYTHON_COLORS=0` (set in the `spawn` fixture) isn't honored
|
||||
# by every renderer in the spawned subproc.
|
||||
# Regex per https://stackoverflow.com/a/14693789
|
||||
_ansi_re: re.Pattern = re.compile(
|
||||
r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
|
||||
)
|
||||
|
||||
|
||||
def ansi_strip(text: str) -> str:
|
||||
'''
|
||||
Remove ANSI/VT100 escape sequences from `text`.
|
||||
|
||||
'''
|
||||
return _ansi_re.sub('', text)
|
||||
|
||||
|
||||
def in_prompt_msg(
|
||||
child: SpawnBase,
|
||||
parts: list[str],
|
||||
|
|
@ -334,7 +313,7 @@ def in_prompt_msg(
|
|||
'''
|
||||
__tracebackhide__: bool = False
|
||||
|
||||
before: str = ansi_strip(str(child.before.decode()))
|
||||
before: str = str(child.before.decode())
|
||||
for part in parts:
|
||||
if part not in before:
|
||||
if pause_on_false:
|
||||
|
|
@ -354,9 +333,9 @@ def in_prompt_msg(
|
|||
return True
|
||||
|
||||
|
||||
# NB: color-char stripping (so we can match against call-stack
|
||||
# frame output from the `ll` command and the like) is handled by
|
||||
# `ansi_strip()` applied inside `in_prompt_msg()` + below.
|
||||
# TODO: todo support terminal color-chars stripping so we can match
|
||||
# against call stack frame output from the the 'll' command the like!
|
||||
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789
|
||||
def assert_before(
|
||||
child: SpawnBase,
|
||||
patts: list[str],
|
||||
|
|
@ -377,7 +356,7 @@ def assert_before(
|
|||
err_on_false=True,
|
||||
**kwargs
|
||||
)
|
||||
before: str = ansi_strip(str(child.before.decode()))
|
||||
before: str = str(child.before.decode())
|
||||
return before
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1186,12 +1186,7 @@ def test_shield_pause(
|
|||
"('cancelled_before_pause'", # actor name
|
||||
_repl_fail_msg,
|
||||
"trio.Cancelled",
|
||||
# trio >=0.30 raises via a multi-line
|
||||
# `raise Cancelled._create(source=.., reason=..,
|
||||
# source_task=..)` (cancel-reason metadata), so
|
||||
# match the open-paren form only, NOT the legacy
|
||||
# bare `()`.
|
||||
"raise Cancelled._create(",
|
||||
"raise Cancelled._create()",
|
||||
|
||||
# we should be handling a taskc inside
|
||||
# the first `.port_mortem()` sin-shield!
|
||||
|
|
@ -1209,12 +1204,7 @@ def test_shield_pause(
|
|||
"('root'", # actor name
|
||||
_repl_fail_msg,
|
||||
"trio.Cancelled",
|
||||
# trio >=0.30 raises via a multi-line
|
||||
# `raise Cancelled._create(source=.., reason=..,
|
||||
# source_task=..)` (cancel-reason metadata), so
|
||||
# match the open-paren form only, NOT the legacy
|
||||
# bare `()`.
|
||||
"raise Cancelled._create(",
|
||||
"raise Cancelled._create()",
|
||||
|
||||
# handling a taskc inside the first unshielded
|
||||
# `.port_mortem()`.
|
||||
|
|
|
|||
|
|
@ -8,9 +8,9 @@ after `Actor` construction, so any spawned sub-actor process
|
|||
should:
|
||||
|
||||
- have `argv[0]` (== `/proc/<pid>/cmdline`) start with
|
||||
`<_def_prefix>[<aid.reprol()>]` (currently `_subactor[…]`)
|
||||
- have `/proc/<pid>/comm` start with `<_def_prefix>[`
|
||||
(kernel truncates to ~15 bytes)
|
||||
`tractor[<aid.reprol()>]`
|
||||
- have `/proc/<pid>/comm` start with `tractor[` (kernel
|
||||
truncates to ~15 bytes)
|
||||
- be detected as a tractor sub-actor by
|
||||
`_is_tractor_subactor(pid)` via the cmdline marker.
|
||||
|
||||
|
|
@ -27,10 +27,7 @@ import trio
|
|||
import tractor
|
||||
|
||||
from tractor.runtime._runtime import Actor
|
||||
from tractor.devx._proctitle import (
|
||||
set_actor_proctitle,
|
||||
_def_prefix,
|
||||
)
|
||||
from tractor.devx._proctitle import set_actor_proctitle
|
||||
from tractor._testing._reap import (
|
||||
_is_tractor_subactor,
|
||||
_read_cmdline,
|
||||
|
|
@ -44,9 +41,8 @@ _non_linux: bool = platform.system() != 'Linux'
|
|||
def test_set_actor_proctitle_format():
|
||||
'''
|
||||
`set_actor_proctitle()` returns the canonical
|
||||
`<_def_prefix>[<aid.reprol()>]` form (currently
|
||||
`_subactor[…]`) and actually mutates the running
|
||||
proc's title.
|
||||
`tractor[<aid.reprol()>]` form and actually mutates
|
||||
the running proc's title.
|
||||
|
||||
'''
|
||||
pytest.importorskip(
|
||||
|
|
@ -64,14 +60,12 @@ def test_set_actor_proctitle_format():
|
|||
)
|
||||
title: str = set_actor_proctitle(actor)
|
||||
|
||||
# canonical wrapping: `<_def_prefix>[<aid.reprol()>]`.
|
||||
# We source BOTH the prefix (`_def_prefix`) and the
|
||||
# runtime-computed `reprol()` rather than hard-coding,
|
||||
# so the test stays decoupled from the prefix shape
|
||||
# (flipped to `_subactor` in `3a45dbd5`) AND from
|
||||
# `Aid.reprol()`'s internal format (currently
|
||||
# `<name>@<pid>`, but could evolve).
|
||||
expected: str = f'{_def_prefix}[{actor.aid.reprol()}]'
|
||||
# canonical wrapping: `tractor[<aid.reprol()>]`. We
|
||||
# compare against the runtime-computed `reprol()`
|
||||
# rather than a hard-coded value so the test stays
|
||||
# decoupled from `Aid.reprol()`'s internal format
|
||||
# (currently `<name>@<pid>`, but could evolve).
|
||||
expected: str = f'tractor[{actor.aid.reprol()}]'
|
||||
assert title == expected
|
||||
# sanity: the actor's name must be in the title
|
||||
# somewhere (so a future `reprol()` change that
|
||||
|
|
@ -146,17 +140,15 @@ def test_subactor_proctitle_visible_via_proc():
|
|||
)
|
||||
|
||||
pid, info = matched[0]
|
||||
# canonical proctitle prefix in cmdline (full form);
|
||||
# prefix sourced from `_def_prefix` so it tracks the
|
||||
# `3a45dbd5` flip (`tractor[` -> `_subactor[`).
|
||||
assert info['cmdline'].startswith(f'{_def_prefix}[proctitle_boi@'), (
|
||||
f'cmdline missing `{_def_prefix}[proctitle_boi@…]` prefix: '
|
||||
# canonical proctitle prefix in cmdline (full form)
|
||||
assert info['cmdline'].startswith('tractor[proctitle_boi@'), (
|
||||
f'cmdline missing `tractor[proctitle_boi@…]` prefix: '
|
||||
f'{info["cmdline"]!r}'
|
||||
)
|
||||
# comm is kernel-truncated to ~15 bytes — just check the
|
||||
# `<_def_prefix>[` prefix made it.
|
||||
assert info['comm'].startswith(f'{_def_prefix}['), (
|
||||
f'comm missing `{_def_prefix}[` prefix: {info["comm"]!r}'
|
||||
# `tractor[` prefix made it.
|
||||
assert info['comm'].startswith('tractor['), (
|
||||
f'comm missing `tractor[` prefix: {info["comm"]!r}'
|
||||
)
|
||||
# intrinsic-signal detector should match.
|
||||
assert info['is_tractor'] is True
|
||||
|
|
|
|||
|
|
@ -605,28 +605,17 @@ async def test_nested_multierrors(
|
|||
)
|
||||
)
|
||||
|
||||
# Per-backend/-depth budgets: in the non-hang case the
|
||||
# whole spawn + cancel-cascade should complete in well
|
||||
# under these. On the borderline hang case the
|
||||
# `fail_after_w_trace` fires `TooSlowError` AND captures a
|
||||
# ptree/wchan/py-spy snapshot to
|
||||
# 6s budget: in the non-hang case (and on the trio
|
||||
# backend) the whole spawn + cancel-cascade should
|
||||
# complete in well under that. On the borderline hang
|
||||
# case the `fail_after_w_trace` fires `TooSlowError`
|
||||
# AND captures a ptree/wchan/py-spy snapshot to
|
||||
# `$XDG_CACHE_HOME/tractor/hung-dumps/` for offline
|
||||
# inspection. See
|
||||
# `ai/conc-anal/cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`.
|
||||
#
|
||||
# NOTE: the `trio` depth=3 budget was bumped 6 -> 12s after
|
||||
# the `trio` 0.29 -> 0.33 lock bump (commit c7741bba) slowed
|
||||
# the depth-3 cancel-cascade from <6s to ~7-8s; the 6s
|
||||
# deadline was firing and its `Cancelled(source='deadline')`
|
||||
# (trio 0.33 cancel-reason metadata) collapsed a BEG branch,
|
||||
# breaking the `RemoteActorError` assertion below. depth=1
|
||||
# still finishes in ~3s so keeps the 6s budget. See
|
||||
# `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`.
|
||||
match (start_method, depth):
|
||||
case ('trio', 1):
|
||||
case ('trio', _):
|
||||
timeout = 6
|
||||
case ('trio', 3):
|
||||
timeout = 12
|
||||
case ('main_thread_forkserver', 1):
|
||||
timeout = 16
|
||||
case ('main_thread_forkserver', 3):
|
||||
|
|
|
|||
|
|
@ -860,14 +860,7 @@ def test_echoserver_detailed_mechanics(
|
|||
timeout: float = (
|
||||
999 if tractor.debug_mode()
|
||||
else 4 if is_forking_spawner
|
||||
# was 1; the `trio` 0.29 -> 0.33 bump slowed the
|
||||
# cancel-cascade so a 1s budget raced the ~1s teardown
|
||||
# deadline. On a deadline-fire the injected
|
||||
# `Cancelled(source='deadline')` wraps the mid-stream
|
||||
# KBI in a `BaseExceptionGroup`, breaking the bare
|
||||
# `pytest.raises(KeyboardInterrupt)` below. See
|
||||
# `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`.
|
||||
else 4
|
||||
else 1
|
||||
)
|
||||
|
||||
# body factored out so the `fail_after_w_trace`-wrapping
|
||||
|
|
|
|||
|
|
@ -162,66 +162,6 @@ def test_implicit_mod_name_applied_for_child(
|
|||
assert submod.log.logger in sub_logs
|
||||
|
||||
|
||||
def test_io_custom_level_registered():
|
||||
'''
|
||||
The `IO`(21) level (registered via `add_log_level()` at
|
||||
import, for `tractor.trionics._subproc`'s std-stream relay)
|
||||
is fully wired and SHOWN BY DEFAULT at `info`-level consoles
|
||||
since `21 >= INFO(20)`.
|
||||
|
||||
'''
|
||||
import logging
|
||||
assert log.CUSTOM_LEVELS.get('IO') == 21
|
||||
assert logging.getLevelName(21) == 'IO'
|
||||
assert log.STD_PALETTE.get('IO')
|
||||
assert log.BOLD_PALETTE['bold'].get('IO')
|
||||
|
||||
iolog = log.get_logger('io_lvl_test')
|
||||
assert callable(getattr(iolog, 'io', None))
|
||||
# emit must not raise
|
||||
iolog.io('hello from the IO level')
|
||||
|
||||
# 21 >= INFO(20) -> shown when console set to `info`
|
||||
assert 21 >= logging.INFO
|
||||
|
||||
|
||||
def test_add_log_level_pluggable():
|
||||
'''
|
||||
`add_log_level()` is the single pluggable entry-point: one
|
||||
call wires `CUSTOM_LEVELS` + `addLevelName` + both palettes +
|
||||
a same-named `StackLevelAdapter` emit method (so
|
||||
`get_logger()`'s per-level audit passes).
|
||||
|
||||
'''
|
||||
import logging
|
||||
name: str = 'XLVL'
|
||||
val: int = 19
|
||||
try:
|
||||
log.add_log_level(name, val, 'cyan')
|
||||
|
||||
assert log.CUSTOM_LEVELS[name] == val
|
||||
assert logging.getLevelName(val) == name
|
||||
assert log.STD_PALETTE[name] == 'cyan'
|
||||
assert log.BOLD_PALETTE['bold'][name] == 'bold_cyan'
|
||||
|
||||
# the audit in `get_logger()` (asserts a method per
|
||||
# `CUSTOM_LEVELS` entry) must still pass.
|
||||
xlog = log.get_logger('xlvl_test')
|
||||
emit = getattr(xlog, name.lower(), None)
|
||||
assert callable(emit)
|
||||
emit('hello from a plugged-in level')
|
||||
|
||||
finally:
|
||||
# best-effort cleanup of our module-global mutations so
|
||||
# later `get_logger()` audits don't see a half-removed
|
||||
# level.
|
||||
log.CUSTOM_LEVELS.pop(name, None)
|
||||
log.STD_PALETTE.pop(name, None)
|
||||
log.BOLD_PALETTE['bold'].pop(name, None)
|
||||
if hasattr(log.StackLevelAdapter, name.lower()):
|
||||
delattr(log.StackLevelAdapter, name.lower())
|
||||
|
||||
|
||||
# TODO, moar tests against existing feats:
|
||||
# ------ - ------
|
||||
# - [ ] color settings?
|
||||
|
|
|
|||
|
|
@ -1,230 +0,0 @@
|
|||
'''
|
||||
Unit tests for `tractor.trionics.supervise_run_process` (in
|
||||
`tractor.trionics._subproc`) and its per-line std-stream relay.
|
||||
|
||||
Hermetic `trio`-only coverage (no actor-runtime needed):
|
||||
|
||||
- per-line stdout relay -> `log.io`
|
||||
- parent controlling-tty isolation (child fd1 is a pipe, fd0
|
||||
`/dev/null` — never the parent `/dev/pts/*`)
|
||||
- mandatory concurrent pipe-drain (no deadlock on >64KiB
|
||||
no-newline output)
|
||||
- live stderr relay + `CalledProcessError` rebuild (rc!=0 note)
|
||||
- legacy capture-stderr CPE note path
|
||||
|
||||
'''
|
||||
from functools import partial
|
||||
import subprocess
|
||||
|
||||
import pytest
|
||||
import trio
|
||||
|
||||
from tractor.trionics import (
|
||||
_subproc,
|
||||
collapse_eg,
|
||||
supervise_run_process,
|
||||
)
|
||||
|
||||
|
||||
def _capture_relay(monkeypatch, level: str = 'io') -> list[str]:
|
||||
'''
|
||||
Redirect `_subproc.log.<level>` (the relay's emit method —
|
||||
`io` by default, see `supervise_run_process(relay_level=...)`)
|
||||
into a list so tests can assert on the relayed lines.
|
||||
|
||||
'''
|
||||
records: list[str] = []
|
||||
monkeypatch.setattr(
|
||||
_subproc.log,
|
||||
level,
|
||||
lambda msg, *a, **k: records.append(msg),
|
||||
)
|
||||
return records
|
||||
|
||||
|
||||
def test_stdout_relayed_per_line(monkeypatch):
|
||||
records = _capture_relay(monkeypatch)
|
||||
|
||||
cmd = [
|
||||
'sh', '-c',
|
||||
'for i in 1 2 3; do echo line=$i; done',
|
||||
]
|
||||
|
||||
async def main():
|
||||
async with trio.open_nursery() as tn:
|
||||
await tn.start(
|
||||
partial(
|
||||
supervise_run_process,
|
||||
cmd,
|
||||
label='t-out',
|
||||
relay_stdout=True,
|
||||
)
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
out_lines = [r for r in records if '[t-out:out]' in r]
|
||||
assert any('line=1' in r for r in out_lines)
|
||||
assert any('line=2' in r for r in out_lines)
|
||||
assert any('line=3' in r for r in out_lines)
|
||||
|
||||
|
||||
def test_parent_tty_isolated(monkeypatch):
|
||||
records = _capture_relay(monkeypatch)
|
||||
|
||||
cmd = [
|
||||
'sh', '-c',
|
||||
'readlink /proc/self/fd/0; readlink /proc/self/fd/1',
|
||||
]
|
||||
|
||||
async def main():
|
||||
async with trio.open_nursery() as tn:
|
||||
await tn.start(
|
||||
partial(
|
||||
supervise_run_process,
|
||||
cmd,
|
||||
label='t-tty',
|
||||
relay_stdout=True,
|
||||
)
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
relayed = '\n'.join(records)
|
||||
# fd1 (stdout) must be OUR pipe, never a controlling tty.
|
||||
assert 'pipe:' in relayed
|
||||
assert '/dev/pts/' not in relayed
|
||||
# fd0 (stdin) is pinned to DEVNULL.
|
||||
assert '/dev/null' in relayed
|
||||
|
||||
|
||||
def test_no_deadlock_on_big_unnewlined_output(monkeypatch):
|
||||
'''
|
||||
>64KiB of output with NO newline: only completes because the
|
||||
relay reader concurrently drains the pipe (else the child
|
||||
blocks on `write()` when the OS pipe buffer fills).
|
||||
|
||||
'''
|
||||
records = _capture_relay(monkeypatch)
|
||||
|
||||
cmd = [
|
||||
'sh', '-c',
|
||||
'head -c 200000 /dev/zero | tr "\\0" x',
|
||||
]
|
||||
|
||||
async def main():
|
||||
# generous vs the ~ms real runtime, but bounded so a
|
||||
# genuine pipe-fill deadlock fails fast.
|
||||
with trio.fail_after(2):
|
||||
async with trio.open_nursery() as tn:
|
||||
await tn.start(
|
||||
partial(
|
||||
supervise_run_process,
|
||||
cmd,
|
||||
label='t-big',
|
||||
relay_stdout=True,
|
||||
)
|
||||
)
|
||||
|
||||
trio.run(main)
|
||||
|
||||
big = ''.join(
|
||||
r.split('] ', 1)[-1]
|
||||
for r in records
|
||||
if '[t-big:out]' in r
|
||||
)
|
||||
assert len(big) == 200_000
|
||||
|
||||
|
||||
def test_stderr_relay_and_cpe_rebuild(monkeypatch):
|
||||
'''
|
||||
`relay_stderr=True` PIPEs stderr ourselves (mutually
|
||||
exclusive with trio's `capture_stderr`), so on rc!=0 the
|
||||
wrapper rebuilds a `CalledProcessError` from the live
|
||||
accumulator and `.add_note()`s its `.stderr` — AND the
|
||||
stderr is relayed per-line live.
|
||||
|
||||
'''
|
||||
records = _capture_relay(monkeypatch)
|
||||
|
||||
cmd = [
|
||||
'sh', '-c',
|
||||
'echo boom 1>&2; exit 3',
|
||||
]
|
||||
|
||||
async def main():
|
||||
# `collapse_eg()` unwraps the parent-nursery's single-exc
|
||||
# eg so the bare CPE bubbles straight out (mirrors real
|
||||
# caller usage).
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
await tn.start(
|
||||
partial(
|
||||
supervise_run_process,
|
||||
cmd,
|
||||
label='t-err',
|
||||
relay_stderr=True,
|
||||
check=True,
|
||||
)
|
||||
)
|
||||
|
||||
with pytest.raises(subprocess.CalledProcessError) as ei:
|
||||
trio.run(main)
|
||||
|
||||
cpe = ei.value
|
||||
assert cpe.returncode == 3
|
||||
# rebuilt `.stderr` (trio did NOT capture since we PIPE'd it).
|
||||
assert b'boom' in (cpe.stderr or b'')
|
||||
# note attached for legible teardown reporting.
|
||||
assert any(
|
||||
'boom' in n
|
||||
for n in getattr(cpe, '__notes__', [])
|
||||
)
|
||||
# AND it was relayed live per-line.
|
||||
assert any(
|
||||
'[t-err:err]' in r and 'boom' in r
|
||||
for r in records
|
||||
)
|
||||
|
||||
|
||||
def test_nonrelay_cpe_note(monkeypatch):
|
||||
'''
|
||||
No live relay: stderr is silently drained + captured (NOT
|
||||
emitted), and on rc!=0 the wrapper rebuilds the
|
||||
`CalledProcessError` from that accumulator with a `.stderr`
|
||||
note — same deterministic post-drain path as the relay case.
|
||||
|
||||
'''
|
||||
cmd = [
|
||||
'sh', '-c',
|
||||
'echo nope 1>&2; exit 7',
|
||||
]
|
||||
|
||||
async def main():
|
||||
async with (
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
await tn.start(
|
||||
partial(
|
||||
supervise_run_process,
|
||||
cmd,
|
||||
label='t-legacy',
|
||||
check=True,
|
||||
# relay_* default False -> silent
|
||||
# drain+capture for the CPE note.
|
||||
)
|
||||
)
|
||||
|
||||
with pytest.raises(subprocess.CalledProcessError) as ei:
|
||||
trio.run(main)
|
||||
|
||||
cpe = ei.value
|
||||
assert cpe.returncode == 7
|
||||
assert b'nope' in (cpe.stderr or b'')
|
||||
assert any(
|
||||
'nope' in n
|
||||
for n in getattr(cpe, '__notes__', [])
|
||||
)
|
||||
|
|
@ -155,6 +155,7 @@ async def maybe_block_bp(
|
|||
os.environ.pop('PYTHONBREAKPOINT', None)
|
||||
|
||||
|
||||
|
||||
@acm
|
||||
async def open_root_actor(
|
||||
*,
|
||||
|
|
@ -185,7 +186,6 @@ async def open_root_actor(
|
|||
# enables the multi-process debugger support
|
||||
debug_mode: bool = False,
|
||||
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
|
||||
|
||||
# ^XXX NOTE^ the perf implications of use,
|
||||
# https://greenback.readthedocs.io/en/latest/principle.html#performance
|
||||
enable_stack_on_sig: bool = False,
|
||||
|
|
|
|||
|
|
@ -90,6 +90,7 @@ keys are caller-defined).
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
|
|
@ -98,9 +99,6 @@ import stat
|
|||
import sys
|
||||
import time
|
||||
|
||||
|
||||
from tractor.devx import _proctitle
|
||||
|
||||
# `/dev/shm` is the POSIX-shm filesystem on Linux + FreeBSD.
|
||||
# macOS uses `shm_open` syscalls without a fs-visible path,
|
||||
# so the shm helpers refuse to run there.
|
||||
|
|
@ -232,9 +230,9 @@ def _read_comm(pid: int) -> str:
|
|||
# while `cmdline` for zombies often reads as empty.
|
||||
_TRACTOR_PROC_CMDLINE_MARKERS: tuple[str, ...] = (
|
||||
'tractor._child',
|
||||
_proctitle._def_prefix,
|
||||
'tractor[',
|
||||
)
|
||||
_TRACTOR_PROC_COMM_MARKER: str = _proctitle._def_prefix
|
||||
_TRACTOR_PROC_COMM_MARKER: str = 'tractor['
|
||||
|
||||
|
||||
def _is_tractor_subactor(pid: int) -> bool:
|
||||
|
|
|
|||
|
|
@ -24,10 +24,7 @@ which" at a glance without needing to read full
|
|||
`/proc/<pid>/cmdline`.
|
||||
|
||||
Format:
|
||||
``<_def_prefix>[<aid.reprol()>]`` e.g. ``_subactor[doggy@1027301b]``
|
||||
(prefix from the `_def_prefix` const, flipped `tractor` ->
|
||||
`_subactor` so sub-actor procs are visually distinct from the
|
||||
root in `ps`/`htop` and the reap-recognition markers.)
|
||||
``tractor[<aid.reprol()>]`` e.g. ``tractor[doggy@1027301b]``
|
||||
|
||||
Uses the canonical `Aid.reprol()` form
|
||||
(``<name>@<uuid_short>``) so the proc-title matches the
|
||||
|
|
@ -55,13 +52,7 @@ except ImportError:
|
|||
_stp = None
|
||||
|
||||
|
||||
_def_prefix: str = '_subactor'
|
||||
|
||||
|
||||
def set_actor_proctitle(
|
||||
actor: 'Actor',
|
||||
prefix: str = _def_prefix,
|
||||
) -> str | None:
|
||||
def set_actor_proctitle(actor: 'Actor') -> str | None:
|
||||
'''
|
||||
Set the calling process's proc-title to identify it as a
|
||||
tractor sub-actor.
|
||||
|
|
@ -78,6 +69,6 @@ def set_actor_proctitle(
|
|||
if _stp is None:
|
||||
return None
|
||||
|
||||
title: str = f'{prefix}[{actor.aid.reprol()}]'
|
||||
title: str = f'tractor[{actor.aid.reprol()}]'
|
||||
_stp.setproctitle(title)
|
||||
return title
|
||||
|
|
|
|||
|
|
@ -398,7 +398,7 @@ async def handle_stream_from_peer(
|
|||
uid,
|
||||
None,
|
||||
)
|
||||
if event is not None:
|
||||
if event:
|
||||
con_status_steps += (
|
||||
' -> Waking subactor spawn waiters: '
|
||||
f'{event.statistics().tasks_waiting}\n'
|
||||
|
|
|
|||
|
|
@ -262,63 +262,6 @@ class StackLevelAdapter(LoggerAdapter):
|
|||
)
|
||||
|
||||
|
||||
def add_log_level(
|
||||
name: str,
|
||||
value: int,
|
||||
color: str = 'white',
|
||||
) -> None:
|
||||
'''
|
||||
Register a new custom log level with `tractor`'s logging
|
||||
machinery in ONE call — the single pluggable entry-point that
|
||||
keeps the (otherwise hand-synced) pieces consistent:
|
||||
|
||||
- `CUSTOM_LEVELS[name]` (drives the `stacklevel` bump in
|
||||
`StackLevelAdapter.log()` + the `get_logger()` audit).
|
||||
- `logging.addLevelName()` registration.
|
||||
- `STD_PALETTE`/`BOLD_PALETTE` color entries (consumed when
|
||||
`get_console_log()` builds its `ColoredFormatter`).
|
||||
- a same-named (lowercase) emit method bound on
|
||||
`StackLevelAdapter` so `log.<name>('msg')` works (and so
|
||||
`get_logger()`'s per-level method audit passes).
|
||||
|
||||
Idempotent: re-registering an existing name is a no-op-ish
|
||||
refresh (won't clobber an already-bound method).
|
||||
|
||||
'''
|
||||
name_up: str = name.upper()
|
||||
name_lo: str = name.lower()
|
||||
|
||||
CUSTOM_LEVELS[name_up] = value
|
||||
logging.addLevelName(value, name_up)
|
||||
STD_PALETTE[name_up] = color
|
||||
BOLD_PALETTE['bold'][name_up] = f'bold_{color}'
|
||||
|
||||
if not hasattr(StackLevelAdapter, name_lo):
|
||||
# bind via default-arg so `value` is captured (not
|
||||
# late-bound); delegates to `.log()` exactly like the
|
||||
# hand-written level methods above.
|
||||
def _emit(
|
||||
self,
|
||||
msg: str,
|
||||
*,
|
||||
_level: int = value,
|
||||
) -> None:
|
||||
return self.log(_level, msg)
|
||||
|
||||
_emit.__name__ = name_lo
|
||||
_emit.__qualname__ = f'StackLevelAdapter.{name_lo}'
|
||||
setattr(StackLevelAdapter, name_lo, _emit)
|
||||
|
||||
|
||||
# `IO`: child-subproc std-stream relay (see
|
||||
# `tractor.trionics._subproc`). Value 21 sits just ABOVE
|
||||
# `INFO`(20) so it's SHOWN BY DEFAULT at the usual `info`/`devx`
|
||||
# console levels (a `runtime`(15) relay would be silently
|
||||
# filtered) yet still distinctly labelled/colored + separately
|
||||
# filterable.
|
||||
add_log_level('IO', 21, 'purple')
|
||||
|
||||
|
||||
# TODO IDEAs:
|
||||
# -[ ] move to `.devx.pformat`?
|
||||
# -[ ] do per task-name and actor-name color coding
|
||||
|
|
|
|||
|
|
@ -38,6 +38,3 @@ from ._taskc import (
|
|||
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
|
||||
start_or_cancel as start_or_cancel,
|
||||
)
|
||||
from ._subproc import (
|
||||
supervise_run_process as supervise_run_process,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -1,296 +0,0 @@
|
|||
# tractor: distributed structured concurrency.
|
||||
# Copyright 2018-eternity Tyler Goodlet.
|
||||
|
||||
# This program is free software: you can redistribute it and/or modify
|
||||
# it under the terms of the GNU Affero General Public License as published by
|
||||
# the Free Software Foundation, either version 3 of the License, or
|
||||
# (at your option) any later version.
|
||||
|
||||
# This program is distributed in the hope that it will be useful,
|
||||
# but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
# GNU Affero General Public License for more details.
|
||||
|
||||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <https://www.gnu.org/licenses/>.
|
||||
|
||||
'''
|
||||
SC-friendly `trio.run_process()` supervision: a `tn.start()`
|
||||
style wrapper which surfaces rc!=0 errors deterministically and
|
||||
(optionally) live-relays the child's std-streams to the `tractor`
|
||||
log.
|
||||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from functools import partial
|
||||
import subprocess
|
||||
import textwrap
|
||||
from typing import (
|
||||
Callable,
|
||||
)
|
||||
|
||||
import trio
|
||||
|
||||
from ..log import get_logger
|
||||
|
||||
log = get_logger()
|
||||
|
||||
|
||||
# sentinel so `supervise_run_process(stdout=...)` can tell
|
||||
# "caller passed nothing" (-> tty-safe `DEVNULL` default) from
|
||||
# an explicit `stdout=None` (inherit) override.
|
||||
_UNSET = object()
|
||||
|
||||
|
||||
def _add_stderr_note(
|
||||
cpe: subprocess.CalledProcessError,
|
||||
stderr_bytes: bytes,
|
||||
) -> None:
|
||||
'''
|
||||
Attach an indented `|_.stderr:` note to a
|
||||
`CalledProcessError` for legible rc!=0 reporting at
|
||||
teardown.
|
||||
|
||||
'''
|
||||
stderr_str: str = stderr_bytes.decode(errors='replace')
|
||||
cpe.add_note(
|
||||
f'|_.stderr:\n'
|
||||
f'{textwrap.indent(stderr_str, prefix=" "*3)}'
|
||||
)
|
||||
|
||||
|
||||
async def _relay_stream_lines(
|
||||
stream: trio.abc.ReceiveStream,
|
||||
*,
|
||||
emit: Callable[[str], None]|None = None,
|
||||
tag: str = '',
|
||||
accum: bytearray|None = None,
|
||||
) -> None:
|
||||
'''
|
||||
Concurrently drain a child subproc's `stdout`/`stderr`
|
||||
PIPE; relay each COMPLETE line to `emit` (a bound
|
||||
`log.<level>` method) prefixed with `tag` (e.g.
|
||||
`f'{label}:out'`) and/or append raw bytes to `accum`.
|
||||
|
||||
This reader is MANDATORY whenever a bare
|
||||
`stdout=`/`stderr=PIPE` is used WITHOUT `trio`'s
|
||||
`capture_*` (which would spawn trio's own internal drain
|
||||
task): nothing else drains the OS pipe, so once its kernel
|
||||
buffer (~64KiB) fills the child blocks on `write()` ->
|
||||
deadlock.
|
||||
|
||||
Modes (combine freely):
|
||||
- `emit`-only: live per-line relay (e.g. `relay_stdout`).
|
||||
- `accum`-only: silent drain + capture (e.g. stderr kept
|
||||
for a `CalledProcessError` note WITHOUT relaying it).
|
||||
- both: relay AND capture (e.g. `relay_stderr` with `check=True`).
|
||||
|
||||
'''
|
||||
# NOTE, mirrors `trio._subprocess`'s internal
|
||||
# `async with stream: async for ...` drain idiom — except
|
||||
# here we EMIT per-line (and/or accumulate) instead of
|
||||
# only accumulating.
|
||||
residual: bytes = b''
|
||||
async with stream: # aclose at EOF/cancel
|
||||
async for chunk in stream: # ends at child-exit EOF
|
||||
if accum is not None:
|
||||
accum += chunk
|
||||
if emit is None:
|
||||
continue # drain(+accum)-only
|
||||
buf: bytes = residual + chunk
|
||||
*lines, residual = buf.split(b'\n')
|
||||
for raw in lines:
|
||||
line: str = raw.decode(
|
||||
errors='replace',
|
||||
).rstrip('\r')
|
||||
emit(f'[{tag}] {line}')
|
||||
|
||||
# flush any trailing partial (un-newline-term'd) line @ EOF
|
||||
if (
|
||||
emit is not None
|
||||
and
|
||||
residual
|
||||
):
|
||||
line: str = residual.decode(
|
||||
errors='replace',
|
||||
).rstrip('\r')
|
||||
emit(f'[{tag}] {line}')
|
||||
|
||||
|
||||
async def supervise_run_process(
|
||||
cmd: list[str]|str,
|
||||
*,
|
||||
check: bool = True,
|
||||
label: str|None = None,
|
||||
|
||||
# per-line `log.*` relay of the child's std-streams
|
||||
# (tty-safe, capture-safe, STREAMED — not
|
||||
# buffered-until-exit, so it suits long-lived daemons).
|
||||
relay_stdout: bool = False,
|
||||
relay_stderr: bool = False,
|
||||
|
||||
# default `io` (our custom level, value 21): the relay
|
||||
# exists to make windowless-spawn output VISIBLE, and
|
||||
# `IO`(21) sorts just ABOVE `INFO`(20) so it shows at the
|
||||
# usual `info`/`devx` console levels (a `runtime`(15) relay
|
||||
# would be silently filtered) while staying distinctly
|
||||
# labelled + separately filterable.
|
||||
relay_level: str = 'io',
|
||||
|
||||
# non-relay `stdout` override; defaults (via `_UNSET`) to
|
||||
# `DEVNULL` so we NEVER inherit (+ thus can't clobber) the
|
||||
# parent controlling-tty.
|
||||
stdout: int = _UNSET,
|
||||
|
||||
task_status: trio.TaskStatus[
|
||||
trio.Process
|
||||
] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
# any other `trio.run_process()` kwarg (env, shell, cwd,
|
||||
# start_new_session, executable, ...) forwarded verbatim;
|
||||
# our MANAGED keys (stdin/stdout/stderr/check) are set
|
||||
# below and WIN on conflict.
|
||||
**run_process_kwargs,
|
||||
) -> None:
|
||||
'''
|
||||
A `trio.Nursery.start()`-style `trio.run_process()`
|
||||
wrapper which,
|
||||
|
||||
- surfaces a rc!=0 `subprocess.CalledProcessError`
|
||||
DETERMINISTICALLY: we pass `check=False` to `trio` and
|
||||
do our OWN post-drain rc-check, (re)building + raising a
|
||||
BARE CPE (with a `.stderr` note) from this coro's body
|
||||
AFTER the child exits — so there's no nursery-eg-wrapped
|
||||
CPE to catch/`collapse_eg`, and the relay reader is never
|
||||
race-cancelled mid-drain.
|
||||
|
||||
- ALWAYS isolates the parent controlling-tty
|
||||
(`stdin=DEVNULL`, and `stdout=DEVNULL` unless
|
||||
relayed/overridden) so a spawned program can't emit
|
||||
terminal control-seqs onto the launching tty (which
|
||||
would clobber its scrollback).
|
||||
|
||||
- optionally live-relays `stdout`/`stderr` per-line to
|
||||
`log.<relay_level>` via concurrent reader tasks (see
|
||||
`_relay_stream_lines`).
|
||||
|
||||
Delivers the live `trio.Process` via
|
||||
`task_status.started()` then SUPERVISES it (the
|
||||
`run_process` bg task + any relay readers) to completion
|
||||
in this coro — i.e. the parent `tn.start()` returns
|
||||
immediately/non-blocking.
|
||||
|
||||
NOTE: any crash-handling / `repl_fixture` layer is
|
||||
intentionally NOT baked in here — compose it ON TOP at the
|
||||
call-site, e.g.
|
||||
|
||||
async with maybe_open_crash_handler():
|
||||
await tn.start(
|
||||
partial(supervise_run_process, cmd, ...),
|
||||
)
|
||||
|
||||
'''
|
||||
emit: Callable[[str], None] = getattr(log, relay_level)
|
||||
tag: str = (
|
||||
label
|
||||
or
|
||||
(cmd if isinstance(cmd, str) else ' '.join(cmd))
|
||||
)
|
||||
|
||||
# forward any extra `trio.run_process` kwargs verbatim;
|
||||
# MANAGED keys below override on conflict.
|
||||
rp_kwargs: dict = dict(run_process_kwargs)
|
||||
|
||||
# XXX ALWAYS isolate the controlling-tty's stdin.
|
||||
rp_kwargs['stdin'] = subprocess.DEVNULL
|
||||
|
||||
# stdout: relay -> our own PIPE (drained by the reader
|
||||
# below); else an explicit override; else tty-safe
|
||||
# `DEVNULL`.
|
||||
if relay_stdout:
|
||||
rp_kwargs['stdout'] = subprocess.PIPE
|
||||
elif stdout is not _UNSET:
|
||||
rp_kwargs['stdout'] = stdout
|
||||
else:
|
||||
rp_kwargs['stdout'] = subprocess.DEVNULL
|
||||
|
||||
# stderr: PIPE (+ our reader) when we either RELAY it OR
|
||||
# need it captured for a rc!=0 CPE note; else tty-safe
|
||||
# `DEVNULL`. We accumulate ONLY when `check` (the note is
|
||||
# the only consumer).
|
||||
#
|
||||
# XXX we ALWAYS pass `check=False` to `trio` and do our
|
||||
# OWN deterministic post-drain rc-check (below) so `trio`
|
||||
# never raises a nursery-eg-wrapped CPE — no `collapse_eg`
|
||||
# workaround, no reader race-cancel.
|
||||
want_stderr_pipe: bool = relay_stderr or check
|
||||
stderr_accum: bytearray|None = bytearray() if check else None
|
||||
rp_kwargs['check'] = False
|
||||
rp_kwargs['stderr'] = (
|
||||
subprocess.PIPE if want_stderr_pipe
|
||||
else subprocess.DEVNULL
|
||||
)
|
||||
|
||||
async with trio.open_nursery() as own_tn:
|
||||
trio_proc: trio.Process = await own_tn.start(
|
||||
partial(
|
||||
trio.run_process,
|
||||
cmd,
|
||||
**rp_kwargs,
|
||||
)
|
||||
)
|
||||
|
||||
# spin up the concurrent pipe-drain relay reader(s) —
|
||||
# see `_relay_stream_lines` for why these are mandatory
|
||||
# (not cosmetic) when piping without `capture_*`.
|
||||
if relay_stdout:
|
||||
own_tn.start_soon(
|
||||
partial(
|
||||
_relay_stream_lines,
|
||||
trio_proc.stdout,
|
||||
emit=emit,
|
||||
tag=f'{tag}:out',
|
||||
)
|
||||
)
|
||||
if want_stderr_pipe:
|
||||
own_tn.start_soon(
|
||||
partial(
|
||||
_relay_stream_lines,
|
||||
trio_proc.stderr,
|
||||
# relay live only if asked; else silent
|
||||
# drain+capture for the CPE note.
|
||||
emit=emit if relay_stderr else None,
|
||||
tag=f'{tag}:err',
|
||||
accum=stderr_accum,
|
||||
)
|
||||
)
|
||||
|
||||
# hand the live proc up to the parent WITHOUT blocking
|
||||
# on the bg supervise/relay tasks (keeps non-blocking
|
||||
# `tn.start()` semantics).
|
||||
task_status.started(trio_proc)
|
||||
|
||||
# ===== deterministic post-drain rc-check (BOTH paths) =====
|
||||
# `own_tn` only unwinds once `run_process` AND the relay
|
||||
# reader(s) have hit EOF + FULLY drained — so `stderr_accum`
|
||||
# is COMPLETE here (no race vs an early CPE-cancel). Rebuild
|
||||
# + raise a BARE `CalledProcessError` (the parent `tn` will
|
||||
# eg-wrap it like any task-raise; callers `collapse_eg()` if
|
||||
# they want it bare).
|
||||
if (
|
||||
check
|
||||
and
|
||||
trio_proc.returncode
|
||||
):
|
||||
stderr_bytes: bytes = (
|
||||
bytes(stderr_accum)
|
||||
if stderr_accum is not None
|
||||
else b''
|
||||
)
|
||||
cpe = subprocess.CalledProcessError(
|
||||
returncode=trio_proc.returncode,
|
||||
cmd=trio_proc.args,
|
||||
stderr=stderr_bytes,
|
||||
)
|
||||
_add_stderr_note(cpe, stderr_bytes)
|
||||
raise cpe
|
||||
22
uv.lock
22
uv.lock
|
|
@ -797,7 +797,7 @@ dev = [
|
|||
{ name = "pytest-timeout", specifier = ">=2.3" },
|
||||
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
|
||||
{ name = "typing-extensions", specifier = ">=4.14.1" },
|
||||
{ name = "xonsh", specifier = ">=0.23.8" },
|
||||
{ name = "xonsh", specifier = ">=0.23.0" },
|
||||
]
|
||||
devx = [
|
||||
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
|
||||
|
|
@ -809,7 +809,7 @@ repl = [
|
|||
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
|
||||
{ name = "psutil", specifier = ">=7.0.0" },
|
||||
{ name = "pyperclip", specifier = ">=1.9.0" },
|
||||
{ name = "xonsh", specifier = ">=0.23.8" },
|
||||
{ name = "xonsh", specifier = ">=0.23.0" },
|
||||
]
|
||||
subints = [{ name = "msgspec", marker = "python_full_version >= '3.14'", specifier = ">=0.21.0" }]
|
||||
sync-pause = [{ name = "greenback", marker = "python_full_version == '3.13.*'", specifier = ">=1.2.1,<2" }]
|
||||
|
|
@ -834,7 +834,7 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "trio"
|
||||
version = "0.33.0"
|
||||
version = "0.29.0"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
dependencies = [
|
||||
{ name = "attrs" },
|
||||
|
|
@ -844,9 +844,9 @@ dependencies = [
|
|||
{ name = "sniffio" },
|
||||
{ name = "sortedcontainers" },
|
||||
]
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/52/b6/c744031c6f89b18b3f5f4f7338603ab381d740a7f45938c4607b2302481f/trio-0.33.0.tar.gz", hash = "sha256:a29b92b73f09d4b48ed249acd91073281a7f1063f09caba5dc70465b5c7aa970", size = 605109, upload-time = "2026-02-14T18:40:55.386Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/a1/47/f62e62a1a6f37909aed0bf8f5d5411e06fa03846cfcb64540cd1180ccc9f/trio-0.29.0.tar.gz", hash = "sha256:ea0d3967159fc130acb6939a0be0e558e364fee26b5deeecc893a6b08c361bdf", size = 588952, upload-time = "2025-02-14T07:13:50.724Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/1c/93/dab25dc87ac48da0fe0f6419e07d0bfd98799bed4e05e7b9e0f85a1a4b4b/trio-0.33.0-py3-none-any.whl", hash = "sha256:3bd5d87f781d9b0192d592aef28691f8951d6c2e41b7e1da4c25cde6c180ae9b", size = 510294, upload-time = "2026-02-14T18:40:53.313Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/c9/55/c4d9bea8b3d7937901958f65124123512419ab0eb73695e5f382521abbfb/trio-0.29.0-py3-none-any.whl", hash = "sha256:d8c463f1a9cc776ff63e331aba44c125f423a5a13c684307e828d930e625ba66", size = 492920, upload-time = "2025-02-14T07:13:48.696Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
@ -923,14 +923,14 @@ wheels = [
|
|||
|
||||
[[package]]
|
||||
name = "xonsh"
|
||||
version = "0.23.8"
|
||||
version = "0.23.2"
|
||||
source = { registry = "https://pypi.org/simple" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/8b/77/0c4c39ad866d4ea1ef553f325d16e804d1bf1eeecc591f0e81b057aa37db/xonsh-0.23.8.tar.gz", hash = "sha256:541bb976c93a81571792644403bae8737145023da5f48d4c493909ab5c04ba0f", size = 1172271, upload-time = "2026-05-30T04:47:22.53Z" }
|
||||
sdist = { url = "https://files.pythonhosted.org/packages/60/e5/2dfa99e21a8118bed0e73ed50e91962fdad01b900e23497064e8810b03b5/xonsh-0.23.2.tar.gz", hash = "sha256:633608c8292938af0f242f05326cc2912f25fa72bd808824ab0534a6df304402", size = 1030659, upload-time = "2026-04-26T19:28:40.744Z" }
|
||||
wheels = [
|
||||
{ url = "https://files.pythonhosted.org/packages/ca/4a/2aab8300ad218dfc7678c34d5f703f09df5681fecc6e66d48c951ef58049/xonsh-0.23.8-py311-none-any.whl", hash = "sha256:4bab3e405643df2cc78ec2cac13241471841796fe710386d2179666aae8a5f9c", size = 799846, upload-time = "2026-05-30T04:47:21.211Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/87/ec/aa66ef6046f90769dd8fcb3ddca9d00282d12e3d73645abbf12f190f17cf/xonsh-0.23.8-py312-none-any.whl", hash = "sha256:c7d0f0fba0cafe0bd75bf202820aeffc74b52943fa27d98d3b4346793f6ba493", size = 799868, upload-time = "2026-05-30T04:47:19.158Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/12/fe/2d757d82b57332f1c6cd3f8c168fbcf060a275895a763542255ae1c53d75/xonsh-0.23.8-py313-none-any.whl", hash = "sha256:1b7335522a6ecd63f0d84151977a7a9050874d3ecec00cf79919d0770bebb1b4", size = 800388, upload-time = "2026-05-30T04:47:18.47Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/80/96/567bb3131655ff73c821e8a030c53707ced6c8840330a859f67bbaefbd16/xonsh-0.23.8-py314-none-any.whl", hash = "sha256:2a411fc47958c6107b3e13372655d18c52be98368e2159a1910cfde77124b3b1", size = 800352, upload-time = "2026-05-30T04:47:14.812Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/53/0d/bf7869dd57b40888ea1da8fc88f70d8e94ec2f8ee236ea4c22a757593235/xonsh-0.23.2-py311-none-any.whl", hash = "sha256:a38dd84e23e97fc42e0156c80024b3449474dfcbb6c3a344bd38c45a2b2de44d", size = 756215, upload-time = "2026-04-26T19:28:38.875Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/f7/9f/b1bb0c15bf2120469c94b062f4b854588370ab94c7a1679c84ff646bf50b/xonsh-0.23.2-py312-none-any.whl", hash = "sha256:190a348fa19774de8e697af5f44c9adb95aca687fa475b31dda23d1a3462a3c6", size = 756224, upload-time = "2026-04-26T19:28:39.17Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/83/23/8e037579ac86d8f266b4116338f902eab04175b88574a6438ee739dd3084/xonsh-0.23.2-py313-none-any.whl", hash = "sha256:4ebbf42a94f505d25694f154556ca0caa149a3f59870ec850bd13ad8df519dce", size = 756728, upload-time = "2026-04-26T19:28:39.493Z" },
|
||||
{ url = "https://files.pythonhosted.org/packages/05/ec/090300d9c5f14f58b5a684302f43535457f733a62f11673aa3ac38460717/xonsh-0.23.2-py314-none-any.whl", hash = "sha256:5efcd0f6db8f9f1dace256de2c04c3c044f2d86b48434187c43a69d602283a9e", size = 756767, upload-time = "2026-04-26T19:28:37.218Z" },
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
|
|
|||
|
|
@ -488,7 +488,6 @@ def _tractor_reap(args):
|
|||
reap,
|
||||
reap_shm,
|
||||
reap_uds,
|
||||
_TRACTOR_PROC_CMDLINE_MARKERS,
|
||||
)
|
||||
|
||||
rc: int = 0
|
||||
|
|
@ -501,8 +500,9 @@ def _tractor_reap(args):
|
|||
else:
|
||||
pids = find_orphans()
|
||||
mode = (
|
||||
f'orphans (PPid==1, intrinsic '
|
||||
f'cmdline/comm match — {_TRACTOR_PROC_CMDLINE_MARKERS}'
|
||||
'orphans (PPid==1, intrinsic '
|
||||
'cmdline/comm match — `tractor[…]` or '
|
||||
'`tractor._child`)'
|
||||
)
|
||||
|
||||
if not pids:
|
||||
|
|
|
|||
Loading…
Reference in New Issue