Address Copilot review fixes on `maybe_open_context()`
Deats, - drop unused `import tractor` (F401) - fix `_Cache.locks` annotation to `trio.StrictFIFOLock` - fix typos: "mabye-value", "Acquir lock" - add `resources.pop()` cleanup in the caller if `service_tn.start()` fails — prevents a permanent `_Cache.resources` leak on `__aenter__` failure (note: Copilot's suggested outer `try/finally` in `run_ctx` would re-introduce the atomicity gap) - add `user_registered` flag so `users -= 1` only runs when the task actually incremented - move lock pop into the `users <= 0` teardown block so the last exiting user always cleans up, regardless of who created the lock; drop now-dead `lock_registered` var Also, - swap `fid` for `ctx_key` in debug log msgs - remove stale commented-out `# fid` refs Review: PR #436 (copilot-pull-request-reviewer) https://github.com/goodboy/tractor/pull/436 (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-codesubint_spawner_backend
parent
4fc477cfd6
commit
391c8d3566
|
|
@ -40,7 +40,6 @@ from typing import (
|
||||||
import trio
|
import trio
|
||||||
from tractor.runtime._state import current_actor
|
from tractor.runtime._state import current_actor
|
||||||
from tractor.log import get_logger
|
from tractor.log import get_logger
|
||||||
import tractor
|
|
||||||
# from ._beg import collapse_eg
|
# from ._beg import collapse_eg
|
||||||
# from ._taskc import (
|
# from ._taskc import (
|
||||||
# maybe_raise_from_masking_exc,
|
# maybe_raise_from_masking_exc,
|
||||||
|
|
@ -206,7 +205,7 @@ class _Cache:
|
||||||
|
|
||||||
'''
|
'''
|
||||||
service_tn: trio.Nursery|None = None
|
service_tn: trio.Nursery|None = None
|
||||||
locks: dict[Hashable, trio.Lock] = {}
|
locks: dict[Hashable, trio.StrictFIFOLock] = {}
|
||||||
users: defaultdict[
|
users: defaultdict[
|
||||||
tuple|Hashable,
|
tuple|Hashable,
|
||||||
int,
|
int,
|
||||||
|
|
@ -240,7 +239,7 @@ class _Cache:
|
||||||
|
|
||||||
class _UnresolvedCtx:
|
class _UnresolvedCtx:
|
||||||
'''
|
'''
|
||||||
Placeholder for the mabye-value delivered from some `acm_func`,
|
Placeholder for the maybe-value delivered from some `acm_func`,
|
||||||
once (first) entered by a `maybe_open_context()` task.
|
once (first) entered by a `maybe_open_context()` task.
|
||||||
|
|
||||||
Enables internal teardown logic conditioned on whether the
|
Enables internal teardown logic conditioned on whether the
|
||||||
|
|
@ -284,7 +283,7 @@ async def maybe_open_context(
|
||||||
# yielded output
|
# yielded output
|
||||||
# sentinel = object()
|
# sentinel = object()
|
||||||
yielded: Any = _UnresolvedCtx
|
yielded: Any = _UnresolvedCtx
|
||||||
lock_registered: bool = False
|
user_registered: bool = False
|
||||||
|
|
||||||
# Lock resource acquisition around task racing / ``trio``'s
|
# Lock resource acquisition around task racing / ``trio``'s
|
||||||
# scheduler protocol.
|
# scheduler protocol.
|
||||||
|
|
@ -293,17 +292,13 @@ async def maybe_open_context(
|
||||||
# wrapped factory may want to call into another.
|
# wrapped factory may want to call into another.
|
||||||
task: trio.Task = trio.lowlevel.current_task()
|
task: trio.Task = trio.lowlevel.current_task()
|
||||||
lock: trio.StrictFIFOLock|None = _Cache.locks.get(
|
lock: trio.StrictFIFOLock|None = _Cache.locks.get(
|
||||||
# fid
|
|
||||||
ctx_key
|
ctx_key
|
||||||
)
|
)
|
||||||
if not lock:
|
if not lock:
|
||||||
lock = _Cache.locks[
|
lock = _Cache.locks[
|
||||||
ctx_key
|
ctx_key
|
||||||
# fid
|
|
||||||
] = trio.StrictFIFOLock()
|
] = trio.StrictFIFOLock()
|
||||||
# lock = _Cache.locks[fid] = trio.Lock()
|
|
||||||
header: str = 'Allocated NEW lock for @acm_func,\n'
|
header: str = 'Allocated NEW lock for @acm_func,\n'
|
||||||
lock_registered: bool = True
|
|
||||||
else:
|
else:
|
||||||
await trio.lowlevel.checkpoint()
|
await trio.lowlevel.checkpoint()
|
||||||
header: str = 'Reusing OLD lock for @acm_func,\n'
|
header: str = 'Reusing OLD lock for @acm_func,\n'
|
||||||
|
|
@ -312,14 +307,14 @@ async def maybe_open_context(
|
||||||
f'{header}'
|
f'{header}'
|
||||||
f'Acquiring..\n'
|
f'Acquiring..\n'
|
||||||
f'task={task!r}\n'
|
f'task={task!r}\n'
|
||||||
f'fid={fid!r}\n'
|
f'ctx_key={ctx_key!r}\n'
|
||||||
f'acm_func={acm_func}\n'
|
f'acm_func={acm_func}\n'
|
||||||
)
|
)
|
||||||
await lock.acquire()
|
await lock.acquire()
|
||||||
log.debug(
|
log.debug(
|
||||||
f'Acquir lock..\n'
|
f'Acquired lock..\n'
|
||||||
f'task={task!r}\n'
|
f'task={task!r}\n'
|
||||||
f'fid={fid!r}\n'
|
f'ctx_key={ctx_key!r}\n'
|
||||||
f'acm_func={acm_func}\n'
|
f'acm_func={acm_func}\n'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
@ -385,12 +380,21 @@ async def maybe_open_context(
|
||||||
)
|
)
|
||||||
|
|
||||||
resources[ctx_key] = (service_tn, trio.Event())
|
resources[ctx_key] = (service_tn, trio.Event())
|
||||||
yielded: Any = await service_tn.start(
|
try:
|
||||||
_Cache.run_ctx,
|
yielded: Any = await service_tn.start(
|
||||||
mngr,
|
_Cache.run_ctx,
|
||||||
ctx_key,
|
mngr,
|
||||||
)
|
ctx_key,
|
||||||
|
)
|
||||||
|
except BaseException:
|
||||||
|
# If `run_ctx` (wrapping the acm's `__aenter__`)
|
||||||
|
# fails or is cancelled, clean up the `resources`
|
||||||
|
# entry we just set — OW it leaks permanently.
|
||||||
|
# |_ https://github.com/goodboy/tractor/pull/436#discussion_r3047201323
|
||||||
|
resources.pop(ctx_key, None)
|
||||||
|
raise
|
||||||
_Cache.users[ctx_key] += 1
|
_Cache.users[ctx_key] += 1
|
||||||
|
user_registered = True
|
||||||
finally:
|
finally:
|
||||||
# XXX, since this runs from an `except` it's a checkpoint
|
# XXX, since this runs from an `except` it's a checkpoint
|
||||||
# which can be `trio.Cancelled`-masked.
|
# which can be `trio.Cancelled`-masked.
|
||||||
|
|
@ -427,6 +431,7 @@ async def maybe_open_context(
|
||||||
else:
|
else:
|
||||||
# XXX, cached-entry-path
|
# XXX, cached-entry-path
|
||||||
_Cache.users[ctx_key] += 1
|
_Cache.users[ctx_key] += 1
|
||||||
|
user_registered = True
|
||||||
log.debug(
|
log.debug(
|
||||||
f'Re-using cached resource for user {_Cache.users}\n\n'
|
f'Re-using cached resource for user {_Cache.users}\n\n'
|
||||||
f'{ctx_key!r} -> {type(yielded)}\n'
|
f'{ctx_key!r} -> {type(yielded)}\n'
|
||||||
|
|
@ -451,14 +456,15 @@ async def maybe_open_context(
|
||||||
f'{stats}\n'
|
f'{stats}\n'
|
||||||
f'\n'
|
f'\n'
|
||||||
f'task={task!r}\n'
|
f'task={task!r}\n'
|
||||||
f'fid={fid!r}\n'
|
f'ctx_key={ctx_key!r}\n'
|
||||||
f'acm_func={acm_func}\n'
|
f'acm_func={acm_func}\n'
|
||||||
|
|
||||||
)
|
)
|
||||||
# XXX, trace it.
|
# XXX, trace it.
|
||||||
# await tractor.pause(shield=True)
|
# await tractor.pause(shield=True)
|
||||||
|
|
||||||
_Cache.users[ctx_key] -= 1
|
if user_registered:
|
||||||
|
_Cache.users[ctx_key] -= 1
|
||||||
|
|
||||||
if yielded is not _UnresolvedCtx:
|
if yielded is not _UnresolvedCtx:
|
||||||
# if no more consumers, teardown the client
|
# if no more consumers, teardown the client
|
||||||
|
|
@ -477,12 +483,11 @@ async def maybe_open_context(
|
||||||
_, no_more_users = entry
|
_, no_more_users = entry
|
||||||
no_more_users.set()
|
no_more_users.set()
|
||||||
|
|
||||||
if lock_registered:
|
maybe_lock = _Cache.locks.pop(
|
||||||
maybe_lock = _Cache.locks.pop(
|
ctx_key,
|
||||||
ctx_key,
|
None,
|
||||||
None,
|
)
|
||||||
|
if maybe_lock is None:
|
||||||
|
log.error(
|
||||||
|
f'Resource lock for {ctx_key} ALREADY POPPED?'
|
||||||
)
|
)
|
||||||
if maybe_lock is None:
|
|
||||||
log.error(
|
|
||||||
f'Resource lock for {ctx_key} ALREADY POPPED?'
|
|
||||||
)
|
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue