Merge pull request #436 from goodboy/moc_revert_to_fix
Moc revert to fix: reversion fix for bug in #435 (`subint_spawner_backend`)
commit
f47010d7e9
|
|
@ -0,0 +1,52 @@
|
|||
---
|
||||
model: claude-opus-4-6
|
||||
service: claude
|
||||
session: (ad-hoc, not tracked via conf.toml)
|
||||
timestamp: 2026-04-06T22:26:48Z
|
||||
git_ref: f086222
|
||||
scope: code
|
||||
substantive: true
|
||||
raw_file: 20260406T222648Z_f086222_prompt_io.raw.md
|
||||
---
|
||||
|
||||
## Prompt
|
||||
|
||||
User applied the per-key locking patch (`f086222d`),
|
||||
removed the `@pytest.mark.xfail` from
|
||||
`test_moc_reentry_during_teardown`, and found the
|
||||
test STILL fails — now with `RuntimeError` instead
|
||||
of `AssertionError`. Asked to diagnose why the fix
|
||||
doesn't actually resolve this race.
|
||||
|
||||
## Response summary
|
||||
|
||||
Traced the exact scheduling sequence across three
|
||||
concurrent tasks to show that the per-`ctx_key`
|
||||
locking change doesn't close the fundamental gap:
|
||||
|
||||
- `maybe_open_context`'s finally pops the lock and
|
||||
sets `no_more_users` in the **exiting caller's
|
||||
task**
|
||||
- But the actual resource cleanup (`values.pop`,
|
||||
acm `__aexit__`, `resources.pop`) runs in the
|
||||
**`run_ctx` task** inside `service_tn`
|
||||
- A re-entering task creates a fresh lock (old one
|
||||
popped), acquires immediately, and hits stale
|
||||
`resources[ctx_key]` while `run_ctx` is still in
|
||||
`__aexit__`
|
||||
|
||||
Identified three possible fix strategies:
|
||||
1. Let `run_ctx` own lock release/pop after
|
||||
`resources.pop()`
|
||||
2. Add a "teardown complete" event that new callers
|
||||
wait on
|
||||
3. Move `resources.pop()` into `run_ctx`'s inner
|
||||
finally (before acm `__aexit__`)
|
||||
|
||||
## Files changed
|
||||
|
||||
No code changes — analysis only.
|
||||
|
||||
## Human edits
|
||||
|
||||
N/A — diagnostic output, no code generated.
|
||||
|
|
@ -0,0 +1,31 @@
|
|||
---
|
||||
model: claude-opus-4-6
|
||||
service: claude
|
||||
timestamp: 2026-04-06T22:26:48Z
|
||||
git_ref: f086222
|
||||
---
|
||||
|
||||
Here's the problem. The per-key locking doesn't fix this race because the **lock lifetime is decoupled from `run_ctx`'s lifetime**. Trace through what happens:
|
||||
|
||||
**`maybe_open_context` finally** (lines 468-495):
|
||||
1. `users[ctx_key] -= 1` → 0
|
||||
2. `no_more_users.set()` → wakes `run_ctx` (but it doesn't run yet, just becomes ready)
|
||||
3. `_Cache.locks.pop(ctx_key)` → **lock is gone**
|
||||
4. `maybe_open_context` exits
|
||||
|
||||
**`run_ctx`** now gets scheduled (lines 237-241):
|
||||
5. Inner finally: `values.pop(ctx_key)` → values gone
|
||||
6. `mng.__aexit__()` runs → `in_aexit.set()`, `await trio.sleep(10)` → **CHECKPOINT**
|
||||
|
||||
**`reenter_during_teardown`** wakes:
|
||||
7. Enters `maybe_open_context` → no lock exists for `ctx_key` → creates **fresh** lock, acquires immediately
|
||||
8. `values[ctx_key]` → `KeyError`
|
||||
9. `resources.get(ctx_key)` → **still exists!** (step 6 hasn't reached `resources.pop()` in `run_ctx`'s outer finally yet)
|
||||
10. `raise RuntimeError('Caching resources ALREADY exist?!')`
|
||||
|
||||
The core issue: `no_more_users.set()` (step 2) and `locks.pop()` (step 3) happen in the **exiting caller's task**, but the actual resource cleanup (`values.pop`, acm `__aexit__`, `resources.pop`) happens in the **`run_ctx` task** inside `service_tn`. There's no synchronization between "lock removed, new callers welcome" and "run_ctx has actually finished tearing down". The per-`ctx_key` lock change just made the lock more granular — it didn't close this gap.
|
||||
|
||||
The fix needs to ensure a new caller can't acquire a lock for `ctx_key` until `run_ctx` has fully exited (including `resources.pop()`). Either:
|
||||
- **don't pop the lock in `maybe_open_context`** — let `run_ctx` release/pop it after `resources.pop()`
|
||||
- **wait on a "teardown complete" event** in the cache-miss path when stale `resources` exist
|
||||
- **pop `resources` before the acm `__aexit__`** (move `resources.pop()` into the inner finally alongside `values.pop()`)
|
||||
|
|
@ -12,6 +12,7 @@ import trio
|
|||
import tractor
|
||||
from tractor.trionics import (
|
||||
maybe_open_context,
|
||||
collapse_eg,
|
||||
)
|
||||
from tractor.log import (
|
||||
get_console_log,
|
||||
|
|
@ -539,17 +540,6 @@ def test_per_ctx_key_resource_lifecycle(
|
|||
trio.run(main)
|
||||
|
||||
|
||||
@pytest.mark.xfail(
|
||||
reason=(
|
||||
'Demonstrates the `_Cache.run_ctx` teardown race: '
|
||||
'a re-entering task hits '
|
||||
'`assert not resources.get(ctx_key)` because '
|
||||
'`values` was popped but `resources` was not yet '
|
||||
'(acm `__aexit__` checkpoint in between). '
|
||||
'Fixed by per-`ctx_key` locking in 9e49eddd.'
|
||||
),
|
||||
raises=AssertionError,
|
||||
)
|
||||
def test_moc_reentry_during_teardown(
|
||||
debug_mode: bool,
|
||||
loglevel: str,
|
||||
|
|
@ -621,6 +611,7 @@ def test_moc_reentry_during_teardown(
|
|||
debug_mode=debug_mode,
|
||||
loglevel=loglevel,
|
||||
),
|
||||
collapse_eg(),
|
||||
trio.open_nursery() as tn,
|
||||
):
|
||||
tn.start_soon(use_and_exit)
|
||||
|
|
|
|||
|
|
@ -19,6 +19,7 @@ Async context manager primitives with hard ``trio``-aware semantics
|
|||
|
||||
'''
|
||||
from __future__ import annotations
|
||||
from collections import defaultdict
|
||||
from contextlib import (
|
||||
asynccontextmanager as acm,
|
||||
)
|
||||
|
|
@ -135,7 +136,7 @@ async def gather_contexts(
|
|||
|
||||
'''
|
||||
seed: int = id(mngrs)
|
||||
unwrapped: dict[int, T | None] = {}.fromkeys(
|
||||
unwrapped: dict[int, T|None] = {}.fromkeys(
|
||||
(id(mngr) for mngr in mngrs),
|
||||
seed,
|
||||
)
|
||||
|
|
@ -204,8 +205,11 @@ class _Cache:
|
|||
|
||||
'''
|
||||
service_tn: trio.Nursery|None = None
|
||||
locks: dict[Hashable, trio.Lock] = {}
|
||||
users: int = 0
|
||||
locks: dict[Hashable, trio.StrictFIFOLock] = {}
|
||||
users: defaultdict[
|
||||
tuple|Hashable,
|
||||
int,
|
||||
] = defaultdict(int)
|
||||
values: dict[Any, Any] = {}
|
||||
resources: dict[
|
||||
Hashable,
|
||||
|
|
@ -222,29 +226,36 @@ class _Cache:
|
|||
task_status: trio.TaskStatus[T] = trio.TASK_STATUS_IGNORED,
|
||||
|
||||
) -> None:
|
||||
try:
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
try:
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
value = cls.values.pop(ctx_key)
|
||||
finally:
|
||||
# discard nursery ref so it won't be re-used (an error)?
|
||||
cls.resources.pop(ctx_key)
|
||||
async with mng as value:
|
||||
_, no_more_users = cls.resources[ctx_key]
|
||||
cls.values[ctx_key] = value
|
||||
task_status.started(value)
|
||||
try:
|
||||
await no_more_users.wait()
|
||||
finally:
|
||||
value = cls.values.pop(ctx_key)
|
||||
cls.resources.pop(ctx_key)
|
||||
|
||||
|
||||
class _UnresolvedCtx:
    '''
    Placeholder for the maybe-value delivered from some `acm_func`,
    once (first) entered by a `maybe_open_context()` task.

    Enables internal teardown logic conditioned on whether the
    context was actually entered successfully vs. cancelled prior.

    NOTE(review): used as the sentinel initial value for `yielded`
    in `maybe_open_context()` (checked via `yielded is not
    _UnresolvedCtx` in its teardown `finally:`), so teardown can
    distinguish "never entered" from a legitimately-`None` yield.

    '''
|
||||
|
||||
|
||||
@acm
|
||||
async def maybe_open_context(
|
||||
|
||||
acm_func: Callable[..., AsyncContextManager[T]],
|
||||
|
||||
# XXX: used as cache key after conversion to tuple
|
||||
# and all embedded values must also be hashable
|
||||
kwargs: dict = {},
|
||||
key: Hashable | Callable[..., Hashable] = None,
|
||||
key: Hashable|Callable[..., Hashable] = None,
|
||||
|
||||
# caller can provide their own scope
|
||||
tn: trio.Nursery|None = None,
|
||||
|
|
@ -257,25 +268,55 @@ async def maybe_open_context(
|
|||
Return the `_Cached` instance on a _Cache hit.
|
||||
|
||||
'''
|
||||
fid = id(acm_func)
|
||||
|
||||
fid: int = id(acm_func)
|
||||
if inspect.isfunction(key):
|
||||
ctx_key = (fid, key(**kwargs))
|
||||
ctx_key = (
|
||||
fid,
|
||||
key(**kwargs)
|
||||
)
|
||||
else:
|
||||
ctx_key = (fid, key or tuple(kwargs.items()))
|
||||
ctx_key = (
|
||||
fid,
|
||||
key or tuple(kwargs.items())
|
||||
)
|
||||
|
||||
# yielded output
|
||||
yielded: Any = None
|
||||
lock_registered: bool = False
|
||||
# sentinel = object()
|
||||
yielded: Any = _UnresolvedCtx
|
||||
user_registered: bool = False
|
||||
|
||||
# Lock resource acquisition around task racing / ``trio``'s
|
||||
# scheduler protocol.
|
||||
# NOTE: the lock is target context manager func specific in order
|
||||
# to allow re-entrant use cases where one `maybe_open_context()`
|
||||
# wrapped factor may want to call into another.
|
||||
lock = _Cache.locks.setdefault(fid, trio.Lock())
|
||||
lock_registered: bool = True
|
||||
# wrapped factory may want to call into another.
|
||||
task: trio.Task = trio.lowlevel.current_task()
|
||||
lock: trio.StrictFIFOLock|None = _Cache.locks.get(
|
||||
ctx_key
|
||||
)
|
||||
if not lock:
|
||||
lock = _Cache.locks[
|
||||
ctx_key
|
||||
] = trio.StrictFIFOLock()
|
||||
header: str = 'Allocated NEW lock for @acm_func,\n'
|
||||
else:
|
||||
await trio.lowlevel.checkpoint()
|
||||
header: str = 'Reusing OLD lock for @acm_func,\n'
|
||||
|
||||
log.debug(
|
||||
f'{header}'
|
||||
f'Acquiring..\n'
|
||||
f'task={task!r}\n'
|
||||
f'ctx_key={ctx_key!r}\n'
|
||||
f'acm_func={acm_func}\n'
|
||||
)
|
||||
await lock.acquire()
|
||||
log.debug(
|
||||
f'Acquired lock..\n'
|
||||
f'task={task!r}\n'
|
||||
f'ctx_key={ctx_key!r}\n'
|
||||
f'acm_func={acm_func}\n'
|
||||
)
|
||||
|
||||
# XXX: one singleton nursery per actor and we want to
|
||||
# have it not be closed until all consumers have exited (which is
|
||||
|
|
@ -312,6 +353,7 @@ async def maybe_open_context(
|
|||
# checking the _Cache until complete otherwise the scheduler
|
||||
# may switch and by accident we create more then one resource.
|
||||
yielded = _Cache.values[ctx_key]
|
||||
# XXX^ should key-err if not-yet-allocated
|
||||
|
||||
except KeyError as _ke:
|
||||
# XXX, stay mutexed up to cache-miss yield
|
||||
|
|
@ -324,17 +366,35 @@ async def maybe_open_context(
|
|||
)
|
||||
mngr = acm_func(**kwargs)
|
||||
resources = _Cache.resources
|
||||
assert not resources.get(ctx_key), f'Resource exists? {ctx_key}'
|
||||
entry: tuple|None = resources.get(ctx_key)
|
||||
if entry:
|
||||
service_tn, ev = entry
|
||||
raise RuntimeError(
|
||||
f'Caching resources ALREADY exist?!\n'
|
||||
f'ctx_key={ctx_key!r}\n'
|
||||
f'acm_func={acm_func}\n'
|
||||
f'task: {task}\n'
|
||||
)
|
||||
|
||||
resources[ctx_key] = (service_tn, trio.Event())
|
||||
yielded: Any = await service_tn.start(
|
||||
_Cache.run_ctx,
|
||||
mngr,
|
||||
ctx_key,
|
||||
)
|
||||
_Cache.users += 1
|
||||
try:
|
||||
yielded: Any = await service_tn.start(
|
||||
_Cache.run_ctx,
|
||||
mngr,
|
||||
ctx_key,
|
||||
)
|
||||
except BaseException:
|
||||
# If `run_ctx` (wrapping the acm's `__aenter__`)
|
||||
# fails or is cancelled, clean up the `resources`
|
||||
# entry we just set — OW it leaks permanently.
|
||||
# |_ https://github.com/goodboy/tractor/pull/436#discussion_r3047201323
|
||||
resources.pop(ctx_key, None)
|
||||
raise
|
||||
_Cache.users[ctx_key] += 1
|
||||
user_registered = True
|
||||
finally:
|
||||
# XXX, since this runs from an `except` it's a checkpoint
|
||||
# whih can be `trio.Cancelled`-masked.
|
||||
# which can be `trio.Cancelled`-masked.
|
||||
#
|
||||
# NOTE, in that case the mutex is never released by the
|
||||
# (first and) caching task and **we can't** simply shield
|
||||
|
|
@ -365,9 +425,10 @@ async def maybe_open_context(
|
|||
maybe_taskc.__context__ = None
|
||||
|
||||
raise taskc
|
||||
|
||||
else:
|
||||
_Cache.users += 1
|
||||
# XXX, cached-entry-path
|
||||
_Cache.users[ctx_key] += 1
|
||||
user_registered = True
|
||||
log.debug(
|
||||
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||
|
|
@ -386,17 +447,28 @@ async def maybe_open_context(
|
|||
finally:
|
||||
if lock.locked():
|
||||
stats: trio.LockStatistics = lock.statistics()
|
||||
owner: trio.Task|None = stats.owner
|
||||
log.error(
|
||||
f'Lock left locked by last owner !?\n'
|
||||
f'Lock never released by last owner={owner!r} !?\n'
|
||||
f'{stats}\n'
|
||||
f'\n'
|
||||
f'task={task!r}\n'
|
||||
f'ctx_key={ctx_key!r}\n'
|
||||
f'acm_func={acm_func}\n'
|
||||
|
||||
)
|
||||
|
||||
_Cache.users -= 1
|
||||
if user_registered:
|
||||
_Cache.users[ctx_key] -= 1
|
||||
|
||||
if yielded is not None:
|
||||
if yielded is not _UnresolvedCtx:
|
||||
# if no more consumers, teardown the client
|
||||
if _Cache.users <= 0:
|
||||
log.debug(f'De-allocating resource for {ctx_key}')
|
||||
if _Cache.users[ctx_key] <= 0:
|
||||
log.debug(
|
||||
f'De-allocating @acm-func entry\n'
|
||||
f'ctx_key={ctx_key!r}\n'
|
||||
f'acm_func={acm_func!r}\n'
|
||||
)
|
||||
|
||||
# XXX: if we're cancelled we the entry may have never
|
||||
# been entered since the nursery task was killed.
|
||||
|
|
@ -406,9 +478,11 @@ async def maybe_open_context(
|
|||
_, no_more_users = entry
|
||||
no_more_users.set()
|
||||
|
||||
if lock_registered:
|
||||
maybe_lock = _Cache.locks.pop(fid, None)
|
||||
if maybe_lock is None:
|
||||
log.error(
|
||||
f'Resource lock for {fid} ALREADY POPPED?'
|
||||
)
|
||||
maybe_lock = _Cache.locks.pop(
|
||||
ctx_key,
|
||||
None,
|
||||
)
|
||||
if maybe_lock is None:
|
||||
log.error(
|
||||
f'Resource lock for {ctx_key} ALREADY POPPED?'
|
||||
)
|
||||
|
|
|
|||
Loading…
Reference in New Issue