From 2514843fc1f80e130b7a13a52ce6f5baa03b040f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 27 Jan 2019 14:50:04 -0500 Subject: [PATCH] Port to the new `@tractor.msg.pub` decorator API The pub-sub data feed system was factored into `tractor` as an experimental api / subsystem. Move to using that, which greatly simplifies the data feed architecture. --- piker/brokers/data.py | 216 ++++++++++++------------------------------ 1 file changed, 61 insertions(+), 155 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index bc3ff369..09847615 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -44,10 +44,38 @@ async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: await trio.sleep(sleep) +# TODO: at this point probably just make this a class and +# a lot of these functions should be methods. It will definitely +# make stateful UI apps easier to implement +@dataclass +class BrokerFeed: + """A per broker "client feed" container. + + A structure to keep track of components used by + real-time data daemons. 
This is a backend "client" which pulls + data from broker specific data lakes: + + ``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API + """ + mod: ModuleType + client: object + exit_stack: contextlib.AsyncExitStack + quoter_keys: Tuple[str] = ('stock', 'option') + locks: Dict[str, trio.StrictFIFOLock] = field( + default_factory=lambda: + {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()} + ) + quoters: Dict[str, typing.Coroutine] = field(default_factory=dict) + subscriptions: Dict[str, Dict[str, set]] = field( + default_factory=partial(dict, **{'option': {}, 'stock': {}}) + ) + + +@tractor.msg.pub(tasks=['stock', 'option']) async def stream_quotes( get_topics: typing.Callable, get_quotes: Coroutine, - brokermod: ModuleType, + feed: BrokerFeed, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -57,6 +85,11 @@ async def stream_quotes( A stock-broker client ``get_quotes()`` async context manager must be provided which returns an async quote retrieval function. """ + broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") + sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching @@ -94,11 +127,11 @@ async def stream_quotes( log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - new_quotes[symbol] = quote + new_quotes[quote['key']] = quote else: log.info(f"Delivering quotes:\n{quotes}") for quote in quotes: - newquotes[quote['symbol']] = quote + new_quotes[quote['symbol']] = quote yield new_quotes @@ -117,90 +150,6 @@ async def stream_quotes( await trio.sleep(delay) -# TODO: at this point probably just just make this a class and -# a lot of these functions should be methods. 
It will definitely -# make stateful UI apps easier to implement -@dataclass -class BrokerFeed: - """A per broker "client feed" container. - - A structure to keep track of components used by - real-time data daemons. This is a backend "client" which pulls - data from broker specific data lakes: - - ``DataFeed`` <- tractor -> ``BrokerFeed`` <- broker IPC -> broker API - """ - mod: ModuleType - client: object - exit_stack: contextlib.AsyncExitStack - quoter_keys: Tuple[str] = ('stock', 'option') - locks: Dict[str, trio.StrictFIFOLock] = field( - default_factory=lambda: - {'stock': trio.StrictFIFOLock(), 'option': trio.StrictFIFOLock()} - ) - quoters: Dict[str, typing.Coroutine] = field(default_factory=dict) - subscriptions: Dict[str, Dict[str, set]] = field( - default_factory=partial(dict, **{'option': {}, 'stock': {}}) - ) - - -async def fan_out_to_ctxs( - pub_gen: typing.AsyncGenerator, - feed: BrokerFeed, - get_quotes: Coroutine, - topics2ctxs: Dict[str, tractor.Context], - topic_key: str = 'key', - packet_key: str = 'symbol', - rate: int = 5, # delay between quote requests - diff_cached: bool = True, # only deliver "new" quotes to the queue -) -> None: - """Request and fan out quotes to each subscribed actor channel. 
- """ - broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") - - def get_topics(): - return tuple(topics2ctxs.keys()) - - async for published in pub_gen( - get_topics, - get_quotes, - feed.mod, - rate, - diff_cached=diff_cached, - ): - ctx_payloads = {} - for packet_key, data in published.items(): - # grab each suscription topic using provided key for lookup - topic = data[topic_key] - # build a new dict packet for passing to multiple underlying channels - packet = {packet_key: data} - for ctx in topics2ctxs.get(topic, set()): - ctx_payloads.setdefault(ctx, {}).update(packet), - - # deliver to each subscriber (fan out) - if ctx_payloads: - for ctx, payload in ctx_payloads.items(): - try: - await ctx.send_yield(payload) - except ( - # That's right, anything you can think of... - trio.ClosedStreamError, ConnectionResetError, - ConnectionRefusedError, - ): - log.warn(f"{ctx.chan} went down?") - for ctx_set in topics2ctxs.values(): - ctx_set.discard(ctx) - - if not any(topics2ctxs.values()): - log.warn(f"No subs left for broker {feed.mod.name}, exiting task") - break - - log.info(f"Terminating stream quoter task for {pub_gen.__name__}") - - async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ @@ -255,38 +204,6 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -def modify_quote_stream(broker, feed_type, symbols, ctx): - """Absolute symbol subscription list for each quote stream. - - Effectively a symbol subscription api. 
- """ - log.info(f"{ctx.chan} changed symbol subscription to {symbols}") - ss = tractor.current_actor().statespace - feed = ss['feeds'].get(broker) - if feed is None: - raise RuntimeError( - "`get_cached_feed()` must be called before modifying its stream" - ) - - symbols2ctxs = feed.subscriptions[feed_type] - # update map from each symbol to requesting client's chan - for ticker in symbols: - symbols2ctxs.setdefault(ticker, set()).add(ctx) - - # remove any existing symbol subscriptions if symbol is not - # found in ``symbols`` - # TODO: this can likely be factored out into the pub-sub api - for ticker in filter( - lambda ticker: ticker not in symbols, symbols2ctxs.copy() - ): - ctx_set = symbols2ctxs.get(ticker) - ctx_set.discard(ctx) - - if not ctx_set: - # pop empty sets which will trigger bg quoter task termination - symbols2ctxs.pop(ticker) - - async def get_cached_feed( brokername: str, ) -> BrokerFeed: @@ -317,11 +234,11 @@ async def get_cached_feed( async def start_quote_stream( + ctx: tractor.Context, # marks this as a streaming func broker: str, symbols: List[Any], feed_type: str = 'stock', diff_cached: bool = True, - ctx: tractor.Context = None, rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub @@ -341,7 +258,7 @@ async def start_quote_stream( # another actor task may have already created it feed = await get_cached_feed(broker) symbols2ctxs = feed.subscriptions[feed_type] - task_is_dead = None + packetizer = None if feed_type == 'stock': get_quotes = feed.quoters.setdefault( @@ -351,8 +268,7 @@ async def start_quote_stream( # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) - # push initial smoke quote response for client initialization - await ctx.send_yield(payload) + elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify # the arg signature for the option feed beasides a symbol 
@@ -361,46 +277,36 @@ async def start_quote_stream( 'option', await feed.mod.option_quoter(feed.client, symbols) ) + # packetize payload = { quote['symbol']: quote for quote in await get_quotes(symbols) } - # push initial smoke quote response for client initialization - await ctx.send_yield(payload) + + def packetizer(topic, quote): + return {quote['symbol']: quote} + + # push initial smoke quote response for client initialization + await ctx.send_yield(payload) + try: - # update map from each symbol to requesting client's chan - modify_quote_stream(broker, feed_type, symbols, ctx) + await stream_quotes( - # prevents more then one broker feed task from spawning - lock = feed.locks.get(feed_type) + # pub required kwargs + task_name=feed_type, + ctx=ctx, + topics=symbols, + packetizer=packetizer, - # block and let existing feed task deliver - # stream data until it is cancelled in which case - # we'll take over and spawn it again - async with lock: - # no data feeder task yet; so start one - respawn = True - while respawn: - respawn = False - log.info(f"Spawning data feed task for {feed.mod.name}") - try: - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates - await fan_out_to_ctxs( - stream_quotes, - feed, - get_quotes, - symbols2ctxs, - diff_cached=diff_cached, - rate=rate, - ) - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True + # actual func args + feed=feed, + get_quotes=get_quotes, + diff_cached=diff_cached, + rate=rate, + ) + log.info( + f"Terminating stream quoter task for {feed.mod.name}") finally: - # if we're cancelled externally unsubscribe our quote feed - modify_quote_stream(broker, feed_type, [], ctx) - # if there are truly no more subscriptions with this broker # drop from broker subs dict if not any(symbols2ctxs.values()):