diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d15e7c45..87b700f5 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -31,13 +31,11 @@ from typing import ( ) from contextlib import ( asynccontextmanager, - AsyncExitStack, ) import trio from trio_typing import TaskStatus import tractor -from tractor._portal import maybe_open_nursery from .brokers import get_brokermod from .log import get_logger @@ -86,8 +84,11 @@ class cache: ''' lock = trio.Lock() users: int = 0 - values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} - nurseries: dict[int, Optional[trio.Nursery]] = {} + values: dict[Any, Any] = {} + resources: dict[ + int, + Optional[tuple[trio.Nursery, trio.Event]] + ] = {} no_more_users: Optional[trio.Event] = None @classmethod @@ -100,15 +101,15 @@ class cache: ) -> None: async with mng as value: - cls.no_more_users = trio.Event() + _, no_more_users = cls.resources[id(mng)] cls.values[key] = value task_status.started(value) try: - await cls.no_more_users.wait() + await no_more_users.wait() finally: value = cls.values.pop(key) # discard nursery ref so it won't be re-used (an error) - cls.nurseries.pop(id(mng)) + cls.resources.pop(id(mng)) @asynccontextmanager @@ -128,13 +129,12 @@ async def maybe_open_ctx( ctx_key = id(mngr) - value = None try: # lock feed acquisition around task racing / ``trio``'s # scheduler protocol value = cache.values[key] - log.info(f'Reusing cached feed for {key}') + log.info(f'Reusing cached resource for {key}') cache.users += 1 cache.lock.release() yield True, value @@ -146,14 +146,15 @@ async def maybe_open_ctx( # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - # TODO: vaoid pulling from ``tractor`` internals and - # instead offer a "root nursery" in + # TODO: avoid pulling from ``tractor`` internals and + # instead offer a "root nursery" in piker actors? service_n = tractor.current_actor()._service_n # TODO: does this need to be a tractor "root nursery"? - ln = cache.nurseries.get(ctx_key) + ln = cache.resources.get(ctx_key) assert not ln - ln = cache.nurseries[ctx_key] = service_n + + ln, _ = cache.resources[ctx_key] = (service_n, trio.Event()) value = await ln.start(cache.run_ctx, mngr, key) cache.users += 1 @@ -170,10 +171,11 @@ async def maybe_open_ctx( if value is not None: # if no more consumers, teardown the client if cache.users <= 0: - log.warning(f'De-allocating feed for {key}') + log.warning(f'De-allocating resource for {key}') # terminate mngr nursery - cache.no_more_users.set() + _, no_more_users = cache.resources[ctx_key] + no_more_users.set() @asynccontextmanager