From ff322ae7be35fe6fc224e999a472c93f76f6bc79 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 30 Aug 2021 17:39:53 -0400 Subject: [PATCH] Re-impl ctx-mng caching using `trio.Nursery.start()` Maybe i've finally learned my lesson that exit stacks and per task ctx manager caching is just not trionic.. Use the approach we've taken for the daemon service manager as well: create a process global nursery for each unique ctx manager we wish to cache and simply tear it down when the number of consumers goes to zero. This seems to resolve all prior issues and gets us error-free cached feeds! --- piker/_cacheables.py | 105 +++++++++++++++++++++++++------------------ 1 file changed, 62 insertions(+), 43 deletions(-) diff --git a/piker/_cacheables.py b/piker/_cacheables.py index d4e6ccfa..d86b477d 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -23,19 +23,20 @@ Cacheing apis and toolz. from collections import OrderedDict from typing import ( - Optional, + Any, Hashable, + Optional, TypeVar, AsyncContextManager, - AsyncIterable, ) from contextlib import ( asynccontextmanager, AsyncExitStack, - contextmanager, ) import trio +from trio_typing import TaskStatus +from tractor._portal import maybe_open_nursery from .brokers import get_brokermod from .log import get_logger @@ -132,14 +133,35 @@ async def open_cached_client( class cache: '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use data feed. + kept-alive-while-in-use async resource. ''' lock = trio.Lock() users: int = 0 - ctxs: dict[tuple[str, str], AsyncIterable] = {} + values: dict[tuple[str, str], tuple[AsyncExitStack, Any]] = {} + nurseries: dict[int, Optional[trio.Nursery]] = {} no_more_users: Optional[trio.Event] = None + @classmethod + async def run_ctx( + cls, + mng, + key, + task_status: TaskStatus[T] = trio.TASK_STATUS_IGNORED, + + ) -> None: + async with mng as value: + + cls.no_more_users = trio.Event() + cls.values[key] = value + task_status.started(value) + try: + await cls.no_more_users.wait() + finally: + value = cls.values.pop(key) + # discard nursery ref so it won't be re-used (an error) + cls.nurseries.pop(id(mng)) + @asynccontextmanager async def maybe_open_ctx( @@ -153,51 +175,48 @@ async def maybe_open_ctx( a cache hit. ''' - @contextmanager - def get_and_use() -> AsyncIterable[T]: - # key error must bubble here - value = cache.ctxs[key] - log.info(f'Reusing cached feed for {key}') - try: - cache.users += 1 - yield value - finally: - cache.users -= 1 - if cache.users == 0: - # signal to original allocator task feed use is complete - cache.no_more_users.set() - try: - with get_and_use() as value: - yield True, value - except KeyError: - # lock feed acquisition around task racing / ``trio``'s - # scheduler protocol - await cache.lock.acquire() + await cache.lock.acquire() + + ctx_key = id(mngr) + + # TODO: does this need to be a tractor "root nursery"? + async with maybe_open_nursery(cache.nurseries.get(ctx_key)) as n: + cache.nurseries[ctx_key] = n + + value = None try: - with get_and_use() as value: - cache.lock.release() - yield True, value - return + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + value = cache.values[key] + log.info(f'Reusing cached feed for {key}') + cache.users += 1 + cache.lock.release() + yield True, value except KeyError: + log.info(f'Allocating new feed for {key}') + # **critical section** that should prevent other tasks from # checking the cache until complete otherwise the scheduler # may switch and by accident we create more then one feed. - cache.no_more_users = trio.Event() - log.info(f'Allocating new feed for {key}') - # TODO: eventually support N-brokers - async with mngr as value: - cache.ctxs[key] = value + value = await n.start(cache.run_ctx, mngr, key) + cache.users += 1 + cache.lock.release() + + yield False, value + + finally: + cache.users -= 1 + + if cache.lock.locked(): cache.lock.release() - try: - yield True, value - finally: - # don't tear down the feed until there are zero - # users of it left. - if cache.users > 0: - await cache.no_more_users.wait() - log.warning('De-allocating feed for {key}') - cache.ctxs.pop(key) + if value is not None: + # if no more consumers, teardown the client + if cache.users <= 0: + log.warning(f'De-allocating feed for {key}') + + # terminate mngr nursery + cache.no_more_users.set()