From 6359623019397218e3cfec6a127c8d0221006813 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 17 Apr 2018 16:53:29 -0400 Subject: [PATCH] Allow broker specific subscriptions Allow client connections to subscribe for quote streams from specific brokers and spawn broker-client quoter tasks on-demand according to client connection demands. Support multiple subscribers to a single daemon process. --- piker/brokers/core.py | 219 +++++++++++++++++++++++------------------- 1 file changed, 120 insertions(+), 99 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index db3ec785..b70f3a23 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -12,6 +12,7 @@ from typing import AsyncContextManager import trio from ..log import get_logger +from . import get_brokermod log = get_logger('broker.core') @@ -126,10 +127,8 @@ class StreamQueue: async def poll_tickers( - client: 'Client', - quoter: AsyncContextManager, - tickers: [str], - queue: StreamQueue, + brokermod: ModuleType, + tickers2qs: {str: StreamQueue}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -142,107 +141,129 @@ async def poll_tickers( sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching - async with quoter(client, tickers) as get_quotes: - # run a first quote smoke test filtering out any bad tickers - first_quotes_dict = await get_quotes(tickers) - # FIXME: oh god it's so hideous - tickers[:] = list(first_quotes_dict.keys())[:] - - while True: # use an event here to trigger exit? - prequote_start = time.time() - - with trio.move_on_after(3) as cancel_scope: - quotes = await get_quotes(tickers) - - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.warn("Quote query timed out after 3 seconds, retrying...") - # handle network outages by idling until response is received - quotes = await wait_for_network(partial(get_quotes, tickers)) - - postquote_start = time.time() - payload = {} - for symbol, quote in quotes.items(): - # FIXME: None is returned if a symbol can't be found. - # Consider filtering out such symbols before starting poll loop - if quote is None: - continue - - if diff_cached: - # if cache is enabled then only deliver "new" changes - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - payload[symbol] = quote - else: - payload[symbol] = quote - - if payload: - await queue.put(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {tot}") - delay = sleeptime - tot - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) " - f"= {tot} secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) - - -async def _handle_subs( - queue, - stream2tickers, - nursery, - task_status=trio.TASK_STATUS_IGNORED -): - """Handle quote stream subscriptions. - """ - async with queue.stream: - async for tickers in queue: - task_status.started(tickers) - log.info(f"{queue.peer} subscribed for tickers {tickers}") - stream2tickers[queue.peer] = tickers - else: - log.info(f"{queue.peer} was disconnected") - nursery.cancel_scope.cancel() - - -async def _daemon_main(brokermod): - """Main entry point for the piker daemon. - """ - rate = 5 broker_limit = getattr(brokermod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") - stream2tickers = {} + tickers = list(tickers2qs.keys()) + async with brokermod.get_client() as client: + async with brokermod.quoter(client, tickers) as get_quotes: + # run a first quote smoke test filtering out any bad tickers + first_quotes_dict = await get_quotes(tickers) + valid_symbols = list(first_quotes_dict.keys())[:] - async def start_quoter(stream): - queue = StreamQueue(stream) # wrap in a shabby queue-like api - log.debug(f"Accepted new connection from {queue.peer}") + for ticker in set(tickers) - set(valid_symbols): + tickers2qs.pop(ticker) - # spawn request handler - async with trio.open_nursery() as nursery: - await nursery.start( - _handle_subs, queue, stream2tickers, nursery) - nursery.start_soon( - partial( - poll_tickers, client, brokermod.quoter, - stream2tickers[queue.peer], queue, rate=rate) - ) + # push intial quotes + q_payloads = {} + for symbol, quote in first_quotes_dict.items(): + if quote is None: + tickers2qs.pop(symbol) + continue + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote - async with trio.open_nursery() as nursery: - listeners = await nursery.start( - partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') - ) - log.debug(f"Spawned {listeners}") + if q_payloads: + for queue, payload in q_payloads.items(): + await queue.put(payload) + + # assign valid symbol set + tickers = list(tickers2qs.keys()) + + while True: # use an event here to trigger exit? + prequote_start = time.time() + + with trio.move_on_after(3) as cancel_scope: + quotes = await get_quotes(valid_symbols) + + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn("Quote query timed out after 3 seconds, retrying...") + # handle network outages by idling until response is received + quotes = await wait_for_network(partial(get_quotes, tickers)) + + postquote_start = time.time() + q_payloads = {} + for symbol, quote in quotes.items(): + # FIXME: None is returned if a symbol can't be found. + # Consider filtering out such symbols before starting poll loop + if quote is None: + continue + + if diff_cached: + # if cache is enabled then only deliver "new" changes + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote + else: + for queue in tickers2qs[symbol]: + q_payloads[queue] = {symbol: quote} + + # deliver to each subscriber + if q_payloads: + for queue, payload in q_payloads.items(): + await queue.put(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) " + f"= {tot} secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) + + +async def start_quoter(stream): + """Handle per-broker quote stream subscriptions. + """ + broker2tickersubs = {} + tickers2qs = {} + + queue = StreamQueue(stream) # wrap in a shabby queue-like api + log.debug(f"Accepted new connection from {queue.peer}") + async with trio.open_nursery() as nursery: + async with queue.stream: + async for (broker, tickers) in queue: + log.info( + f"{queue.peer} subscribed to {broker} for tickers {tickers}") + + if broker not in broker2tickersubs: # spawn quote streamer + tickers2qs = broker2tickersubs.setdefault(broker, {}) + brokermod = get_brokermod(broker) + log.info(f"Spawning quote streamer for broker {broker}") + # task should begin on the next checkpoint/iteration + nursery.start_soon(poll_tickers, brokermod, tickers2qs) + + # create map from each symbol to consuming client queues + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + + # remove queue from any ticker subscriptions it no longer wants + for ticker in set(tickers2qs) - set(tickers): + tickers2qs[ticker].remove(queue) + else: + log.info(f"{queue.peer} was disconnected") + nursery.cancel_scope.cancel() + + +async def _daemon_main(brokermod): + """Entry point for the piker daemon. + """ + async with trio.open_nursery() as nursery: + listeners = await nursery.start( + partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') + ) + log.debug(f"Spawned {listeners}")