Merge pull request #435 from goodboy/moc_coverage_test_by_claude

`.trionics.maybe_open_context()` race-edge-case coverage

commit 8b106b9144
@@ -1,10 +1,18 @@
 name: CI

+# NOTE distilled from,
+# https://github.com/orgs/community/discussions/26276
 on:
-  # any time someone pushes a new branch to origin
+  # any time a new update to 'main'
   push:
+    branches:
+      - main

-  # Allows you to run this workflow manually from the Actions tab
+  # for on all (forked) PRs to repo
+  # NOTE, use a draft PR if you just want CI triggered..
+  pull_request:
+
+  # to run workflow manually from the "Actions" tab
   workflow_dispatch:

 jobs:
@@ -0,0 +1,54 @@
---
model: claude-opus-4-6
service: claude
session: (ad-hoc, not tracked via conf.toml)
timestamp: 2026-04-06T17:28:48Z
git_ref: 02b2ef1
scope: tests
substantive: true
raw_file: 20260406T172848Z_02b2ef1_prompt_io.raw.md
---

## Prompt

User asked to extend `tests/test_resource_cache.py` with a test that
reproduces the edge case fixed in commit `02b2ef18` (per-key locking +
user tracking in `maybe_open_context()`). The bug was originally
triggered in piker's `brokerd.kraken` backend where the same `acm_func`
was called with different kwargs, and the old global `_Cache.users`
counter caused:

- teardown skipped for one `ctx_key` because another key's users kept
  the global count > 0
- re-entry hitting `assert not resources.get(ctx_key)` during the
  teardown window

User requested a test that would fail under the old code and pass with
the fix.
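The global-counter failure mode described above can be sketched outside of `tractor` entirely. The two cache classes below are hypothetical stand-ins (not the actual `_Cache` API) contrasting one shared user count against per-`ctx_key` counts:

```python
# Hypothetical sketch: why one global user count breaks per-key teardown.
# `GlobalCountCache` models the old behavior, `PerKeyCountCache` the fix;
# neither is tractor's real `_Cache` implementation.
from collections import defaultdict


class GlobalCountCache:
    def __init__(self):
        self.users: int = 0      # ONE counter shared by every ctx_key
        self.values: dict = {}

    def enter(self, ctx_key, make):
        self.users += 1
        if ctx_key not in self.values:
            self.values[ctx_key] = make()
        return self.values[ctx_key]

    def exit(self, ctx_key):
        self.users -= 1
        # teardown only runs when NO key anywhere has users, so a
        # still-held 'a' blocks teardown of an unused 'b'
        if self.users == 0:
            self.values.pop(ctx_key, None)


class PerKeyCountCache:
    def __init__(self):
        self.users = defaultdict(int)  # a count per ctx_key
        self.values: dict = {}

    def enter(self, ctx_key, make):
        self.users[ctx_key] += 1
        if ctx_key not in self.values:
            self.values[ctx_key] = make()
        return self.values[ctx_key]

    def exit(self, ctx_key):
        self.users[ctx_key] -= 1
        if self.users[ctx_key] == 0:  # only THIS key must be unused
            self.values.pop(ctx_key, None)


a, b = ('acm', ('a',)), ('acm', ('b',))

g = GlobalCountCache()
g.enter(a, lambda: 'A')   # 'a' held open by a bg task
g.enter(b, lambda: 'B')
g.exit(b)                 # global count still 1 (from 'a') -> 'b' leaks
assert b in g.values

p = PerKeyCountCache()
p.enter(a, lambda: 'A')
p.enter(b, lambda: 'B')
p.exit(b)                 # 'b' torn down independently of 'a'
assert b not in p.values and a in p.values
```

Re-entering `b` on the global-count cache would then be a stale HIT, which is exactly the condition the requested test asserts against.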
## Response summary

Designed and implemented `test_per_ctx_key_resource_lifecycle` which
verifies per-`ctx_key` resource isolation by:

1. Holding resource `'a'` open in a bg task
2. Opening and closing resource `'b'` (same `acm_func`, different
   kwargs) while `'a'` is still alive
3. Re-opening `'b'` and asserting a cache MISS, proving `'b'` was torn
   down independently despite `'a'` keeping its own user count > 0

With the old global counter, phase 3 would produce a stale cache HIT
(leaked resource) or crash on the assert.

Also added a trivial `acm_with_resource(resource_id)` ACM helper at
module level.

## Files changed

- `tests/test_resource_cache.py` — add `acm_with_resource` ACM +
  `test_per_ctx_key_resource_lifecycle` test fn

## Human edits

None — committed as generated (pending user review).
@@ -0,0 +1,57 @@
---
model: claude-opus-4-6
service: claude
session: (ad-hoc, not tracked via conf.toml)
timestamp: 2026-04-06T19:31:25Z
git_ref: 85f9c5d
scope: tests
substantive: true
raw_file: 20260406T193125Z_85f9c5d_prompt_io.raw.md
---

## Prompt

User asked to reproduce the exact `assert not resources.get(ctx_key)`
crash originally seen in piker's `brokerd.kraken` backend via
`open_cached_client('kraken')`. Key constraints from the user:

- In piker the kwargs were the **same** (empty), so all callers share
  one `ctx_key = (fid, ())`
- The root issue is `_Cache.locks` being indexed by `fid` rather than
  `ctx_key`, plus the race window between `values.pop()` and
  `resources.pop()` in `_Cache.run_ctx` (the acm's `__aexit__` has
  checkpoints in between)
- Mark the test `@pytest.mark.xfail` so it can be committed against the
  unpatched code and later adjusted once the fix from `9e49eddd` is
  applied

User pointed to piker's `open_cached_client('kraken')` usage as the
real-world pattern to model.
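The `values.pop()`/`resources.pop()` window can be modeled without `tractor` or `trio` at all. The sketch below uses hypothetical names and `asyncio` in place of `trio` purely to show that an await between the two pops lets another task observe the half-torn-down state:

```python
# Hypothetical model of the `_Cache.run_ctx` teardown window: an await
# sits between popping `values` and popping `resources`, so a re-entering
# task can see `values` empty while `resources` is still populated.
import asyncio

values: dict = {}
resources: dict = {}
observed: dict = {}


async def run_ctx_teardown(ctx_key, in_window, checked):
    values.pop(ctx_key)    # inner finally: cached value goes first
    in_window.set()        # acm __aexit__ starts; checkpoints run here
    await checked.wait()
    resources.pop(ctx_key) # outer finally: resource entry goes last


async def reenter(ctx_key, in_window, checked):
    await in_window.wait() # land exactly inside the window
    # the old code asserted `not resources.get(ctx_key)` right here
    observed['value_present'] = ctx_key in values
    observed['resource_present'] = ctx_key in resources
    checked.set()


async def main():
    ctx_key = ('get_client', ())
    values[ctx_key] = 'the-client'
    resources[ctx_key] = object()
    in_window, checked = asyncio.Event(), asyncio.Event()
    await asyncio.gather(
        run_ctx_teardown(ctx_key, in_window, checked),
        reenter(ctx_key, in_window, checked),
    )


asyncio.run(main())
assert observed == {'value_present': False, 'resource_present': True}
```

The events make the interleaving deterministic, mirroring how the real test uses an `in_aexit` event rather than relying on scheduler timing.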
## Response summary

Added `test_moc_reentry_during_teardown` which deterministically
reproduces the teardown race:

1. A `cached_client()` acm (no params, like `kraken.api.get_client()`)
   signals via an `in_aexit` event when its `__aexit__` starts, then
   sleeps to hold the window open
2. A `use_and_exit` task enters and exits the cached resource
3. `reenter_during_teardown` waits on `in_aexit` then re-enters,
   landing exactly in the window where `values` is popped but
   `resources` still exists
4. The old `assert not resources.get(ctx_key)` fires
5. `@pytest.mark.xfail(raises=AssertionError)` catches it

Test confirmed: `XFAIL` with the exact traceback at `_mngrs.py:327`.
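Committing a known-red repro this way hinges on keeping `raises=` narrow. A generic sketch of the pattern (hypothetical function and test names, not the repo's actual test):

```python
# Generic sketch of the xfail-first workflow: the test encodes the bug's
# exact failure mode (an AssertionError) so it reports XFAIL in CI until
# the fix lands, at which point the marker is removed or flipped.
import pytest


def lookup_during_teardown(resources: dict, ctx_key) -> str:
    # stand-in for the unpatched re-entry path in `_Cache.run_ctx`
    assert not resources.get(ctx_key), 'stale `resources` entry!'
    return 'fresh-client'


@pytest.mark.xfail(
    reason='teardown race leaves a stale `resources` entry',
    raises=AssertionError,  # any OTHER exception still fails the run
)
def test_reenter_hits_stale_entry():
    ctx_key = ('get_client', ())
    resources = {ctx_key: object()}  # teardown started but not finished
    lookup_during_teardown(resources, ctx_key)
```

Pinning `raises=AssertionError` keeps the marker honest: if the code later starts failing some other way, the run surfaces a real error instead of a silent `XFAIL`.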
## Files changed

- `tests/test_resource_cache.py` — add `test_moc_reentry_during_teardown`
  xfail test fn

## Human edits

None — committed as generated (pending user review).
@@ -0,0 +1,27 @@
# AI Prompt I/O Log — claude

This directory tracks prompt inputs and model outputs for AI-assisted
development using `claude` (Claude Code).

## Policy

Prompt logging follows the
[NLNet generative AI policy][nlnet-ai].
All substantive AI contributions are logged with:

- Model name and version
- Timestamps
- The prompts that produced the output
- Unedited model output (`.raw.md` files)

[nlnet-ai]: https://nlnet.nl/foundation/policies/generativeAI/

## Usage

Entries are created by the `/prompt-io` skill or automatically via
`/commit-msg` integration.

Human contributors remain accountable for all code decisions.
AI-generated content is never presented as human-authored work.
@@ -318,7 +318,7 @@ def test_open_local_sub_to_stream(

 @acm
-async def cancel_outer_cs(
+async def maybe_cancel_outer_cs(
     cs: trio.CancelScope|None = None,
     delay: float = 0,
 ):
@@ -332,12 +332,31 @@ async def cancel_outer_cs(
     if cs:
         log.info('task calling cs.cancel()')
         cs.cancel()
-        trio.lowlevel.checkpoint()
     yield

-    await trio.sleep_forever()
+    if cs:
+        await trio.sleep_forever()
+
+        # XXX, if not cancelled we'll leak this inf-blocking
+        # subtask to the actor's service tn..
+    else:
+        await trio.lowlevel.checkpoint()


+@pytest.mark.parametrize(
+    'delay',
+    [0.05, 0.5, 1],
+    ids="pre_sleep_delay={}".format,
+)
+@pytest.mark.parametrize(
+    'cancel_by_cs',
+    [True, False],
+    ids="cancel_by_cs={}".format,
+)
 def test_lock_not_corrupted_on_fast_cancel(
+    delay: float,
+    cancel_by_cs: bool,
     debug_mode: bool,
     loglevel: str,
 ):
@@ -354,17 +373,14 @@ def test_lock_not_corrupted_on_fast_cancel(
     due to it having erronously exited without calling
     `lock.release()`.

     '''
-    delay: float = 1.
-
     async def use_moc(
-        cs: trio.CancelScope|None,
         delay: float,
+        cs: trio.CancelScope|None = None,
     ):
         log.info('task entering moc')
         async with maybe_open_context(
-            cancel_outer_cs,
+            maybe_cancel_outer_cs,
             kwargs={
                 'cs': cs,
                 'delay': delay,
@@ -375,8 +391,14 @@ def test_lock_not_corrupted_on_fast_cancel(
         else:
             log.info('1st task entered')

-            await trio.sleep_forever()
+            if cs:
+                await trio.sleep_forever()
+
+            else:
+                await trio.sleep(delay)
+
+    # ^END, exit shared ctx.

     async def main():
         with trio.fail_after(delay + 2):
             async with (
@@ -384,6 +406,7 @@ def test_lock_not_corrupted_on_fast_cancel(
                 debug_mode=debug_mode,
                 loglevel=loglevel,
             ),
+            # ?TODO, pass this as the parent tn?
             trio.open_nursery() as tn,
         ):
             get_console_log('info')
@@ -391,15 +414,216 @@ def test_lock_not_corrupted_on_fast_cancel(
             cs = tn.cancel_scope
             tn.start_soon(
                 use_moc,
-                cs,
                 delay,
+                cs if cancel_by_cs else None,
                 name='child',
             )
             with trio.CancelScope() as rent_cs:
                 await use_moc(
-                    cs=rent_cs,
                     delay=delay,
+                    cs=rent_cs if cancel_by_cs else None,
                 )

     trio.run(main)

The remainder of the hunk adds the two new test fns (plus helper acm):

@acm
async def acm_with_resource(resource_id: str):
    '''
    Yield `resource_id` as the cached value.

    Used to verify per-`ctx_key` isolation when the same
    `acm_func` is called with different kwargs.

    '''
    yield resource_id


def test_per_ctx_key_resource_lifecycle(
    debug_mode: bool,
    loglevel: str,
):
    '''
    Verify that `maybe_open_context()` correctly isolates resource
    lifecycle **per `ctx_key`** when the same `acm_func` is called
    with different kwargs.

    Previously `_Cache.users` was a single global `int` and
    `_Cache.locks` was keyed on `fid` (function ID), so calling
    the same `acm_func` with different kwargs (producing different
    `ctx_key`s) meant:

    - teardown for one key was skipped bc the *other* key's users
      kept the global count > 0,
    - and re-entry could hit the old
      `assert not resources.get(ctx_key)` crash during the
      teardown window.

    This was the root cause of a long-standing bug in piker's
    `brokerd.kraken` backend.

    '''
    timeout: float = 6
    if debug_mode:
        timeout = 999

    async def main():
        a_ready = trio.Event()
        a_exit = trio.Event()

        async def hold_resource_a():
            '''
            Open resource 'a' and keep it alive until signalled.

            '''
            async with maybe_open_context(
                acm_with_resource,
                kwargs={'resource_id': 'a'},
            ) as (cache_hit, value):
                assert not cache_hit
                assert value == 'a'
                log.info("resource 'a' entered (holding)")
                a_ready.set()
                await a_exit.wait()
                log.info("resource 'a' exiting")

        with trio.fail_after(timeout):
            async with (
                tractor.open_root_actor(
                    debug_mode=debug_mode,
                    loglevel=loglevel,
                ),
                trio.open_nursery() as tn,
            ):
                # Phase 1: bg task holds resource 'a' open.
                tn.start_soon(hold_resource_a)
                await a_ready.wait()

                # Phase 2: open resource 'b' (different kwargs,
                # same acm_func) then exit it while 'a' is still
                # alive.
                async with maybe_open_context(
                    acm_with_resource,
                    kwargs={'resource_id': 'b'},
                ) as (cache_hit, value):
                    assert not cache_hit
                    assert value == 'b'
                    log.info("resource 'b' entered")

                log.info("resource 'b' exited, waiting for teardown")
                await trio.lowlevel.checkpoint()

                # Phase 3: re-open 'b'; must be a fresh cache MISS
                # proving 'b' was torn down independently of 'a'.
                #
                # With the old global `_Cache.users` counter this
                # would be a stale cache HIT (leaked resource) or
                # trigger `assert not resources.get(ctx_key)`.
                async with maybe_open_context(
                    acm_with_resource,
                    kwargs={'resource_id': 'b'},
                ) as (cache_hit, value):
                    assert not cache_hit, (
                        "resource 'b' was NOT torn down despite "
                        "having zero users! (global user count bug)"
                    )
                    assert value == 'b'
                    log.info(
                        "resource 'b' re-entered "
                        "(cache miss, correct)"
                    )

                # Phase 4: let 'a' exit, clean shutdown.
                a_exit.set()

    trio.run(main)


@pytest.mark.xfail(
    reason=(
        'Demonstrates the `_Cache.run_ctx` teardown race: '
        'a re-entering task hits '
        '`assert not resources.get(ctx_key)` because '
        '`values` was popped but `resources` was not yet '
        '(acm `__aexit__` checkpoint in between). '
        'Fixed by per-`ctx_key` locking in 9e49eddd.'
    ),
    raises=AssertionError,
)
def test_moc_reentry_during_teardown(
    debug_mode: bool,
    loglevel: str,
):
    '''
    Reproduce the piker `open_cached_client('kraken')` race:

    - same `acm_func`, NO kwargs (identical `ctx_key`)
    - multiple tasks share the cached resource
    - all users exit -> teardown starts
    - a NEW task enters during `_Cache.run_ctx.__aexit__`
    - `values[ctx_key]` is gone (popped in inner finally)
      but `resources[ctx_key]` still exists (outer finally
      hasn't run yet bc the acm cleanup has checkpoints)
    - old code: `assert not resources.get(ctx_key)` FIRES

    This models the real-world scenario where `brokerd.kraken`
    tasks concurrently call `open_cached_client('kraken')`
    (same `acm_func`, empty kwargs, shared `ctx_key`) and
    the teardown/re-entry race triggers intermittently.

    '''
    async def main():
        in_aexit = trio.Event()

        @acm
        async def cached_client():
            '''
            Simulates `kraken.api.get_client()`:
            - no params (all callers share one `ctx_key`)
            - slow-ish cleanup to widen the race window
              between `values.pop()` and `resources.pop()`
              inside `_Cache.run_ctx`.

            '''
            yield 'the-client'
            # Signal that we're in __aexit__ — at this
            # point `values` has already been popped by
            # `run_ctx`'s inner finally, but `resources`
            # is still alive (outer finally hasn't run).
            in_aexit.set()
            await trio.sleep(10)

        first_done = trio.Event()

        async def use_and_exit():
            async with maybe_open_context(
                cached_client,
            ) as (cache_hit, value):
                assert value == 'the-client'
                first_done.set()

        async def reenter_during_teardown():
            '''
            Wait for the acm's `__aexit__` to start (meaning
            `values` is popped but `resources` still exists),
            then re-enter — triggering the assert.

            '''
            await in_aexit.wait()
            async with maybe_open_context(
                cached_client,
            ) as (cache_hit, value):
                assert value == 'the-client'

        with trio.fail_after(5):
            async with (
                tractor.open_root_actor(
                    debug_mode=debug_mode,
                    loglevel=loglevel,
                ),
                trio.open_nursery() as tn,
            ):
                tn.start_soon(use_and_exit)
                tn.start_soon(reenter_during_teardown)

    trio.run(main)