Compare commits

...

10 Commits

Author SHA1 Message Date
Gud Boi 0952b33a9e Code-style, couple newline/ws tweaks
(cherry picked from commit 8526985c972d6f47bcb9414907c76fbc2ac561d2)
2026-06-09 20:28:42 -04:00
Gud Boi ade15b4204 Pin to latest `xonsh` release
(cherry picked from commit c4cad921b9)
2026-06-09 20:28:42 -04:00
Gud Boi d0144e52cb Bump trio `echoserver` cancel timeout 1→4s
Same trio 0.29 → 0.33 cancel-cascade slowdown that hit
`test_nested_multierrors` (ea67f1b6) — bumps the
`trio`-backend (non-debug, non-forking) budget in
`test_echoserver_detailed_mechanics` from 1s → 4s.

- The 1s budget raced the ~1s teardown deadline. On a
  deadline-fire trio 0.33 injects
  `Cancelled(source='deadline')` (cancel-reason
  metadata) that wraps the mid-stream KBI in a
  `BaseExceptionGroup`, breaking the bare
  `pytest.raises(KeyboardInterrupt)` below.
- Bump matches the forking-spawner branch (4s).
- Inline NOTE references the tracking issue
  `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit d7da502d93)
2026-06-09 20:28:42 -04:00
Gud Boi 57b3ea59ea Bump trio depth=3 cancel timeout 6→12s
trio 0.29 → 0.33 lock bump (c7741bba) slowed the
depth=3 cancel-cascade in `test_nested_multierrors`
from <6s to ~7-8s; the 6s deadline was firing and its
`Cancelled(source='deadline')` (trio 0.33's new
cancel-reason metadata) collapsed a BEG branch,
breaking the `RemoteActorError` assertion downstream.

- Split the `('trio', _)` case-match into per-depth
  arms: `('trio', 1)` keeps 6s (still finishes in
  ~3s); `('trio', 3)` → 12s.
- Updated inline NOTE explains the version pivot +
  links the tracking issue
  `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`.
- Existing MTF/`subint_forkserver` budgets unchanged.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit ea67f1b67b)
2026-06-09 20:28:42 -04:00
Gud Boi c07cf2546b Strip ANSI + accept `_create(...)` in devx tests
Two version-compat fixes for the `devx` debugger
test-suite, both about matching upstream output that
got more verbose w/ recent lib releases.

ANSI stripping (`tests/devx/conftest.py`),
- Add `ansi_strip(text)` helper + `_ansi_re` pattern
  (regex per https://stackoverflow.com/a/14693789).
- Apply inside `in_prompt_msg()` + `assert_before()` so
  substring matches against REPL/traceback output stay
  robust to color leakage.
- Motivated by py3.13's colored tracebacks +
  `pdbp`/pygments highlighting leaking ANSI even when
  `PYTHON_COLORS=0` is set in the `spawn` fixture (not
  every renderer in the spawned subproc honors it).
- Replaces the longstanding inline TODO that linked
  the SO answer w/o impl'ing.

trio 0.30+ `Cancelled._create(` match (`test_debugger`),
- In `test_shield_pause` swap the two
  `"raise Cancelled._create()"` assertion patterns →
  `"raise Cancelled._create("` (open-paren form, no
  closing).
- trio >=0.30 raises a multi-line
  `raise Cancelled._create(source=.., reason=..,
  source_task=..)` w/ cancel-reason metadata, so the
  legacy bare-`()` form no longer matches. Inline
  comment documents the trio-version pivot.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit 3854cf5ecb)
2026-06-09 20:28:42 -04:00
Gud Boi fd8d39c0ce Hoist proc-title prefix to `_def_prefix` const
Make the sub-actor proc-title prefix a single
authoritative constant (`_proctitle._def_prefix`) so
the reap-recognition markers and `xontrib` banner pick
it up automatically — one place to flip the prefix
shape going fwd.

Deats,
- `_proctitle._def_prefix: str = '_subactor'`. New
  module-level const consumed by everything that needs
  to know the prefix.
- `set_actor_proctitle(actor, prefix=_def_prefix)`:
  takes an explicit `prefix` arg (default = the const)
  so callers can override per-spawn if they want.
- Default proc-title format:
  `'tractor[<reprol>]'` → `f'{prefix}[<reprol>]'`
  i.e. `_subactor[<reprol>]` by default.
- `_testing/_reap.py`: cmdline + comm markers source
  the prefix from `_proctitle._def_prefix` instead of
  the hardcoded `'tractor['`. So
  `_is_tractor_subactor()` tracks the const
  automatically.
- `xontrib/tractor_diag.xsh`: `acli.reap` orphan-mode
  banner now interpolates the
  `_TRACTOR_PROC_CMDLINE_MARKERS` tuple directly so
  the human-readable mode line stays in sync if the
  prefix shape changes again.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit 3a45dbd503)
2026-06-09 20:28:42 -04:00
Gud Boi 93558fe3c9 Add `add_log_level()` factory + register `IO`=21
Follow-up to f595acc7 (`supervise_run_process`) which
called `log.io(...)` for std-stream relay assuming an
`IO=21` level existed. Add the registration via a new
factory + tests covering both the factory and the new
level.

`add_log_level()` factory,
- One call wires the four (otherwise hand-synced) pieces:
  - `CUSTOM_LEVELS[NAME]` — drives the `stacklevel` bump
    in `StackLevelAdapter.log()` + `get_logger()`'s
    per-level audit.
  - `logging.addLevelName()` — stdlib name registration.
  - `STD_PALETTE[NAME]` + `BOLD_PALETTE['bold'][NAME]` —
    color entries consumed by `get_console_log()`'s
    `ColoredFormatter` build.
  - Same-named (lowercase) emit method bound on
    `StackLevelAdapter` so `log.<name>('msg')` works +
    `get_logger()`'s per-level method audit passes.
- Idempotent: re-registering an existing name is a
  no-op-ish refresh that won't clobber an already-bound
  method.
- Method binding uses a default-arg `_level=value` so
  the level int is captured (not late-bound across
  multiple registrations).

`IO=21` level (first user),
- Purple. Used by `tractor.trionics._subproc`'s
  std-stream relay (see f595acc7).
- Value 21 picked to sit just ABOVE stdlib `INFO`=20 so
  it's SHOWN BY DEFAULT at usual `info`/`devx` console
  levels — a `runtime`=15 relay would be silently
  filtered (footgun for daemon supervisors whose whole
  point is visibility). Still distinctly labeled +
  filterable.

Tests (`tests/test_log_sys.py`),
- `test_io_custom_level_registered`: validates the IO
  level is fully wired (`CUSTOM_LEVELS`, `addLevelName`,
  both palettes, `StackLevelAdapter.io()` callable);
  emits a record + sanity-asserts `21 >= INFO(20)`.
- `test_add_log_level_pluggable`: registers a fresh
  `XLVL=19` (cyan) via `add_log_level()`, asserts all
  four wires + the bound `xlog.xlvl()` emit, then
  try/finally cleans up the module-global mutations so
  later `get_logger()` audits don't trip on a
  half-removed level.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit 7bd7dd50c7)
2026-06-09 20:28:42 -04:00
Gud Boi 6df9ee11bc Add `supervise_run_process` to `trionics._subproc`
A `trio.Nursery.start()`-style wrapper around
`trio.run_process()` that surfaces rc!=0 errors
deterministically, ALWAYS isolates the parent
controlling-tty, and optionally live-relays the child's
std-streams to `log.<level>` per-line. Suits both
short-lived test-runners + long-lived daemons.

`supervise_run_process()`,
- Deterministic rc!=0: pass `check=False` to `trio`
  and do our OWN post-drain rc-check from the
  supervisor coro body AFTER `own_tn.__aexit__` — NOT
  inside the internal nursery, since that would
  race-cancel the still-draining relay reader and lose
  stderr lines. (Re)build + raise a BARE
  `subprocess.CalledProcessError`: `.stderr=` for
  programmatic callers + an `add_note()`'d
  `|_.stderr:` block for human teardown logs. No
  nursery-eg-wrapped CPE to `collapse_eg` around.
- Parent controlling-tty isolation: `stdin=DEVNULL`
  always, `stdout=DEVNULL` unless relayed/overridden
  (via `stdout=` kwarg w/ `_UNSET` sentinel so explicit
  `None` = inherit still works). Prevents a spawned
  program from clobbering the launching tty's scrollback
  w/ control-seqs.
- Live per-line relay: `relay_stdout=True`/
  `relay_stderr=True` → relayed to `log.<relay_level>`
  (default `'io'`, our custom level 21). Picked to sort
  just above stdlib `INFO`=20 so it shows at usual
  `info`/`devx` levels yet stays separately filterable;
  `runtime`=15 was REJECTED as a default since it'd be
  silently filtered at usual verbosity — footgun for
  daemon supervisors whose whole point is visibility.
  STREAMED, not buffered-until-exit.
- Non-blocking `tn.start()` semantics: live
  `trio.Process` handed up via
  `task_status.started()` immediately (else
  `tn.start()` would block till child exit, losing
  the long-lived-daemon use case). Supervise/relay bg
  tasks run to completion in this coro.
- `**run_process_kwargs` forwarded verbatim (env, shell,
  cwd, start_new_session, executable, ...); MANAGED keys
  (`stdin`/`stdout`/`stderr`/`check`) win on conflict.
- Crash-handling layer intentionally NOT baked in —
  compose `maybe_open_crash_handler()` ON TOP at the
  call-site.

`_relay_stream_lines()` helper,
- Concurrent pipe-drain reader. MANDATORY whenever piping
  w/o `capture_*` since nothing else drains the OS pipe —
  child blocks on `write()` once kernel buf (~64KiB) fills
  → deadlock.
- Modes (combine freely): `emit`-only live relay,
  `accum`-only silent drain+capture (for the CPE note),
  or both. Per-line splitting handles cross-chunk
  residuals + flushes any trailing un-newline-term'd line
  at EOF.

`_add_stderr_note()` helper,
- Attaches an indented `|_.stderr:` note to a CPE via
  `add_note()` for legible rc!=0 reporting at teardown.

Tests (`tests/trionics/test_subproc.py`),
- Hermetic `trio`-only (no actor-runtime).
- `test_stdout_relayed_per_line`: per-line stdout relay.
- `test_parent_tty_isolated`: child fd1 is OUR pipe (no
  `/dev/pts/*`), fd0 pinned to `/dev/null`.
- `test_no_deadlock_on_big_unnewlined_output`: 200KiB
  no-newline output completes under `fail_after(2)` —
  exercises the concurrent drain (without it, the child
  blocks at ~64KiB).
- `test_stderr_relay_and_cpe_rebuild`: rc!=0 w/
  `relay_stderr=True` → bare `CalledProcessError` w/ the
  `.stderr` note + per-line live relay.
- `test_nonrelay_cpe_note`: rc!=0 w/o relay → same
  deterministic post-drain CPE w/ `.stderr` note (silent
  drain+capture path).

Re-export `supervise_run_process` from `tractor.trionics`.

Prompt-IO: ai/prompt-io/claude/20260601T231429Z_0e3e008b_prompt_io.md

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit f595acc76c)
2026-06-09 20:28:42 -04:00
Gud Boi 13ed668512 Use `is not None` check for peer-connect `event`
Matches the explicit `dict.pop(uid, None)` contract one
line above; same semantics as the prior truthy check.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

(cherry picked from commit 0e3e008b0c)
2026-06-09 20:28:42 -04:00
Gud Boi 83acbaffe5 Lock us to latest `trio==0.33.0`
(cherry picked from commit c7741bbac3)
2026-06-09 20:28:42 -04:00
21 changed files with 1124 additions and 56 deletions

View File

@ -0,0 +1,102 @@
# `trio` 0.29 -> 0.33 slows the depth=3 cancel-cascade
## Symptom
After locking to `trio==0.33.0` (commit `c7741bba`, was
`0.29.0`), this test reliably trips its `fail_after`
deadline on the **`trio`** backend:
```
FAILED tests/test_cancellation.py::test_nested_multierrors[start_method=trio-depth=3]
- AssertionError: assert False
where False = isinstance(
Cancelled(source='deadline', source_task=None, reason=None),
tractor.RemoteActorError,
)
```
A `fail_after_w_trace` hang-snapshot is captured for the
test each run (deadline-injected `Cancelled` wrapped into
the actor-nursery `BaseExceptionGroup`).
## Root cause (immediate)
The test budgets `fail_after(6)` for the `trio` backend.
That 6s was chosen (commit `32955db0`, while `trio==0.29`)
with the assertion that trio finishes "well under" 6s.
The `trio` 0.29 -> 0.33 bump slowed the depth=3 cascade
past that budget, so the 6s deadline now fires mid-cascade.
trio 0.33 added **cancel-reason tracking** — every
`Cancelled` now carries `(source=, reason=, source_task=)`.
The injected exc is `Cancelled(source='deadline')`, i.e.
trio itself naming our `fail_after(6)` scope as the cancel
origin. When that `Cancelled` collapses one branch of the
nursery BEG, the test's `isinstance(subexc,
RemoteActorError)` assertion fails. The healthy outcome is
`BEG = [RemoteActorError, RemoteActorError]`; the
`Cancelled` is purely an artifact of the deadline cutting
the cascade short.
## Measurements (standalone, this machine)
```
depth=1 trio ~3.15s PASS (keeps 6s budget)
depth=3 trio ~6.8-8.2s FAIL @ 6s (now bumped to 12s)
```
depth=1 still fits comfortably; only depth=3 (deeper
recursive spawn-and-error tree => more actors to reap)
exceeds the old budget. The ~2s/depth-level cost looks
like serialized per-actor reap / `terminate_after` waits.
## Mitigation applied
`test_nested_multierrors` now splits the `trio` budget:
```python
case ('trio', 1):
timeout = 6
case ('trio', 3):
timeout = 12 # was 6; see this doc
```
This stops the deadline from firing so the cascade
completes naturally to `[RAE, RAE]`.
## Also affected — same root cause, different test
`test_echoserver_detailed_mechanics[trio-raise_error=KeyboardInterrupt]`
(`tests/test_infected_asyncio.py`) tripped the *same*
slowdown via its much tighter `trio` budget of `1s`. The
single-aio-subactor teardown now takes ~1s, so the `1s`
`fail_after` raced the deadline (PASS at 0.99s / FAIL at
1.03s across back-to-back standalone runs). On a deadline-
fire the injected `Cancelled(source='deadline')` wraps the
mid-stream `KeyboardInterrupt` into a `BaseExceptionGroup`,
which is NOT a `KeyboardInterrupt` so the bare
`pytest.raises(KeyboardInterrupt)` fails. (The sibling
`raise_error=Exception` variant only "passes" by accident:
an `ExceptionGroup` *is-a* `Exception`, so its
`pytest.raises(Exception)` still matches even when wrapped.)
Mitigation: bump that `trio` budget `1 -> 4s` (matching the
forking-spawner case). Without a deadline-fire the KBI
propagates bare and the assertion passes.
## Open follow-up (the actual regression)
The budget bump is a band-aid — the underlying question is
**why** the depth=3 `trio` cancel-cascade went from <6s to
~7-8s across `trio` 0.29 -> 0.33. Candidate avenues:
- which scope owns the per-actor `terminate_after` wait,
and are the tree's reaps concurrent or serialized?
- did trio 0.33's abort/reschedule or cancel-reason
bookkeeping change checkpoint timing on the cancel path?
If/when the cascade speeds back up under-budget, depth=3
will start completing well under 12s — at which point the
budget can be tightened back toward 6s as a regression
tripwire. Related (different backend, same cascade class):
`cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`.

View File

@ -0,0 +1,146 @@
---
model: claude-opus-4-7[1m]
service: claude
session: trio-0.33-subproc-supervisor-retroactive
timestamp: 2026-06-01T23:14:29Z
git_ref: 0e3e008b
scope: code
substantive: true
raw_file: 20260601T231429Z_0e3e008b_prompt_io.raw.md
---
## Prompt
**RETROACTIVE LOG** — original session prompts not
preserved; reconstructed from the staged work product.
The work designs a `trio.Nursery.start()`-style wrapper
around `trio.run_process()` for SC-friendly subprocess
supervision. From the resulting code shape, the
prompting intent was:
1. Surface rc!=0 `CalledProcessError` DETERMINISTICALLY,
without the nursery-eg-wrapping that complicates
`collapse_eg()` usage and races the relay reader on
trio's `check=True`-driven cancel cascade.
2. ALWAYS isolate the parent controlling-tty so a
spawned child can't emit terminal control-seqs onto
the launching tty (clobbering scrollback). Default
`stdin=DEVNULL`; default `stdout=DEVNULL` unless
explicitly relayed/overridden; distinguish "caller
passed nothing" from "caller passed `None` for
inherit".
3. Optional live per-line relay of child std-streams to
the `tractor` log — STREAMED (not
buffered-until-exit) so long-lived daemon output is
visible during the run. Pick a custom log level that
shows at usual `info`/`devx` console levels but is
separately filterable.
4. Concurrent pipe-drain reader MANDATORY when piping
without `capture_*` — without it the child blocks on
`write()` once the OS pipe buffer fills (~64KiB),
causing deadlocks on output bursts.
5. Non-blocking `tn.start()` semantics: hand the live
`trio.Process` to the parent immediately;
supervise/relay run to completion in the supervisor
coro.
6. Hermetic `trio`-only unit tests (no actor-runtime)
covering each of: per-line relay, tty isolation,
no-deadlock on >64KiB unnewlined output, CPE
rebuild w/ stderr relay, CPE rebuild on the silent
drain+capture path.
## Response summary
Adds `tractor/trionics/_subproc.py` (296 LOC) +
`tests/trionics/test_subproc.py` (230 LOC) + a
re-export in `tractor/trionics/__init__.py`.
**`supervise_run_process()`** (public, re-exported)
- `check=False` is forced to `trio.run_process`; the
rc-check runs in the supervisor coro AFTER `own_tn`
unwinds (both the child AND the relay readers have
hit EOF + fully drained). A BARE
`subprocess.CalledProcessError` is rebuilt + raised
from there, with `.stderr` bytes passed in the
constructor AND attached as an `add_note()`'d
`|_.stderr:` block for legible teardown logs.
- `stdin=DEVNULL` always. `stdout` default chosen via a
`_UNSET` sentinel: `relay_stdout=True` → PIPE,
explicit `stdout=...` → as given, else `DEVNULL`.
`stderr` defaults to PIPE whenever we relay OR need
the CPE note (when `check=True`), else `DEVNULL`.
- `relay_level='io'` (custom level 21; sorts just
above stdlib `INFO`=20 so it shows at usual
`info`/`devx` levels and stays separately
filterable). `runtime`=15 would silently filter at
default levels, so it's rejected as a default.
- `task_status.started(trio_proc)` delivers the live
process immediately. The internal `own_tn`
supervises `trio.run_process` + any relay readers to
completion.
- `**run_process_kwargs` forward verbatim;
`stdin/stdout/stderr/check` are MANAGED keys
(override on conflict).
- Crash-handling deliberately NOT baked in — compose
`maybe_open_crash_handler()` on top at the call-site.
**`_relay_stream_lines()`** (internal helper)
- Three modes (combinable): `emit`-only (live per-line
relay), `accum`-only (silent drain+capture for a CPE
note), or both (live relay AND capture).
- Per-line split handles cross-chunk residuals via a
rolling `residual` bytes buffer; flushes any trailing
un-newline-term'd line at EOF.
- `async with stream:` ensures aclose at EOF/cancel
(mirrors trio's internal `_subprocess` drain idiom).
**`_add_stderr_note()`** (internal helper)
- `add_note()`s a `textwrap.indent(...)`'d
`|_.stderr:` block onto a `CalledProcessError` for
teardown logs.
**Tests** (5 hermetic, trio-only) — `_capture_relay`
fixture monkeypatches `_subproc.log.<level>` to a list:
- `test_stdout_relayed_per_line`: per-line stdout
relay carries each `line=N` to the records.
- `test_parent_tty_isolated`: `readlink /proc/self/fd/0`
and `fd/1` from the child show `pipe:` (fd1) +
`/dev/null` (fd0); NO `/dev/pts/*`.
- `test_no_deadlock_on_big_unnewlined_output`: 200KiB
of `x` with no newlines completes inside
`fail_after(2)` — exercises the concurrent drain.
- `test_stderr_relay_and_cpe_rebuild`: rc=3 with
`relay_stderr=True` raises bare CPE
(via `collapse_eg()`) with `b'boom' in cpe.stderr`,
the note attached, AND per-line live relay.
- `test_nonrelay_cpe_note`: rc=7 with no relay still
produces CPE with `.stderr` + note via the silent
drain+capture path.
## Files changed
- `tractor/trionics/_subproc.py` — NEW. Public
`supervise_run_process()` + helpers
`_relay_stream_lines()` / `_add_stderr_note()` + the
`_UNSET` sentinel.
- `tests/trionics/test_subproc.py` — NEW. 5 hermetic
trio-only tests + `_capture_relay` monkeypatch
fixture.
- `tractor/trionics/__init__.py` — re-export
`supervise_run_process`.
## Human edits
**RETROACTIVE**: this log is being written from the
staged diff, not from a live session. The code as
staged is the canonical artifact; any human edits the
user made during the originating design session are
already integrated and cannot be separated post-hoc.
The `.raw.md` sibling is a diff-pointer placeholder,
NOT a pre-edit transcript.
Future prompt-io entries for in-flight work should be
written DURING the design session per the skill
contract so the pre-edit `.raw.md` captures the
unedited model output for genuine provenance.

View File

@ -0,0 +1,106 @@
---
model: claude-opus-4-7[1m]
service: claude
timestamp: 2026-06-01T23:14:29Z
git_ref: 0e3e008b
diff_cmd: git diff HEAD~1..HEAD
---
# RETROACTIVE — original model output not preserved
This `.raw.md` would normally contain the verbatim
pre-human-edit response from the design session that
produced the staged `_subproc.py` module + tests. That
session's transcript is not available, so this file
serves as a diff-pointer placeholder + transparency
note.
## Authoritative artifact
The committed code IS the artifact of record. Once the
companion commit lands, the unified diff is:
> `git diff HEAD~1..HEAD -- tractor/trionics/_subproc.py`
> `git diff HEAD~1..HEAD -- tests/trionics/test_subproc.py`
> `git diff HEAD~1..HEAD -- tractor/trionics/__init__.py`
Before committing, substitute `--cached` for the
pre-commit form.
## What is NOT here
Because this is retroactive:
- No verbatim chain-of-thought / discussion prose from
the design session.
- No rejected alternatives the model considered before
arriving at the final shape (e.g. whether the
rc-check should live inside `own_tn` vs after it; the
`_UNSET` sentinel vs a `None`-means-DEVNULL
convention; `io` vs `info` as the default relay
level).
- No pre-edit code blocks as the model first emitted
them, separable from any user cleanup applied before
the diff was staged.
## Inferred design choices visible in the final code
(Documented here because they're the kind of decision
detail an unedited raw transcript would have captured.)
1. **Post-drain rc-check in the supervisor coro body,
AFTER `own_tn.__aexit__`.** Placing the
`CalledProcessError` raise here (not inside
`own_tn`) means the EG-unwrap happens at the OUTER
`tn.start()` boundary — callers do `collapse_eg()`
if they want bare. Doing the raise INSIDE `own_tn`
would cancel the still-draining relay reader
mid-flight and lose stderr lines.
2. **`_UNSET` sentinel for `stdout`.** A plain default
of `None` couldn't distinguish "use the safe
`DEVNULL` default" from "caller explicitly passed
`None` (inherit, presumably knowingly)". The
sentinel keeps the SAFE default while letting power
users opt into inherit.
3. **`relay_level='io'` (custom level 21).** Chosen to
sort just above stdlib `INFO`=20 so a default
`--ll info` shows the relay, but it remains a
distinct level so users can filter
`tractor.trionics:io` separately. Picking
`runtime`=15 would have made the relay invisible at
default verbosity (a footgun for daemon supervisors
whose whole point is "I want to see this output").
4. **Reader is MANDATORY, not opt-in cosmetic.** With
`stdout=PIPE` / `stderr=PIPE` we OWN the drain
responsibility — there's no `trio.capture_*` running
under the hood here. The ~64KiB OS pipe buffer
means a child writing more than that without us
reading hangs at `write()` — a deadlock that won't
show up in small-output tests, which is why the
200KiB-no-newline test is in the suite.
5. **`task_status.started(trio_proc)` BEFORE the
`own_tn` exits.** Without this, `tn.start()` would
block until the child exits — losing the "start a
long-lived daemon and continue with parent work"
use case. With it, the parent gets the live process
handle immediately and the supervise+relay tasks
run in the supervisor coro until the child exits.
6. **`__notes__` via `add_note()` for the CPE
`.stderr`.** The `.stderr` attribute is what
`subprocess` callers expect; the `add_note()` is
what trio's exception-rendering shows. Both wired so
programmatic AND human consumers see the stderr at
teardown.
## Honesty statement
This file's content is RECONSTRUCTED from the staged
code, not extracted from a verbatim model transcript.
The prompt-io skill's intent is for the `.raw.md` to
be a pre-edit fossil; that's not possible here. Future
work should write the prompt-io entry DURING the
design session.

View File

@ -105,7 +105,7 @@ testing = [
repl = [ repl = [
"pyperclip>=1.9.0", "pyperclip>=1.9.0",
"prompt-toolkit>=3.0.50", "prompt-toolkit>=3.0.50",
"xonsh>=0.23.0", "xonsh>=0.23.8",
"psutil>=7.0.0", "psutil>=7.0.0",
] ]
lint = [ lint = [

View File

@ -5,6 +5,7 @@
from __future__ import annotations from __future__ import annotations
import platform import platform
import os import os
import re
import signal import signal
import time import time
from typing import ( from typing import (
@ -294,6 +295,26 @@ def expect(
PROMPT = r"\(Pdb\+\)" PROMPT = r"\(Pdb\+\)"
# Strip terminal color / ANSI-VT100 escape sequences so
# substring matching against REPL + traceback output stays
# robust to color leakage — Python 3.13's colored tracebacks,
# `pdbp`'s pygments highlighting, etc. — even when
# `PYTHON_COLORS=0` (set in the `spawn` fixture) isn't honored
# by every renderer in the spawned subproc.
# Regex per https://stackoverflow.com/a/14693789
_ansi_re: re.Pattern = re.compile(
r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])'
)
def ansi_strip(text: str) -> str:
'''
Remove ANSI/VT100 escape sequences from `text`.
'''
return _ansi_re.sub('', text)
def in_prompt_msg( def in_prompt_msg(
child: SpawnBase, child: SpawnBase,
parts: list[str], parts: list[str],
@ -313,7 +334,7 @@ def in_prompt_msg(
''' '''
__tracebackhide__: bool = False __tracebackhide__: bool = False
before: str = str(child.before.decode()) before: str = ansi_strip(str(child.before.decode()))
for part in parts: for part in parts:
if part not in before: if part not in before:
if pause_on_false: if pause_on_false:
@ -333,9 +354,9 @@ def in_prompt_msg(
return True return True
# TODO: todo support terminal color-chars stripping so we can match # NB: color-char stripping (so we can match against call-stack
# against call stack frame output from the the 'll' command the like! # frame output from the `ll` command and the like) is handled by
# -[ ] SO answer for stipping ANSI codes: https://stackoverflow.com/a/14693789 # `ansi_strip()` applied inside `in_prompt_msg()` + below.
def assert_before( def assert_before(
child: SpawnBase, child: SpawnBase,
patts: list[str], patts: list[str],
@ -356,7 +377,7 @@ def assert_before(
err_on_false=True, err_on_false=True,
**kwargs **kwargs
) )
before: str = str(child.before.decode()) before: str = ansi_strip(str(child.before.decode()))
return before return before

View File

@ -1186,7 +1186,12 @@ def test_shield_pause(
"('cancelled_before_pause'", # actor name "('cancelled_before_pause'", # actor name
_repl_fail_msg, _repl_fail_msg,
"trio.Cancelled", "trio.Cancelled",
"raise Cancelled._create()", # trio >=0.30 raises via a multi-line
# `raise Cancelled._create(source=.., reason=..,
# source_task=..)` (cancel-reason metadata), so
# match the open-paren form only, NOT the legacy
# bare `()`.
"raise Cancelled._create(",
# we should be handling a taskc inside # we should be handling a taskc inside
# the first `.port_mortem()` sin-shield! # the first `.port_mortem()` sin-shield!
@ -1204,7 +1209,12 @@ def test_shield_pause(
"('root'", # actor name "('root'", # actor name
_repl_fail_msg, _repl_fail_msg,
"trio.Cancelled", "trio.Cancelled",
"raise Cancelled._create()", # trio >=0.30 raises via a multi-line
# `raise Cancelled._create(source=.., reason=..,
# source_task=..)` (cancel-reason metadata), so
# match the open-paren form only, NOT the legacy
# bare `()`.
"raise Cancelled._create(",
# handling a taskc inside the first unshielded # handling a taskc inside the first unshielded
# `.port_mortem()`. # `.port_mortem()`.

View File

@ -8,9 +8,9 @@ after `Actor` construction, so any spawned sub-actor process
should: should:
- have `argv[0]` (== `/proc/<pid>/cmdline`) start with - have `argv[0]` (== `/proc/<pid>/cmdline`) start with
`tractor[<aid.reprol()>]` `<_def_prefix>[<aid.reprol()>]` (currently `_subactor[]`)
- have `/proc/<pid>/comm` start with `tractor[` (kernel - have `/proc/<pid>/comm` start with `<_def_prefix>[`
truncates to ~15 bytes) (kernel truncates to ~15 bytes)
- be detected as a tractor sub-actor by - be detected as a tractor sub-actor by
`_is_tractor_subactor(pid)` via the cmdline marker. `_is_tractor_subactor(pid)` via the cmdline marker.
@ -27,7 +27,10 @@ import trio
import tractor import tractor
from tractor.runtime._runtime import Actor from tractor.runtime._runtime import Actor
from tractor.devx._proctitle import set_actor_proctitle from tractor.devx._proctitle import (
set_actor_proctitle,
_def_prefix,
)
from tractor._testing._reap import ( from tractor._testing._reap import (
_is_tractor_subactor, _is_tractor_subactor,
_read_cmdline, _read_cmdline,
@ -41,8 +44,9 @@ _non_linux: bool = platform.system() != 'Linux'
def test_set_actor_proctitle_format(): def test_set_actor_proctitle_format():
''' '''
`set_actor_proctitle()` returns the canonical `set_actor_proctitle()` returns the canonical
`tractor[<aid.reprol()>]` form and actually mutates `<_def_prefix>[<aid.reprol()>]` form (currently
the running proc's title. `_subactor[]`) and actually mutates the running
proc's title.
''' '''
pytest.importorskip( pytest.importorskip(
@ -60,12 +64,14 @@ def test_set_actor_proctitle_format():
) )
title: str = set_actor_proctitle(actor) title: str = set_actor_proctitle(actor)
# canonical wrapping: `tractor[<aid.reprol()>]`. We # canonical wrapping: `<_def_prefix>[<aid.reprol()>]`.
# compare against the runtime-computed `reprol()` # We source BOTH the prefix (`_def_prefix`) and the
# rather than a hard-coded value so the test stays # runtime-computed `reprol()` rather than hard-coding,
# decoupled from `Aid.reprol()`'s internal format # so the test stays decoupled from the prefix shape
# (currently `<name>@<pid>`, but could evolve). # (flipped to `_subactor` in `3a45dbd5`) AND from
expected: str = f'tractor[{actor.aid.reprol()}]' # `Aid.reprol()`'s internal format (currently
# `<name>@<pid>`, but could evolve).
expected: str = f'{_def_prefix}[{actor.aid.reprol()}]'
assert title == expected assert title == expected
# sanity: the actor's name must be in the title # sanity: the actor's name must be in the title
# somewhere (so a future `reprol()` change that # somewhere (so a future `reprol()` change that
@ -140,15 +146,17 @@ def test_subactor_proctitle_visible_via_proc():
) )
pid, info = matched[0] pid, info = matched[0]
# canonical proctitle prefix in cmdline (full form) # canonical proctitle prefix in cmdline (full form);
assert info['cmdline'].startswith('tractor[proctitle_boi@'), ( # prefix sourced from `_def_prefix` so it tracks the
f'cmdline missing `tractor[proctitle_boi@…]` prefix: ' # `3a45dbd5` flip (`tractor[` -> `_subactor[`).
assert info['cmdline'].startswith(f'{_def_prefix}[proctitle_boi@'), (
f'cmdline missing `{_def_prefix}[proctitle_boi@…]` prefix: '
f'{info["cmdline"]!r}' f'{info["cmdline"]!r}'
) )
# comm is kernel-truncated to ~15 bytes — just check the # comm is kernel-truncated to ~15 bytes — just check the
# `tractor[` prefix made it. # `<_def_prefix>[` prefix made it.
assert info['comm'].startswith('tractor['), ( assert info['comm'].startswith(f'{_def_prefix}['), (
f'comm missing `tractor[` prefix: {info["comm"]!r}' f'comm missing `{_def_prefix}[` prefix: {info["comm"]!r}'
) )
# intrinsic-signal detector should match. # intrinsic-signal detector should match.
assert info['is_tractor'] is True assert info['is_tractor'] is True

View File

@ -605,17 +605,28 @@ async def test_nested_multierrors(
) )
) )
# 6s budget: in the non-hang case (and on the trio # Per-backend/-depth budgets: in the non-hang case the
# backend) the whole spawn + cancel-cascade should # whole spawn + cancel-cascade should complete in well
# complete in well under that. On the borderline hang # under these. On the borderline hang case the
# case the `fail_after_w_trace` fires `TooSlowError` # `fail_after_w_trace` fires `TooSlowError` AND captures a
# AND captures a ptree/wchan/py-spy snapshot to # ptree/wchan/py-spy snapshot to
# `$XDG_CACHE_HOME/tractor/hung-dumps/` for offline # `$XDG_CACHE_HOME/tractor/hung-dumps/` for offline
# inspection. See # inspection. See
# `ai/conc-anal/cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`. # `ai/conc-anal/cancel_cascade_too_slow_under_main_thread_forkserver_issue.md`.
#
# NOTE: the `trio` depth=3 budget was bumped 6 -> 12s after
# the `trio` 0.29 -> 0.33 lock bump (commit c7741bba) slowed
# the depth-3 cancel-cascade from <6s to ~7-8s; the 6s
# deadline was firing and its `Cancelled(source='deadline')`
# (trio 0.33 cancel-reason metadata) collapsed a BEG branch,
# breaking the `RemoteActorError` assertion below. depth=1
# still finishes in ~3s so keeps the 6s budget. See
# `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`.
match (start_method, depth): match (start_method, depth):
case ('trio', _): case ('trio', 1):
timeout = 6 timeout = 6
case ('trio', 3):
timeout = 12
case ('main_thread_forkserver', 1): case ('main_thread_forkserver', 1):
timeout = 16 timeout = 16
case ('main_thread_forkserver', 3): case ('main_thread_forkserver', 3):

View File

@ -860,7 +860,14 @@ def test_echoserver_detailed_mechanics(
timeout: float = ( timeout: float = (
999 if tractor.debug_mode() 999 if tractor.debug_mode()
else 4 if is_forking_spawner else 4 if is_forking_spawner
else 1 # was 1; the `trio` 0.29 -> 0.33 bump slowed the
# cancel-cascade so a 1s budget raced the ~1s teardown
# deadline. On a deadline-fire the injected
# `Cancelled(source='deadline')` wraps the mid-stream
# KBI in a `BaseExceptionGroup`, breaking the bare
# `pytest.raises(KeyboardInterrupt)` below. See
# `ai/conc-anal/trio_033_cancel_cascade_slowdown_depth3_issue.md`.
else 4
) )
# body factored out so the `fail_after_w_trace`-wrapping # body factored out so the `fail_after_w_trace`-wrapping

View File

@ -162,6 +162,66 @@ def test_implicit_mod_name_applied_for_child(
assert submod.log.logger in sub_logs assert submod.log.logger in sub_logs
def test_io_custom_level_registered():
'''
The `IO`(21) level (registered via `add_log_level()` at
import, for `tractor.trionics._subproc`'s std-stream relay)
is fully wired and SHOWN BY DEFAULT at `info`-level consoles
since `21 >= INFO(20)`.
'''
import logging
assert log.CUSTOM_LEVELS.get('IO') == 21
assert logging.getLevelName(21) == 'IO'
assert log.STD_PALETTE.get('IO')
assert log.BOLD_PALETTE['bold'].get('IO')
iolog = log.get_logger('io_lvl_test')
assert callable(getattr(iolog, 'io', None))
# emit must not raise
iolog.io('hello from the IO level')
# 21 >= INFO(20) -> shown when console set to `info`
assert 21 >= logging.INFO
def test_add_log_level_pluggable():
'''
`add_log_level()` is the single pluggable entry-point: one
call wires `CUSTOM_LEVELS` + `addLevelName` + both palettes +
a same-named `StackLevelAdapter` emit method (so
`get_logger()`'s per-level audit passes).
'''
import logging
name: str = 'XLVL'
val: int = 19
try:
log.add_log_level(name, val, 'cyan')
assert log.CUSTOM_LEVELS[name] == val
assert logging.getLevelName(val) == name
assert log.STD_PALETTE[name] == 'cyan'
assert log.BOLD_PALETTE['bold'][name] == 'bold_cyan'
# the audit in `get_logger()` (asserts a method per
# `CUSTOM_LEVELS` entry) must still pass.
xlog = log.get_logger('xlvl_test')
emit = getattr(xlog, name.lower(), None)
assert callable(emit)
emit('hello from a plugged-in level')
finally:
# best-effort cleanup of our module-global mutations so
# later `get_logger()` audits don't see a half-removed
# level.
log.CUSTOM_LEVELS.pop(name, None)
log.STD_PALETTE.pop(name, None)
log.BOLD_PALETTE['bold'].pop(name, None)
if hasattr(log.StackLevelAdapter, name.lower()):
delattr(log.StackLevelAdapter, name.lower())
# TODO, moar tests against existing feats: # TODO, moar tests against existing feats:
# ------ - ------ # ------ - ------
# - [ ] color settings? # - [ ] color settings?

View File

@ -0,0 +1,230 @@
'''
Unit tests for `tractor.trionics.supervise_run_process` (in
`tractor.trionics._subproc`) and its per-line std-stream relay.
Hermetic `trio`-only coverage (no actor-runtime needed):
- per-line stdout relay -> `log.io`
- parent controlling-tty isolation (child fd1 is a pipe, fd0
`/dev/null` never the parent `/dev/pts/*`)
- mandatory concurrent pipe-drain (no deadlock on >64KiB
no-newline output)
- live stderr relay + `CalledProcessError` rebuild (rc!=0 note)
- legacy capture-stderr CPE note path
'''
from functools import partial
import subprocess
import pytest
import trio
from tractor.trionics import (
_subproc,
collapse_eg,
supervise_run_process,
)
def _capture_relay(monkeypatch, level: str = 'io') -> list[str]:
'''
Redirect `_subproc.log.<level>` (the relay's emit method —
`io` by default, see `supervise_run_process(relay_level=...)`)
into a list so tests can assert on the relayed lines.
'''
records: list[str] = []
monkeypatch.setattr(
_subproc.log,
level,
lambda msg, *a, **k: records.append(msg),
)
return records
def test_stdout_relayed_per_line(monkeypatch):
records = _capture_relay(monkeypatch)
cmd = [
'sh', '-c',
'for i in 1 2 3; do echo line=$i; done',
]
async def main():
async with trio.open_nursery() as tn:
await tn.start(
partial(
supervise_run_process,
cmd,
label='t-out',
relay_stdout=True,
)
)
trio.run(main)
out_lines = [r for r in records if '[t-out:out]' in r]
assert any('line=1' in r for r in out_lines)
assert any('line=2' in r for r in out_lines)
assert any('line=3' in r for r in out_lines)
def test_parent_tty_isolated(monkeypatch):
records = _capture_relay(monkeypatch)
cmd = [
'sh', '-c',
'readlink /proc/self/fd/0; readlink /proc/self/fd/1',
]
async def main():
async with trio.open_nursery() as tn:
await tn.start(
partial(
supervise_run_process,
cmd,
label='t-tty',
relay_stdout=True,
)
)
trio.run(main)
relayed = '\n'.join(records)
# fd1 (stdout) must be OUR pipe, never a controlling tty.
assert 'pipe:' in relayed
assert '/dev/pts/' not in relayed
# fd0 (stdin) is pinned to DEVNULL.
assert '/dev/null' in relayed
def test_no_deadlock_on_big_unnewlined_output(monkeypatch):
'''
>64KiB of output with NO newline: only completes because the
relay reader concurrently drains the pipe (else the child
blocks on `write()` when the OS pipe buffer fills).
'''
records = _capture_relay(monkeypatch)
cmd = [
'sh', '-c',
'head -c 200000 /dev/zero | tr "\\0" x',
]
async def main():
# generous vs the ~ms real runtime, but bounded so a
# genuine pipe-fill deadlock fails fast.
with trio.fail_after(2):
async with trio.open_nursery() as tn:
await tn.start(
partial(
supervise_run_process,
cmd,
label='t-big',
relay_stdout=True,
)
)
trio.run(main)
big = ''.join(
r.split('] ', 1)[-1]
for r in records
if '[t-big:out]' in r
)
assert len(big) == 200_000
def test_stderr_relay_and_cpe_rebuild(monkeypatch):
'''
`relay_stderr=True` PIPEs stderr ourselves (mutually
exclusive with trio's `capture_stderr`), so on rc!=0 the
wrapper rebuilds a `CalledProcessError` from the live
accumulator and `.add_note()`s its `.stderr` AND the
stderr is relayed per-line live.
'''
records = _capture_relay(monkeypatch)
cmd = [
'sh', '-c',
'echo boom 1>&2; exit 3',
]
async def main():
# `collapse_eg()` unwraps the parent-nursery's single-exc
# eg so the bare CPE bubbles straight out (mirrors real
# caller usage).
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
await tn.start(
partial(
supervise_run_process,
cmd,
label='t-err',
relay_stderr=True,
check=True,
)
)
with pytest.raises(subprocess.CalledProcessError) as ei:
trio.run(main)
cpe = ei.value
assert cpe.returncode == 3
# rebuilt `.stderr` (trio did NOT capture since we PIPE'd it).
assert b'boom' in (cpe.stderr or b'')
# note attached for legible teardown reporting.
assert any(
'boom' in n
for n in getattr(cpe, '__notes__', [])
)
# AND it was relayed live per-line.
assert any(
'[t-err:err]' in r and 'boom' in r
for r in records
)
def test_nonrelay_cpe_note(monkeypatch):
'''
No live relay: stderr is silently drained + captured (NOT
emitted), and on rc!=0 the wrapper rebuilds the
`CalledProcessError` from that accumulator with a `.stderr`
note same deterministic post-drain path as the relay case.
'''
cmd = [
'sh', '-c',
'echo nope 1>&2; exit 7',
]
async def main():
async with (
collapse_eg(),
trio.open_nursery() as tn,
):
await tn.start(
partial(
supervise_run_process,
cmd,
label='t-legacy',
check=True,
# relay_* default False -> silent
# drain+capture for the CPE note.
)
)
with pytest.raises(subprocess.CalledProcessError) as ei:
trio.run(main)
cpe = ei.value
assert cpe.returncode == 7
assert b'nope' in (cpe.stderr or b'')
assert any(
'nope' in n
for n in getattr(cpe, '__notes__', [])
)

View File

@ -155,7 +155,6 @@ async def maybe_block_bp(
os.environ.pop('PYTHONBREAKPOINT', None) os.environ.pop('PYTHONBREAKPOINT', None)
@acm @acm
async def open_root_actor( async def open_root_actor(
*, *,
@ -186,6 +185,7 @@ async def open_root_actor(
# enables the multi-process debugger support # enables the multi-process debugger support
debug_mode: bool = False, debug_mode: bool = False,
maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support maybe_enable_greenback: bool = False, # `.pause_from_sync()/breakpoint()` support
# ^XXX NOTE^ the perf implications of use, # ^XXX NOTE^ the perf implications of use,
# https://greenback.readthedocs.io/en/latest/principle.html#performance # https://greenback.readthedocs.io/en/latest/principle.html#performance
enable_stack_on_sig: bool = False, enable_stack_on_sig: bool = False,

View File

@ -90,7 +90,6 @@ keys are caller-defined).
''' '''
from __future__ import annotations from __future__ import annotations
import os import os
import pathlib import pathlib
import re import re
@ -99,6 +98,9 @@ import stat
import sys import sys
import time import time
from tractor.devx import _proctitle
# `/dev/shm` is the POSIX-shm filesystem on Linux + FreeBSD. # `/dev/shm` is the POSIX-shm filesystem on Linux + FreeBSD.
# macOS uses `shm_open` syscalls without a fs-visible path, # macOS uses `shm_open` syscalls without a fs-visible path,
# so the shm helpers refuse to run there. # so the shm helpers refuse to run there.
@ -230,9 +232,9 @@ def _read_comm(pid: int) -> str:
# while `cmdline` for zombies often reads as empty. # while `cmdline` for zombies often reads as empty.
_TRACTOR_PROC_CMDLINE_MARKERS: tuple[str, ...] = ( _TRACTOR_PROC_CMDLINE_MARKERS: tuple[str, ...] = (
'tractor._child', 'tractor._child',
'tractor[', _proctitle._def_prefix,
) )
_TRACTOR_PROC_COMM_MARKER: str = 'tractor[' _TRACTOR_PROC_COMM_MARKER: str = _proctitle._def_prefix
def _is_tractor_subactor(pid: int) -> bool: def _is_tractor_subactor(pid: int) -> bool:

View File

@ -24,7 +24,10 @@ which" at a glance without needing to read full
`/proc/<pid>/cmdline`. `/proc/<pid>/cmdline`.
Format: Format:
``tractor[<aid.reprol()>]`` e.g. ``tractor[doggy@1027301b]`` ``<_def_prefix>[<aid.reprol()>]`` e.g. ``_subactor[doggy@1027301b]``
(prefix from the `_def_prefix` const, flipped `tractor` ->
`_subactor` so sub-actor procs are visually distinct from the
root in `ps`/`htop` and the reap-recognition markers.)
Uses the canonical `Aid.reprol()` form Uses the canonical `Aid.reprol()` form
(``<name>@<uuid_short>``) so the proc-title matches the (``<name>@<uuid_short>``) so the proc-title matches the
@ -52,7 +55,13 @@ except ImportError:
_stp = None _stp = None
def set_actor_proctitle(actor: 'Actor') -> str | None: _def_prefix: str = '_subactor'
def set_actor_proctitle(
actor: 'Actor',
prefix: str = _def_prefix,
) -> str | None:
''' '''
Set the calling process's proc-title to identify it as a Set the calling process's proc-title to identify it as a
tractor sub-actor. tractor sub-actor.
@ -69,6 +78,6 @@ def set_actor_proctitle(actor: 'Actor') -> str | None:
if _stp is None: if _stp is None:
return None return None
title: str = f'tractor[{actor.aid.reprol()}]' title: str = f'{prefix}[{actor.aid.reprol()}]'
_stp.setproctitle(title) _stp.setproctitle(title)
return title return title

View File

@ -398,7 +398,7 @@ async def handle_stream_from_peer(
uid, uid,
None, None,
) )
if event: if event is not None:
con_status_steps += ( con_status_steps += (
' -> Waking subactor spawn waiters: ' ' -> Waking subactor spawn waiters: '
f'{event.statistics().tasks_waiting}\n' f'{event.statistics().tasks_waiting}\n'

View File

@ -262,6 +262,63 @@ class StackLevelAdapter(LoggerAdapter):
) )
def add_log_level(
name: str,
value: int,
color: str = 'white',
) -> None:
'''
Register a new custom log level with `tractor`'s logging
machinery in ONE call the single pluggable entry-point that
keeps the (otherwise hand-synced) pieces consistent:
- `CUSTOM_LEVELS[name]` (drives the `stacklevel` bump in
`StackLevelAdapter.log()` + the `get_logger()` audit).
- `logging.addLevelName()` registration.
- `STD_PALETTE`/`BOLD_PALETTE` color entries (consumed when
`get_console_log()` builds its `ColoredFormatter`).
- a same-named (lowercase) emit method bound on
`StackLevelAdapter` so `log.<name>('msg')` works (and so
`get_logger()`'s per-level method audit passes).
Idempotent: re-registering an existing name is a no-op-ish
refresh (won't clobber an already-bound method).
'''
name_up: str = name.upper()
name_lo: str = name.lower()
CUSTOM_LEVELS[name_up] = value
logging.addLevelName(value, name_up)
STD_PALETTE[name_up] = color
BOLD_PALETTE['bold'][name_up] = f'bold_{color}'
if not hasattr(StackLevelAdapter, name_lo):
# bind via default-arg so `value` is captured (not
# late-bound); delegates to `.log()` exactly like the
# hand-written level methods above.
def _emit(
self,
msg: str,
*,
_level: int = value,
) -> None:
return self.log(_level, msg)
_emit.__name__ = name_lo
_emit.__qualname__ = f'StackLevelAdapter.{name_lo}'
setattr(StackLevelAdapter, name_lo, _emit)
# `IO`: child-subproc std-stream relay (see
# `tractor.trionics._subproc`). Value 21 sits just ABOVE
# `INFO`(20) so it's SHOWN BY DEFAULT at the usual `info`/`devx`
# console levels (a `runtime`(15) relay would be silently
# filtered) yet still distinctly labelled/colored + separately
# filterable.
add_log_level('IO', 21, 'purple')
# TODO IDEAs: # TODO IDEAs:
# -[ ] move to `.devx.pformat`? # -[ ] move to `.devx.pformat`?
# -[ ] do per task-name and actor-name color coding # -[ ] do per task-name and actor-name color coding

View File

@ -140,7 +140,7 @@ _RUNTIME_VARS_DEFAULTS: dict[str, Any] = {
# `debug_mode: bool` settings # `debug_mode: bool` settings
'_debug_mode': False, # bool '_debug_mode': False, # bool
'repl_fixture': False, # |AbstractContextManager[bool] 'repl_fixture': False, # |AbstractContextManager[bool]
'use_greenback': False, # `.pause_from_sync()`/`breakpoint()` 'use_greenback': False, # `.pause_from_sync()`/`breakpoint()`
'use_stackscope': False, # trio-task-stack dumps on SIGUSR1 'use_stackscope': False, # trio-task-stack dumps on SIGUSR1

View File

@ -38,3 +38,6 @@ from ._taskc import (
maybe_raise_from_masking_exc as maybe_raise_from_masking_exc, maybe_raise_from_masking_exc as maybe_raise_from_masking_exc,
start_or_cancel as start_or_cancel, start_or_cancel as start_or_cancel,
) )
from ._subproc import (
supervise_run_process as supervise_run_process,
)

View File

@ -0,0 +1,296 @@
# tractor: distributed structured concurrency.
# Copyright 2018-eternity Tyler Goodlet.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
SC-friendly `trio.run_process()` supervision: a `tn.start()`
style wrapper which surfaces rc!=0 errors deterministically and
(optionally) live-relays the child's std-streams to the `tractor`
log.
'''
from __future__ import annotations
from functools import partial
import subprocess
import textwrap
from typing import (
Callable,
)
import trio
from ..log import get_logger
log = get_logger()
# sentinel so `supervise_run_process(stdout=...)` can tell
# "caller passed nothing" (-> tty-safe `DEVNULL` default) from
# an explicit `stdout=None` (inherit) override.
_UNSET = object()
def _add_stderr_note(
cpe: subprocess.CalledProcessError,
stderr_bytes: bytes,
) -> None:
'''
Attach an indented `|_.stderr:` note to a
`CalledProcessError` for legible rc!=0 reporting at
teardown.
'''
stderr_str: str = stderr_bytes.decode(errors='replace')
cpe.add_note(
f'|_.stderr:\n'
f'{textwrap.indent(stderr_str, prefix=" "*3)}'
)
async def _relay_stream_lines(
stream: trio.abc.ReceiveStream,
*,
emit: Callable[[str], None]|None = None,
tag: str = '',
accum: bytearray|None = None,
) -> None:
'''
Concurrently drain a child subproc's `stdout`/`stderr`
PIPE; relay each COMPLETE line to `emit` (a bound
`log.<level>` method) prefixed with `tag` (e.g.
`f'{label}:out'`) and/or append raw bytes to `accum`.
This reader is MANDATORY whenever a bare
`stdout=`/`stderr=PIPE` is used WITHOUT `trio`'s
`capture_*` (which would spawn trio's own internal drain
task): nothing else drains the OS pipe, so once its kernel
buffer (~64KiB) fills the child blocks on `write()` ->
deadlock.
Modes (combine freely):
- `emit`-only: live per-line relay (e.g. `relay_stdout`).
- `accum`-only: silent drain + capture (e.g. stderr kept
for a `CalledProcessError` note WITHOUT relaying it).
- both: relay AND capture (e.g. `relay_stderr` with `check=True`).
'''
# NOTE, mirrors `trio._subprocess`'s internal
# `async with stream: async for ...` drain idiom — except
# here we EMIT per-line (and/or accumulate) instead of
# only accumulating.
residual: bytes = b''
async with stream: # aclose at EOF/cancel
async for chunk in stream: # ends at child-exit EOF
if accum is not None:
accum += chunk
if emit is None:
continue # drain(+accum)-only
buf: bytes = residual + chunk
*lines, residual = buf.split(b'\n')
for raw in lines:
line: str = raw.decode(
errors='replace',
).rstrip('\r')
emit(f'[{tag}] {line}')
# flush any trailing partial (un-newline-term'd) line @ EOF
if (
emit is not None
and
residual
):
line: str = residual.decode(
errors='replace',
).rstrip('\r')
emit(f'[{tag}] {line}')
async def supervise_run_process(
cmd: list[str]|str,
*,
check: bool = True,
label: str|None = None,
# per-line `log.*` relay of the child's std-streams
# (tty-safe, capture-safe, STREAMED — not
# buffered-until-exit, so it suits long-lived daemons).
relay_stdout: bool = False,
relay_stderr: bool = False,
# default `io` (our custom level, value 21): the relay
# exists to make windowless-spawn output VISIBLE, and
# `IO`(21) sorts just ABOVE `INFO`(20) so it shows at the
# usual `info`/`devx` console levels (a `runtime`(15) relay
# would be silently filtered) while staying distinctly
# labelled + separately filterable.
relay_level: str = 'io',
# non-relay `stdout` override; defaults (via `_UNSET`) to
# `DEVNULL` so we NEVER inherit (+ thus can't clobber) the
# parent controlling-tty.
stdout: int = _UNSET,
task_status: trio.TaskStatus[
trio.Process
] = trio.TASK_STATUS_IGNORED,
# any other `trio.run_process()` kwarg (env, shell, cwd,
# start_new_session, executable, ...) forwarded verbatim;
# our MANAGED keys (stdin/stdout/stderr/check) are set
# below and WIN on conflict.
**run_process_kwargs,
) -> None:
'''
A `trio.Nursery.start()`-style `trio.run_process()`
wrapper which,
- surfaces a rc!=0 `subprocess.CalledProcessError`
DETERMINISTICALLY: we pass `check=False` to `trio` and
do our OWN post-drain rc-check, (re)building + raising a
BARE CPE (with a `.stderr` note) from this coro's body
AFTER the child exits so there's no nursery-eg-wrapped
CPE to catch/`collapse_eg`, and the relay reader is never
race-cancelled mid-drain.
- ALWAYS isolates the parent controlling-tty
(`stdin=DEVNULL`, and `stdout=DEVNULL` unless
relayed/overridden) so a spawned program can't emit
terminal control-seqs onto the launching tty (which
would clobber its scrollback).
- optionally live-relays `stdout`/`stderr` per-line to
`log.<relay_level>` via concurrent reader tasks (see
`_relay_stream_lines`).
Delivers the live `trio.Process` via
`task_status.started()` then SUPERVISES it (the
`run_process` bg task + any relay readers) to completion
in this coro i.e. the parent `tn.start()` returns
immediately/non-blocking.
NOTE: any crash-handling / `repl_fixture` layer is
intentionally NOT baked in here compose it ON TOP at the
call-site, e.g.
async with maybe_open_crash_handler():
await tn.start(
partial(supervise_run_process, cmd, ...),
)
'''
emit: Callable[[str], None] = getattr(log, relay_level)
tag: str = (
label
or
(cmd if isinstance(cmd, str) else ' '.join(cmd))
)
# forward any extra `trio.run_process` kwargs verbatim;
# MANAGED keys below override on conflict.
rp_kwargs: dict = dict(run_process_kwargs)
# XXX ALWAYS isolate the controlling-tty's stdin.
rp_kwargs['stdin'] = subprocess.DEVNULL
# stdout: relay -> our own PIPE (drained by the reader
# below); else an explicit override; else tty-safe
# `DEVNULL`.
if relay_stdout:
rp_kwargs['stdout'] = subprocess.PIPE
elif stdout is not _UNSET:
rp_kwargs['stdout'] = stdout
else:
rp_kwargs['stdout'] = subprocess.DEVNULL
# stderr: PIPE (+ our reader) when we either RELAY it OR
# need it captured for a rc!=0 CPE note; else tty-safe
# `DEVNULL`. We accumulate ONLY when `check` (the note is
# the only consumer).
#
# XXX we ALWAYS pass `check=False` to `trio` and do our
# OWN deterministic post-drain rc-check (below) so `trio`
# never raises a nursery-eg-wrapped CPE — no `collapse_eg`
# workaround, no reader race-cancel.
want_stderr_pipe: bool = relay_stderr or check
stderr_accum: bytearray|None = bytearray() if check else None
rp_kwargs['check'] = False
rp_kwargs['stderr'] = (
subprocess.PIPE if want_stderr_pipe
else subprocess.DEVNULL
)
async with trio.open_nursery() as own_tn:
trio_proc: trio.Process = await own_tn.start(
partial(
trio.run_process,
cmd,
**rp_kwargs,
)
)
# spin up the concurrent pipe-drain relay reader(s) —
# see `_relay_stream_lines` for why these are mandatory
# (not cosmetic) when piping without `capture_*`.
if relay_stdout:
own_tn.start_soon(
partial(
_relay_stream_lines,
trio_proc.stdout,
emit=emit,
tag=f'{tag}:out',
)
)
if want_stderr_pipe:
own_tn.start_soon(
partial(
_relay_stream_lines,
trio_proc.stderr,
# relay live only if asked; else silent
# drain+capture for the CPE note.
emit=emit if relay_stderr else None,
tag=f'{tag}:err',
accum=stderr_accum,
)
)
# hand the live proc up to the parent WITHOUT blocking
# on the bg supervise/relay tasks (keeps non-blocking
# `tn.start()` semantics).
task_status.started(trio_proc)
# ===== deterministic post-drain rc-check (BOTH paths) =====
# `own_tn` only unwinds once `run_process` AND the relay
# reader(s) have hit EOF + FULLY drained — so `stderr_accum`
# is COMPLETE here (no race vs an early CPE-cancel). Rebuild
# + raise a BARE `CalledProcessError` (the parent `tn` will
# eg-wrap it like any task-raise; callers `collapse_eg()` if
# they want it bare).
if (
check
and
trio_proc.returncode
):
stderr_bytes: bytes = (
bytes(stderr_accum)
if stderr_accum is not None
else b''
)
cpe = subprocess.CalledProcessError(
returncode=trio_proc.returncode,
cmd=trio_proc.args,
stderr=stderr_bytes,
)
_add_stderr_note(cpe, stderr_bytes)
raise cpe

22
uv.lock
View File

@ -797,7 +797,7 @@ dev = [
{ name = "pytest-timeout", specifier = ">=2.3" }, { name = "pytest-timeout", specifier = ">=2.3" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" }, { name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" }, { name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.23.0" }, { name = "xonsh", specifier = ">=0.23.8" },
] ]
devx = [ devx = [
{ name = "stackscope", specifier = ">=0.2.2,<0.3" }, { name = "stackscope", specifier = ">=0.2.2,<0.3" },
@ -809,7 +809,7 @@ repl = [
{ name = "prompt-toolkit", specifier = ">=3.0.50" }, { name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" }, { name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.23.0" }, { name = "xonsh", specifier = ">=0.23.8" },
] ]
subints = [{ name = "msgspec", marker = "python_full_version >= '3.14'", specifier = ">=0.21.0" }] subints = [{ name = "msgspec", marker = "python_full_version >= '3.14'", specifier = ">=0.21.0" }]
sync-pause = [{ name = "greenback", marker = "python_full_version == '3.13.*'", specifier = ">=1.2.1,<2" }] sync-pause = [{ name = "greenback", marker = "python_full_version == '3.13.*'", specifier = ">=1.2.1,<2" }]
@ -834,7 +834,7 @@ wheels = [
[[package]] [[package]]
name = "trio" name = "trio"
version = "0.29.0" version = "0.33.0"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "attrs" }, { name = "attrs" },
@ -844,9 +844,9 @@ dependencies = [
{ name = "sniffio" }, { name = "sniffio" },
{ name = "sortedcontainers" }, { name = "sortedcontainers" },
] ]
sdist = { url = "https://files.pythonhosted.org/packages/a1/47/f62e62a1a6f37909aed0bf8f5d5411e06fa03846cfcb64540cd1180ccc9f/trio-0.29.0.tar.gz", hash = "sha256:ea0d3967159fc130acb6939a0be0e558e364fee26b5deeecc893a6b08c361bdf", size = 588952, upload-time = "2025-02-14T07:13:50.724Z" } sdist = { url = "https://files.pythonhosted.org/packages/52/b6/c744031c6f89b18b3f5f4f7338603ab381d740a7f45938c4607b2302481f/trio-0.33.0.tar.gz", hash = "sha256:a29b92b73f09d4b48ed249acd91073281a7f1063f09caba5dc70465b5c7aa970", size = 605109, upload-time = "2026-02-14T18:40:55.386Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/c9/55/c4d9bea8b3d7937901958f65124123512419ab0eb73695e5f382521abbfb/trio-0.29.0-py3-none-any.whl", hash = "sha256:d8c463f1a9cc776ff63e331aba44c125f423a5a13c684307e828d930e625ba66", size = 492920, upload-time = "2025-02-14T07:13:48.696Z" }, { url = "https://files.pythonhosted.org/packages/1c/93/dab25dc87ac48da0fe0f6419e07d0bfd98799bed4e05e7b9e0f85a1a4b4b/trio-0.33.0-py3-none-any.whl", hash = "sha256:3bd5d87f781d9b0192d592aef28691f8951d6c2e41b7e1da4c25cde6c180ae9b", size = 510294, upload-time = "2026-02-14T18:40:53.313Z" },
] ]
[[package]] [[package]]
@ -923,14 +923,14 @@ wheels = [
[[package]] [[package]]
name = "xonsh" name = "xonsh"
version = "0.23.2" version = "0.23.8"
source = { registry = "https://pypi.org/simple" } source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/60/e5/2dfa99e21a8118bed0e73ed50e91962fdad01b900e23497064e8810b03b5/xonsh-0.23.2.tar.gz", hash = "sha256:633608c8292938af0f242f05326cc2912f25fa72bd808824ab0534a6df304402", size = 1030659, upload-time = "2026-04-26T19:28:40.744Z" } sdist = { url = "https://files.pythonhosted.org/packages/8b/77/0c4c39ad866d4ea1ef553f325d16e804d1bf1eeecc591f0e81b057aa37db/xonsh-0.23.8.tar.gz", hash = "sha256:541bb976c93a81571792644403bae8737145023da5f48d4c493909ab5c04ba0f", size = 1172271, upload-time = "2026-05-30T04:47:22.53Z" }
wheels = [ wheels = [
{ url = "https://files.pythonhosted.org/packages/53/0d/bf7869dd57b40888ea1da8fc88f70d8e94ec2f8ee236ea4c22a757593235/xonsh-0.23.2-py311-none-any.whl", hash = "sha256:a38dd84e23e97fc42e0156c80024b3449474dfcbb6c3a344bd38c45a2b2de44d", size = 756215, upload-time = "2026-04-26T19:28:38.875Z" }, { url = "https://files.pythonhosted.org/packages/ca/4a/2aab8300ad218dfc7678c34d5f703f09df5681fecc6e66d48c951ef58049/xonsh-0.23.8-py311-none-any.whl", hash = "sha256:4bab3e405643df2cc78ec2cac13241471841796fe710386d2179666aae8a5f9c", size = 799846, upload-time = "2026-05-30T04:47:21.211Z" },
{ url = "https://files.pythonhosted.org/packages/f7/9f/b1bb0c15bf2120469c94b062f4b854588370ab94c7a1679c84ff646bf50b/xonsh-0.23.2-py312-none-any.whl", hash = "sha256:190a348fa19774de8e697af5f44c9adb95aca687fa475b31dda23d1a3462a3c6", size = 756224, upload-time = "2026-04-26T19:28:39.17Z" }, { url = "https://files.pythonhosted.org/packages/87/ec/aa66ef6046f90769dd8fcb3ddca9d00282d12e3d73645abbf12f190f17cf/xonsh-0.23.8-py312-none-any.whl", hash = "sha256:c7d0f0fba0cafe0bd75bf202820aeffc74b52943fa27d98d3b4346793f6ba493", size = 799868, upload-time = "2026-05-30T04:47:19.158Z" },
{ url = "https://files.pythonhosted.org/packages/83/23/8e037579ac86d8f266b4116338f902eab04175b88574a6438ee739dd3084/xonsh-0.23.2-py313-none-any.whl", hash = "sha256:4ebbf42a94f505d25694f154556ca0caa149a3f59870ec850bd13ad8df519dce", size = 756728, upload-time = "2026-04-26T19:28:39.493Z" }, { url = "https://files.pythonhosted.org/packages/12/fe/2d757d82b57332f1c6cd3f8c168fbcf060a275895a763542255ae1c53d75/xonsh-0.23.8-py313-none-any.whl", hash = "sha256:1b7335522a6ecd63f0d84151977a7a9050874d3ecec00cf79919d0770bebb1b4", size = 800388, upload-time = "2026-05-30T04:47:18.47Z" },
{ url = "https://files.pythonhosted.org/packages/05/ec/090300d9c5f14f58b5a684302f43535457f733a62f11673aa3ac38460717/xonsh-0.23.2-py314-none-any.whl", hash = "sha256:5efcd0f6db8f9f1dace256de2c04c3c044f2d86b48434187c43a69d602283a9e", size = 756767, upload-time = "2026-04-26T19:28:37.218Z" }, { url = "https://files.pythonhosted.org/packages/80/96/567bb3131655ff73c821e8a030c53707ced6c8840330a859f67bbaefbd16/xonsh-0.23.8-py314-none-any.whl", hash = "sha256:2a411fc47958c6107b3e13372655d18c52be98368e2159a1910cfde77124b3b1", size = 800352, upload-time = "2026-05-30T04:47:14.812Z" },
] ]
[[package]] [[package]]

View File

@ -488,6 +488,7 @@ def _tractor_reap(args):
reap, reap,
reap_shm, reap_shm,
reap_uds, reap_uds,
_TRACTOR_PROC_CMDLINE_MARKERS,
) )
rc: int = 0 rc: int = 0
@ -500,9 +501,8 @@ def _tractor_reap(args):
else: else:
pids = find_orphans() pids = find_orphans()
mode = ( mode = (
'orphans (PPid==1, intrinsic ' f'orphans (PPid==1, intrinsic '
'cmdline/comm match — `tractor[…]` or ' f'cmdline/comm match — {_TRACTOR_PROC_CMDLINE_MARKERS}'
'`tractor._child`)'
) )
if not pids: if not pids: