diff --git a/ai/prompt-io/claude/20260406T222648Z_f086222_prompt_io.md b/ai/prompt-io/claude/20260406T222648Z_f086222_prompt_io.md new file mode 100644 index 00000000..c6a593c4 --- /dev/null +++ b/ai/prompt-io/claude/20260406T222648Z_f086222_prompt_io.md @@ -0,0 +1,52 @@ +--- +model: claude-opus-4-6 +service: claude +session: (ad-hoc, not tracked via conf.toml) +timestamp: 2026-04-06T22:26:48Z +git_ref: f086222 +scope: code +substantive: true +raw_file: 20260406T222648Z_f086222_prompt_io.raw.md +--- + +## Prompt + +User applied the per-key locking patch (`f086222d`), +removed the `@pytest.mark.xfail` from +`test_moc_reentry_during_teardown`, and found the +test STILL fails — now with `RuntimeError` instead +of `AssertionError`. Asked to diagnose why the fix +doesn't actually resolve this race. + +## Response summary + +Traced the exact scheduling sequence across three +concurrent tasks to show that the per-`ctx_key` +locking change doesn't close the fundamental gap: + +- `maybe_open_context`'s finally pops the lock and + sets `no_more_users` in the **exiting caller's + task** +- But the actual resource cleanup (`values.pop`, + acm `__aexit__`, `resources.pop`) runs in the + **`run_ctx` task** inside `service_tn` +- A re-entering task creates a fresh lock (old one + popped), acquires immediately, and hits stale + `resources[ctx_key]` while `run_ctx` is still in + `__aexit__` + +Identified three possible fix strategies: +1. Let `run_ctx` own lock release/pop after + `resources.pop()` +2. Add a "teardown complete" event that new callers + wait on +3. Move `resources.pop()` into `run_ctx`'s inner + finally (before acm `__aexit__`) + +## Files changed + +No code changes — analysis only. + +## Human edits + +N/A — diagnostic output, no code generated. diff --git a/ai/prompt-io/claude/20260406T222648Z_f086222_prompt_io.raw.md b/ai/prompt-io/claude/20260406T222648Z_f086222_prompt_io.raw.md new file mode 100644 index 00000000..95edeedf --- /dev/null +++ b/ai/prompt-io/claude/20260406T222648Z_f086222_prompt_io.raw.md @@ -0,0 +1,31 @@ +--- +model: claude-opus-4-6 +service: claude +timestamp: 2026-04-06T22:26:48Z +git_ref: f086222 +--- + +Here's the problem. The per-key locking doesn't fix this race because the **lock lifetime is decoupled from `run_ctx`'s lifetime**. Trace through what happens: + +**`maybe_open_context` finally** (lines 468-495): +1. `users[ctx_key] -= 1` → 0 +2. `no_more_users.set()` → wakes `run_ctx` (but it doesn't run yet, just becomes ready) +3. `_Cache.locks.pop(ctx_key)` → **lock is gone** +4. `maybe_open_context` exits + +**`run_ctx`** now gets scheduled (lines 237-241): +5. Inner finally: `values.pop(ctx_key)` → values gone +6. `mng.__aexit__()` runs → `in_aexit.set()`, `await trio.sleep(10)` → **CHECKPOINT** + +**`reenter_during_teardown`** wakes: +7. Enters `maybe_open_context` → no lock exists for `ctx_key` → creates **fresh** lock, acquires immediately +8. `values[ctx_key]` → `KeyError` +9. `resources.get(ctx_key)` → **still exists!** (step 6 hasn't reached `resources.pop()` in `run_ctx`'s outer finally yet) +10. `raise RuntimeError('Caching resources ALREADY exist?!')` + +The core issue: `no_more_users.set()` (step 2) and `locks.pop()` (step 3) happen in the **exiting caller's task**, but the actual resource cleanup (`values.pop`, acm `__aexit__`, `resources.pop`) happens in the **`run_ctx` task** inside `service_tn`. There's no synchronization between "lock removed, new callers welcome" and "run_ctx has actually finished tearing down". The per-`ctx_key` lock change just made the lock more granular — it didn't close this gap. + +The fix needs to ensure a new caller can't acquire a lock for `ctx_key` until `run_ctx` has fully exited (including `resources.pop()`). Either: +- **don't pop the lock in `maybe_open_context`** — let `run_ctx` release/pop it after `resources.pop()` +- **wait on a "teardown complete" event** in the cache-miss path when stale `resources` exist +- **pop `resources` before the acm `__aexit__`** (move `resources.pop()` into the inner finally alongside `values.pop()`) diff --git a/tests/test_resource_cache.py b/tests/test_resource_cache.py index 450bc7c3..29876539 100644 --- a/tests/test_resource_cache.py +++ b/tests/test_resource_cache.py @@ -12,6 +12,7 @@ import trio import tractor from tractor.trionics import ( maybe_open_context, + collapse_eg, ) from tractor.log import ( get_console_log, @@ -539,17 +540,6 @@ def test_per_ctx_key_resource_lifecycle( trio.run(main) -@pytest.mark.xfail( - reason=( - 'Demonstrates the `_Cache.run_ctx` teardown race: ' - 'a re-entering task hits ' - '`assert not resources.get(ctx_key)` because ' - '`values` was popped but `resources` was not yet ' - '(acm `__aexit__` checkpoint in between). ' - 'Fixed by per-`ctx_key` locking in 9e49eddd.' - ), - raises=AssertionError, -) def test_moc_reentry_during_teardown( debug_mode: bool, loglevel: str, @@ -621,6 +611,7 @@ def test_moc_reentry_during_teardown( debug_mode=debug_mode, loglevel=loglevel, ), + collapse_eg(), trio.open_nursery() as tn, ): tn.start_soon(use_and_exit) diff --git a/tractor/trionics/_mngrs.py b/tractor/trionics/_mngrs.py index 9524ffe1..601d196d 100644 --- a/tractor/trionics/_mngrs.py +++ b/tractor/trionics/_mngrs.py @@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics ''' from __future__ import annotations +from collections import defaultdict from contextlib import ( asynccontextmanager as acm, ) @@ -135,7 +136,7 @@ async def gather_contexts( ''' seed: int = id(mngrs) - unwrapped: dict[int, T | None] = {}.fromkeys( + unwrapped: dict[int, T|None] = {}.fromkeys( (id(mngr) for mngr in mngrs), seed, ) @@ -204,8 +205,11 @@ class _Cache: ''' service_tn: trio.Nursery|None = None - locks: dict[Hashable, trio.Lock] = {} - users: int = 0 + locks: dict[Hashable, trio.StrictFIFOLock] = {} + users: defaultdict[ + tuple|Hashable, + int, + ] = defaultdict(int) values: dict[Any, Any] = {} resources: dict[ Hashable, @@ -222,29 +226,36 @@ class _Cache: task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED, ) -> None: - try: - async with mng as value: - _, no_more_users = cls.resources[ctx_key] - try: - cls.values[ctx_key] = value - task_status.started(value) - await no_more_users.wait() - finally: - value = cls.values.pop(ctx_key) - finally: - # discard nursery ref so it won't be re-used (an error)? - cls.resources.pop(ctx_key) + async with mng as value: + _, no_more_users = cls.resources[ctx_key] + cls.values[ctx_key] = value + task_status.started(value) + try: + await no_more_users.wait() + finally: + value = cls.values.pop(ctx_key) + cls.resources.pop(ctx_key) + + +class _UnresolvedCtx: + ''' + Placeholder for the maybe-value delivered from some `acm_func`, + once (first) entered by a `maybe_open_context()` task. + + Enables internal teardown logic conditioned on whether the + context was actually entered successfully vs. cancelled prior. + + ''' @acm async def maybe_open_context( - acm_func: Callable[..., AsyncContextManager[T]], # XXX: used as cache key after conversion to tuple # and all embedded values must also be hashable kwargs: dict = {}, - key: Hashable | Callable[..., Hashable] = None, + key: Hashable|Callable[..., Hashable] = None, # caller can provide their own scope tn: trio.Nursery|None = None, @@ -257,25 +268,55 @@ async def maybe_open_context( Return the `_Cached` instance on a _Cache hit. ''' - fid = id(acm_func) - + fid: int = id(acm_func) if inspect.isfunction(key): - ctx_key = (fid, key(**kwargs)) + ctx_key = ( + fid, + key(**kwargs) + ) else: - ctx_key = (fid, key or tuple(kwargs.items())) + ctx_key = ( + fid, + key or tuple(kwargs.items()) + ) # yielded output - yielded: Any = None - lock_registered: bool = False + # sentinel = object() + yielded: Any = _UnresolvedCtx + user_registered: bool = False # Lock resource acquisition around task racing / ``trio``'s # scheduler protocol. # NOTE: the lock is target context manager func specific in order # to allow re-entrant use cases where one `maybe_open_context()` - # wrapped factor may want to call into another. - lock = _Cache.locks.setdefault(fid, trio.Lock()) - lock_registered: bool = True + # wrapped factory may want to call into another. + task: trio.Task = trio.lowlevel.current_task() + lock: trio.StrictFIFOLock|None = _Cache.locks.get( + ctx_key + ) + if not lock: + lock = _Cache.locks[ + ctx_key + ] = trio.StrictFIFOLock() + header: str = 'Allocated NEW lock for @acm_func,\n' + else: + await trio.lowlevel.checkpoint() + header: str = 'Reusing OLD lock for @acm_func,\n' + + log.debug( + f'{header}' + f'Acquiring..\n' + f'task={task!r}\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func}\n' + ) await lock.acquire() + log.debug( + f'Acquired lock..\n' + f'task={task!r}\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func}\n' + ) # XXX: one singleton nursery per actor and we want to # have it not be closed until all consumers have exited (which is @@ -312,6 +353,7 @@ async def maybe_open_context( # checking the _Cache until complete otherwise the scheduler # may switch and by accident we create more then one resource. yielded = _Cache.values[ctx_key] + # XXX^ should key-err if not-yet-allocated except KeyError as _ke: # XXX, stay mutexed up to cache-miss yield @@ -324,17 +366,35 @@ async def maybe_open_context( ) mngr = acm_func(**kwargs) resources = _Cache.resources - assert not resources.get(ctx_key), f'Resource exists? {ctx_key}' + entry: tuple|None = resources.get(ctx_key) + if entry: + service_tn, ev = entry + raise RuntimeError( + f'Caching resources ALREADY exist?!\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func}\n' + f'task: {task}\n' + ) + resources[ctx_key] = (service_tn, trio.Event()) - yielded: Any = await service_tn.start( - _Cache.run_ctx, - mngr, - ctx_key, - ) - _Cache.users += 1 + try: + yielded: Any = await service_tn.start( + _Cache.run_ctx, + mngr, + ctx_key, + ) + except BaseException: + # If `run_ctx` (wrapping the acm's `__aenter__`) + # fails or is cancelled, clean up the `resources` + # entry we just set — OW it leaks permanently. + # |_ https://github.com/goodboy/tractor/pull/436#discussion_r3047201323 + resources.pop(ctx_key, None) + raise + _Cache.users[ctx_key] += 1 + user_registered = True finally: # XXX, since this runs from an `except` it's a checkpoint - # whih can be `trio.Cancelled`-masked. + # which can be `trio.Cancelled`-masked. # # NOTE, in that case the mutex is never released by the # (first and) caching task and **we can't** simply shield @@ -365,9 +425,10 @@ async def maybe_open_context( maybe_taskc.__context__ = None raise taskc - else: - _Cache.users += 1 + # XXX, cached-entry-path + _Cache.users[ctx_key] += 1 + user_registered = True log.debug( f'Re-using cached resource for user {_Cache.users}\n\n' f'{ctx_key!r} -> {type(yielded)}\n' @@ -386,17 +447,28 @@ async def maybe_open_context( finally: if lock.locked(): stats: trio.LockStatistics = lock.statistics() + owner: trio.Task|None = stats.owner log.error( - f'Lock left locked by last owner !?\n' + f'Lock never released by last owner={owner!r} !?\n' f'{stats}\n' + f'\n' + f'task={task!r}\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func}\n' + ) - _Cache.users -= 1 + if user_registered: + _Cache.users[ctx_key] -= 1 - if yielded is not None: + if yielded is not _UnresolvedCtx: # if no more consumers, teardown the client - if _Cache.users <= 0: - log.debug(f'De-allocating resource for {ctx_key}') + if _Cache.users[ctx_key] <= 0: + log.debug( + f'De-allocating @acm-func entry\n' + f'ctx_key={ctx_key!r}\n' + f'acm_func={acm_func!r}\n' + ) # XXX: if we're cancelled we the entry may have never # been entered since the nursery task was killed. @@ -406,9 +478,11 @@ async def maybe_open_context( _, no_more_users = entry no_more_users.set() - if lock_registered: - maybe_lock = _Cache.locks.pop(fid, None) - if maybe_lock is None: - log.error( - f'Resource lock for {fid} ALREADY POPPED?' - ) + maybe_lock = _Cache.locks.pop( + ctx_key, + None, + ) + if maybe_lock is None: + log.error( + f'Resource lock for {ctx_key} ALREADY POPPED?' + )