diff --git a/piker/_cacheables.py b/piker/_cacheables.py index c1113431..48fcdeef 100644 --- a/piker/_cacheables.py +++ b/piker/_cacheables.py @@ -18,19 +18,25 @@ Cacheing apis and toolz. """ -from typing import Dict -from contextlib import asynccontextmanager, AsyncExitStack +from typing import Optional +from contextlib import ( + asynccontextmanager, + AsyncExitStack, + contextmanager, +) import trio from .brokers import get_brokermod from .log import get_logger +from . import data +from .data.feed import Feed log = get_logger(__name__) -_cache: Dict[str, 'Client'] = {} # noqa +_cache: dict[str, 'Client'] = {} # noqa @asynccontextmanager @@ -83,3 +89,82 @@ async def open_cached_client( client._consumers -= 1 if client._consumers <= 0: await client._exit_stack.aclose() + + +class cache: + '''Globally (processs wide) cached, task access to a + kept-alive-while-in-use data feed. + + ''' + lock = trio.Lock() + users: int = 0 + feeds: dict[tuple[str, str], Feed] = {} + no_more_users: Optional[trio.Event] = None + + +@asynccontextmanager +async def maybe_open_feed( + + broker: str, + symbol: str, + loglevel: str, + +) -> Feed: + + key = (broker, symbol) + + @contextmanager + def get_and_use() -> Feed: + # key error must bubble here + feed = cache.feeds[key] + log.info(f'Reusing cached feed for {key}') + try: + cache.users += 1 + yield feed + finally: + cache.users -= 1 + if cache.users == 0: + # signal to original allocator task feed use is complete + cache.no_more_users.set() + + try: + with get_and_use() as feed: + yield feed + except KeyError: + # lock feed acquisition around task racing / ``trio``'s + # scheduler protocol + await cache.lock.acquire() + try: + with get_and_use() as feed: + cache.lock.release() + yield feed + return + + except KeyError: + # **critical section** that should prevent other tasks from + # checking the cache until complete otherwise the scheduler + # may switch and by accident we create more then one feed. + + cache.no_more_users = trio.Event() + + log.info(f'Allocating new feed for {key}') + # TODO: eventually support N-brokers + async with ( + data.open_feed( + broker, + [symbol], + loglevel=loglevel, + ) as feed, + ): + cache.feeds[key] = feed + cache.lock.release() + try: + yield feed + finally: + # don't tear down the feed until there are zero + # users of it left. + if cache.users > 0: + await cache.no_more_users.wait() + + log.warning('De-allocating feed for {key}') + cache.feeds.pop(key) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index cd6985f3..ad5c6cf4 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -22,7 +22,7 @@ from contextlib import asynccontextmanager from dataclasses import dataclass, field from pprint import pformat import time -from typing import AsyncIterator, Callable, Optional +from typing import AsyncIterator, Callable from bidict import bidict from pydantic import BaseModel @@ -30,11 +30,11 @@ import trio from trio_typing import TaskStatus import tractor -from .. import data from ..log import get_logger from ..data._normalize import iterticks from ..data.feed import Feed from .._daemon import maybe_spawn_brokerd +from .._cacheables import maybe_open_feed from . import _paper_engine as paper from ._messages import ( Status, Order, @@ -132,7 +132,6 @@ async def clear_dark_triggers( brokerd_orders_stream: tractor.MsgStream, ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa - broker: str, symbol: str, @@ -902,73 +901,6 @@ async def process_client_order_cmds( ) -class cache: - '''Globally (processs wide) cached, task access to a - kept-alive-while-in-use data feed. - - ''' - lock = trio.Lock() - users: int = 0 - feeds: dict[tuple[str, str], Feed] = {} - no_more_users: Optional[trio.Event] = None - - -@asynccontextmanager -async def maybe_open_clearing_feed( - - broker: str, - symbol: str, - loglevel: str, - -) -> Feed: - try: - log.info(f'Reusing existing feed for {(broker, symbol)}') - yield cache.feeds[(broker, symbol)] - except KeyError: - # lock feed acquisition around task racing / ``trio``'s scheduler protocol - await cache.lock.acquire() - try: - cache.users += 1 - cached_feed = cache.feeds[(broker, symbol)] - cache.lock.release() - try: - yield cached_feed - finally: - cache.users -= 1 - if cache.users == 0: - # signal to original allocator task feed use is complete - cache.no_more_users.set() - return - - except KeyError: - # **critical section** that should prevent other tasks from - # checking the cache until complete otherwise the scheduler - # may switch and by accident we create more then one feed. - - cache.no_more_users = trio.Event() - - log.warning(f'Creating new feed for {(broker, symbol)}') - # TODO: eventually support N-brokers - async with ( - data.open_feed( - broker, - [symbol], - loglevel=loglevel, - ) as feed, - ): - cache.feeds[(broker, symbol)] = feed - cache.lock.release() - try: - yield feed - finally: - # don't tear down the feed until there are zero - # users of it left. - if cache.users > 0: - await cache.no_more_users.wait() - - cache.feeds.pop((broker, symbol)) - - @tractor.context async def _emsd_main( @@ -1027,7 +959,7 @@ async def _emsd_main( # spawn one task per broker feed async with ( - maybe_open_clearing_feed( + maybe_open_feed( broker, symbol, loglevel=loglevel, @@ -1073,11 +1005,9 @@ async def _emsd_main( n.start_soon( clear_dark_triggers, - # relay.brokerd_dialogue, brokerd_stream, ems_client_order_stream, feed.stream, - broker, symbol, book