diff --git a/piker/brokers/api.py b/piker/brokers/api.py index ba54a565..f54a0e86 100644 --- a/piker/brokers/api.py +++ b/piker/brokers/api.py @@ -28,6 +28,7 @@ from ..log import get_logger log = get_logger(__name__) +_clients: Dict[str, 'Client'] = {} @asynccontextmanager async def get_cached_client( @@ -39,29 +40,38 @@ async def get_cached_client( If one has not been setup do it and cache it. """ - # check if a cached client is in the local actor's statespace - ss = tractor.current_actor().statespace + global _clients + clients = ss.setdefault('clients', {'_lock': trio.Lock()}) lock = clients['_lock'] + client = None + try: - log.info(f"Loading existing `{brokername}` daemon") + log.info(f"Loading existing `{brokername}` client") + async with lock: client = clients[brokername] client._consumers += 1 + yield client + except KeyError: log.info(f"Creating new client for broker {brokername}") + async with lock: brokermod = get_brokermod(brokername) exit_stack = AsyncExitStack() + client = await exit_stack.enter_async_context( brokermod.get_client() ) client._consumers = 0 client._exit_stack = exit_stack clients[brokername] = client + yield client + finally: client._consumers -= 1 if client._consumers <= 0: