From d65bd78f5de83931ffcdfa0df947e9d95f758f78 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Apr 2018 02:03:22 -0400 Subject: [PATCH 01/23] Add a quote stream server task Add a daemon-server task for delivering subscription based quote streams via json serialized packets wrapped in a queue interface. --- piker/brokers/core.py | 98 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 96 insertions(+), 2 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index fac9d72c..e199ecd2 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -3,6 +3,7 @@ Core broker-daemon tasks and API. """ import time import inspect +import json from functools import partial import socket from types import ModuleType @@ -67,11 +68,55 @@ async def wait_for_network(get_quotes, sleep=1): await trio.sleep(sleep) +class Disconnect(trio.Cancelled): + "Stream was closed" + + +class StreamQueue: + """Stream wrapped as a queue that delivers json serialized "packets" + delimited by ``delim``. + """ + def __init__(self, stream, delim=b'\n'): + self.stream = stream + self._delim = delim + self.peer = stream.socket.getpeername() + + async def get(self): + """Get a packet from the underlying stream. + """ + delim = self._delim + buff = b'' + while True: + data = await self.stream.receive_some(2**10) + log.trace(f"Data is {data}") + if data == b'': + raise Disconnect("Stream connection was closed") + buff += data + if delim in buff: + try: + return json.loads(buff) + except json.decoder.JSONDecodeError: + log.exception("Failed to process JSON packet:") + continue + + async def put(self, data): + return await self.stream.send_all(json.dumps(data).encode() + b'\n') + + async def __aiter__(self): + return self + + async def __anext__(self): + try: + return await self.get() + except Disconnect: + raise StopAsyncIteration + + async def poll_tickers( client: 'Client', quoter: AsyncContextManager, tickers: [str], - q: trio.Queue, + queue: StreamQueue, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -85,6 +130,11 @@ async def poll_tickers( _cache = {} # ticker to quote caching async with quoter(client, tickers) as get_quotes: + # run a first quote smoke test filtering out any bad tickers + first_quotes_dict = await get_quotes(tickers) + # FIXME: oh god it's so hideous + tickers[:] = list(first_quotes_dict.keys())[:] + while True: # use an event here to trigger exit? prequote_start = time.time() @@ -118,7 +168,7 @@ async def poll_tickers( payload[symbol] = quote if payload: - q.put_nowait(payload) + await queue.put(payload) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -132,3 +182,47 @@ async def poll_tickers( else: log.debug(f"Sleeping for {delay}") await trio.sleep(delay) + + +async def _handle_subs( + queue, + stream2tickers, + nursery, + task_status=trio.TASK_STATUS_IGNORED +): + """Handle quote stream subscriptions. + """ + async with queue.stream: + async for tickers in queue: + task_status.started(tickers) + log.info(f"{queue.peer} subscribed for tickers {tickers}") + stream2tickers[queue.peer] = tickers + else: + log.info(f"{queue.peer} was disconnected") + nursery.cancel_scope.cancel() + + +async def _daemon_main(brokermod): + """Main entry point for the piker daemon. 
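    A rough client-side sketch of the wire protocol this daemon speaks
    (illustrative only; the port matches the ``serve_tcp`` call below and
    the symbols are made up):

        import json
        import trio

        async def demo_client():
            stream = await trio.open_tcp_stream('127.0.0.1', 1616)
            # subscribe by sending a json list of symbols
            await stream.send_all(json.dumps(['SPY', 'AAPL']).encode() + b'\n')
            # responses are newline delimited json dicts of symbol -> quote
            data = await stream.receive_some(2**10)
            print(json.loads(data))

        trio.run(demo_client)

    A single ``receive_some()`` may of course return a partial packet; a
    real client buffers up to the newline delimiter the way ``StreamQueue``
    does above.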
+ """ + stream2tickers = {} + async with brokermod.get_client() as client: + + async def start_quoter(stream): + queue = StreamQueue(stream) # wrap in a shabby queue-like api + log.debug(f"Accepted new connection from {queue.peer}") + + # spawn request handler + async with trio.open_nursery() as nursery: + await nursery.start( + _handle_subs, queue, stream2tickers, nursery) + nursery.start_soon( + poll_tickers, client, brokermod.quoter, + stream2tickers[queue.peer], queue + ) + + async with trio.open_nursery() as nursery: + listeners = await nursery.start( + partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') + ) + log.debug(f"Spawned {listeners}") From 23ae71089fc04e868ed04e6ab99b5dd4d560c38f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Apr 2018 02:13:16 -0400 Subject: [PATCH 02/23] Handle dynamic symbol subscriptions in QT backend --- piker/brokers/questrade.py | 27 +++++++++++++++++++++------ 1 file changed, 21 insertions(+), 6 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index ee0886cb..f88f3f0b 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -288,12 +288,30 @@ async def get_client() -> Client: async def quoter(client: Client, tickers: [str]): """Quoter context. """ - t2ids = await client.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) + t2ids = {} + ids = '' + + def filter_symbols(quotes_dict): + nonlocal t2ids + for symbol, quote in quotes_dict.items(): + if quote['low52w'] is None: + log.warn( + f"{symbol} seems to be defunct discarding from tickers") + t2ids.pop(symbol) async def get_quote(tickers): """Query for quotes using cached symbol ids. """ + if not tickers: + return {} + nonlocal ids, t2ids + new, current = set(tickers), set(t2ids.keys()) + if new != current: + # update ticker ids cache + log.info(f"Tickers set changed {new - current}") + t2ids = await client.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + try: quotes_resp = await client.api.quotes(ids=ids) except QuestradeError as qterr: @@ -315,10 +333,7 @@ async def quoter(client: Client, tickers: [str]): return quotes first_quotes_dict = await get_quote(tickers) - for symbol, quote in first_quotes_dict.items(): - if quote['low52w'] is None: - log.warn(f"{symbol} seems to be defunct discarding from tickers") - t2ids.pop(symbol) + filter_symbols(first_quotes_dict) # re-save symbol ids cache ids = ','.join(map(str, t2ids.values())) From 73ef95f42a086a274c5b9546e44eb6a61878aa2d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Apr 2018 02:13:59 -0400 Subject: [PATCH 03/23] Add `pikerd` entry point --- piker/cli.py | 12 ++++++++++++ setup.py | 1 + 2 files changed, 13 insertions(+) diff --git a/piker/cli.py b/piker/cli.py index 171a71d8..c6342eec 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -35,6 +35,18 @@ def run(main, loglevel='info'): log.debug("Exiting piker") +@click.command() +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +def pikerd(broker, loglevel): + """Spawn the piker daemon. 
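    For example, to serve Questrade quotes with verbose logging (flags as
    defined on this command):

        pikerd -b questrade -l info

    Clients such as ``piker watch`` then connect to the daemon's local
    TCP socket for their quote streams.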
+ """ + from piker.brokers.core import _daemon_main + brokermod = get_brokermod(broker) + run(partial(_daemon_main, brokermod), loglevel) + + @click.group() def cli(): pass diff --git a/setup.py b/setup.py index 07d86eb4..3f9f7416 100755 --- a/setup.py +++ b/setup.py @@ -31,6 +31,7 @@ setup( entry_points={ 'console_scripts': [ 'piker = piker.cli:cli', + 'pikerd = piker.cli:pikerd', ] }, install_requires=[ From 4898459bcdb3cfaecd209c2bc0a550b53386ee09 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Apr 2018 02:16:26 -0400 Subject: [PATCH 04/23] Make watchlist app retrieve quotes from the broker daemon --- piker/ui/watchlist.py | 132 +++++++++++++++++++++--------------------- 1 file changed, 65 insertions(+), 67 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 9f57d708..4f6f4d25 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -7,7 +7,6 @@ Launch with ``piker watch ``. """ from itertools import chain from types import ModuleType -from functools import partial import trio from kivy.uix.boxlayout import BoxLayout @@ -21,7 +20,6 @@ from kivy.core.window import Window from ..log import get_logger from .pager import PagerView -from ..brokers.core import poll_tickers log = get_logger('watchlist') @@ -49,7 +47,7 @@ _kv = (f''' #:kivy 1.10.0 - font_size: 18 + font_size: 20 # text_size: self.size size: self.texture_size color: {colorcode('gray')} @@ -386,77 +384,77 @@ async def _async_main(name, tickers, brokermod, rate): This is started with cli command `piker watch`. ''' - queue = trio.Queue(1000) + # setup ticker stream + from ..brokers.core import StreamQueue + queue = StreamQueue(await trio.open_tcp_stream('127.0.0.1', 1616)) + await queue.put(tickers) # initial request for symbols price streams + + # get initial symbol data async with brokermod.get_client() as client: - async with trio.open_nursery() as nursery: - # get long term data including last days close price - sd = await client.symbol_data(tickers) + # get long term data including last days close price + sd = await client.symbol_data(tickers) - nursery.start_soon( - partial(poll_tickers, client, brokermod.quoter, tickers, queue, - rate=rate) - ) + async with trio.open_nursery() as nursery: + # get first quotes response + quotes = await queue.get() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - # get first quotes response - quotes = await queue.get() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? 
for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) - - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon( - update_quotes, brokermod, widgets, queue, sd, quotes) + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon(run_kivy, widgets['root'], nursery) + nursery.start_soon( + update_quotes, brokermod, widgets, queue, sd, quotes) From f80735121ca28e026dc9d97bad63adae34556e55 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 16 Apr 2018 13:34:26 -0400 Subject: [PATCH 05/23] Use an async generator inside `StreamQueue` Async generators are faster and less code. Handle segmented packets which can happen during periods of high quote volume. Move per-broker rate limit logic into daemon task. 
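In rough terms the new framing keeps any trailing partial packet in a
local buffer until the rest of it arrives, along these lines (a
simplified sketch of the idea, not the literal diff below):

    packets = (buff + data).split(delim)  # delim defaults to b'\n'
    buff = packets.pop()  # tail may be an incomplete packet
    for packet in packets:
        payload = json.loads(packet)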
--- piker/brokers/core.py | 52 ++++++++++++++++++++++++++++++------------- piker/cli.py | 6 ----- 2 files changed, 36 insertions(+), 22 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index e199ecd2..db3ec785 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -80,36 +80,49 @@ class StreamQueue: self.stream = stream self._delim = delim self.peer = stream.socket.getpeername() + self._agen = self._iter_packets() - async def get(self): + async def _iter_packets(self): """Get a packet from the underlying stream. """ delim = self._delim buff = b'' while True: - data = await self.stream.receive_some(2**10) + packets = [] + try: + data = await self.stream.receive_some(2**10) + except trio.BrokenStreamError as err: + log.debug("Stream connection was broken") + return + log.trace(f"Data is {data}") if data == b'': - raise Disconnect("Stream connection was closed") - buff += data - if delim in buff: + log.debug("Stream connection was closed") + return + + if buff: # last received packet was segmented + data = buff + data + + # if last packet has not fully arrived it will + # be a truncated byte-stream + packets = data.split(delim) + buff = packets.pop() + + for packet in packets: try: - return json.loads(buff) + yield json.loads(packet) except json.decoder.JSONDecodeError: - log.exception("Failed to process JSON packet:") + log.exception(f"Failed to process JSON packet: {buff}") continue async def put(self, data): return await self.stream.send_all(json.dumps(data).encode() + b'\n') - async def __aiter__(self): - return self + async def get(self): + return await self._agen.asend(None) - async def __anext__(self): - try: - return await self.get() - except Disconnect: - raise StopAsyncIteration + async def __aiter__(self): + return self._agen async def poll_tickers( @@ -205,6 +218,12 @@ async def _handle_subs( async def _daemon_main(brokermod): """Main entry point for the piker daemon. """ + rate = 5 + broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + stream2tickers = {} async with brokermod.get_client() as client: @@ -217,8 +236,9 @@ async def _daemon_main(brokermod): await nursery.start( _handle_subs, queue, stream2tickers, nursery) nursery.start_soon( - poll_tickers, client, brokermod.quoter, - stream2tickers[queue.peer], queue + partial( + poll_tickers, client, brokermod.quoter, + stream2tickers[queue.peer], queue, rate=rate) ) async with trio.open_nursery() as nursery: diff --git a/piker/cli.py b/piker/cli.py index c6342eec..a8f9b924 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -134,12 +134,6 @@ def watch(loglevel, broker, rate, name): brokermod = get_brokermod(broker) watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) - - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") - trio.run(_async_main, name, watchlists[name], brokermod, rate) From 6359623019397218e3cfec6a127c8d0221006813 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 17 Apr 2018 16:53:29 -0400 Subject: [PATCH 06/23] Allow broker specific subscriptions Allow client connections to subscribe for quote streams from specific brokers and spawn broker-client quoter tasks on-demand according to client connection demands. 
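Internally the subscription handler keeps a map from each broker to its
symbol subscriptions, where every symbol points at the set of client
stream queues that want it, roughly (queue names are illustrative):

    broker2tickersubs = {
        'questrade': {'SPY': {queue_a}, 'AAPL': {queue_a, queue_b}},
    }

so each new quote is fanned out only to the queues subscribed to that
symbol.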
Support multiple subscribers to a single daemon process. --- piker/brokers/core.py | 219 +++++++++++++++++++++++------------------- 1 file changed, 120 insertions(+), 99 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index db3ec785..b70f3a23 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -12,6 +12,7 @@ from typing import AsyncContextManager import trio from ..log import get_logger +from . import get_brokermod log = get_logger('broker.core') @@ -126,10 +127,8 @@ class StreamQueue: async def poll_tickers( - client: 'Client', - quoter: AsyncContextManager, - tickers: [str], - queue: StreamQueue, + brokermod: ModuleType, + tickers2qs: {str: StreamQueue}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -142,107 +141,129 @@ async def poll_tickers( sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching - async with quoter(client, tickers) as get_quotes: - # run a first quote smoke test filtering out any bad tickers - first_quotes_dict = await get_quotes(tickers) - # FIXME: oh god it's so hideous - tickers[:] = list(first_quotes_dict.keys())[:] - - while True: # use an event here to trigger exit? - prequote_start = time.time() - - with trio.move_on_after(3) as cancel_scope: - quotes = await get_quotes(tickers) - - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.warn("Quote query timed out after 3 seconds, retrying...") - # handle network outages by idling until response is received - quotes = await wait_for_network(partial(get_quotes, tickers)) - - postquote_start = time.time() - payload = {} - for symbol, quote in quotes.items(): - # FIXME: None is returned if a symbol can't be found. - # Consider filtering out such symbols before starting poll loop - if quote is None: - continue - - if diff_cached: - # if cache is enabled then only deliver "new" changes - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - payload[symbol] = quote - else: - payload[symbol] = quote - - if payload: - await queue.put(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {tot}") - delay = sleeptime - tot - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) " - f"= {tot} secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) - - -async def _handle_subs( - queue, - stream2tickers, - nursery, - task_status=trio.TASK_STATUS_IGNORED -): - """Handle quote stream subscriptions. - """ - async with queue.stream: - async for tickers in queue: - task_status.started(tickers) - log.info(f"{queue.peer} subscribed for tickers {tickers}") - stream2tickers[queue.peer] = tickers - else: - log.info(f"{queue.peer} was disconnected") - nursery.cancel_scope.cancel() - - -async def _daemon_main(brokermod): - """Main entry point for the piker daemon. 
- """ - rate = 5 broker_limit = getattr(brokermod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") - stream2tickers = {} + tickers = list(tickers2qs.keys()) + async with brokermod.get_client() as client: + async with brokermod.quoter(client, tickers) as get_quotes: + # run a first quote smoke test filtering out any bad tickers + first_quotes_dict = await get_quotes(tickers) + valid_symbols = list(first_quotes_dict.keys())[:] - async def start_quoter(stream): - queue = StreamQueue(stream) # wrap in a shabby queue-like api - log.debug(f"Accepted new connection from {queue.peer}") + for ticker in set(tickers) - set(valid_symbols): + tickers2qs.pop(ticker) - # spawn request handler - async with trio.open_nursery() as nursery: - await nursery.start( - _handle_subs, queue, stream2tickers, nursery) - nursery.start_soon( - partial( - poll_tickers, client, brokermod.quoter, - stream2tickers[queue.peer], queue, rate=rate) - ) + # push intial quotes + q_payloads = {} + for symbol, quote in first_quotes_dict.items(): + if quote is None: + tickers2qs.pop(symbol) + continue + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote - async with trio.open_nursery() as nursery: - listeners = await nursery.start( - partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') - ) - log.debug(f"Spawned {listeners}") + if q_payloads: + for queue, payload in q_payloads.items(): + await queue.put(payload) + + # assign valid symbol set + tickers = list(tickers2qs.keys()) + + while True: # use an event here to trigger exit? + prequote_start = time.time() + + with trio.move_on_after(3) as cancel_scope: + quotes = await get_quotes(valid_symbols) + + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn("Quote query timed out after 3 seconds, retrying...") + # handle network outages by idling until response is received + quotes = await wait_for_network(partial(get_quotes, tickers)) + + postquote_start = time.time() + q_payloads = {} + for symbol, quote in quotes.items(): + # FIXME: None is returned if a symbol can't be found. + # Consider filtering out such symbols before starting poll loop + if quote is None: + continue + + if diff_cached: + # if cache is enabled then only deliver "new" changes + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote + else: + for queue in tickers2qs[symbol]: + q_payloads[queue] = {symbol: quote} + + # deliver to each subscriber + if q_payloads: + for queue, payload in q_payloads.items(): + await queue.put(payload) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) " + f"= {tot} secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) + + +async def start_quoter(stream): + """Handle per-broker quote stream subscriptions. 
+ """ + broker2tickersubs = {} + tickers2qs = {} + + queue = StreamQueue(stream) # wrap in a shabby queue-like api + log.debug(f"Accepted new connection from {queue.peer}") + async with trio.open_nursery() as nursery: + async with queue.stream: + async for (broker, tickers) in queue: + log.info( + f"{queue.peer} subscribed to {broker} for tickers {tickers}") + + if broker not in broker2tickersubs: # spawn quote streamer + tickers2qs = broker2tickersubs.setdefault(broker, {}) + brokermod = get_brokermod(broker) + log.info(f"Spawning quote streamer for broker {broker}") + # task should begin on the next checkpoint/iteration + nursery.start_soon(poll_tickers, brokermod, tickers2qs) + + # create map from each symbol to consuming client queues + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + + # remove queue from any ticker subscriptions it no longer wants + for ticker in set(tickers2qs) - set(tickers): + tickers2qs[ticker].remove(queue) + else: + log.info(f"{queue.peer} was disconnected") + nursery.cancel_scope.cancel() + + +async def _daemon_main(brokermod): + """Entry point for the piker daemon. + """ + async with trio.open_nursery() as nursery: + listeners = await nursery.start( + partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') + ) + log.debug(f"Spawned {listeners}") From 0c7ecd383bfeffa123def5e1715d56bb8f1fb0d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 17 Apr 2018 17:17:08 -0400 Subject: [PATCH 07/23] Monkey patch broker mods with a name attr --- piker/brokers/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 8ae3f1dc..2e12b3ea 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -13,7 +13,10 @@ __brokers__ = [ def get_brokermod(brokername: str) -> ModuleType: """Return the imported broker module by name. """ - return import_module('.' + brokername, 'piker.brokers') + module = import_module('.' + brokername, 'piker.brokers') + # we only allows monkeys because it's for internal keying + module.name = module.__name__.split('.')[-1] + return module def iter_brokermods(): From a6dc6973270144af43b5a3f9e5eb6baca1b53df3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 17 Apr 2018 17:19:22 -0400 Subject: [PATCH 08/23] Move watchlist app to new daemon-socket api --- piker/ui/watchlist.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 4f6f4d25..7ce726f1 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -316,9 +316,10 @@ class TickerTable(GridLayout): async def update_quotes( + nursery: 'Nursery', brokermod: ModuleType, widgets: dict, - queue: trio.Queue, + queue: 'StreamQueue', symbol_data: dict, first_quotes: dict ): @@ -358,9 +359,7 @@ async def update_quotes( grid.render_rows(cache) # core cell update loop - while True: - log.debug("Waiting on quotes") - quotes = await queue.get() # new quotes data only + async for quotes in queue: # new quotes data only for symbol, quote in quotes.items(): record, displayable = brokermod.format_quote( quote, symbol_data=symbol_data) @@ -370,7 +369,10 @@ async def update_quotes( color_row(row, record) grid.render_rows(cache) + log.debug("Waiting on quotes") + log.warn("Server connection dropped") + nursery.cancel_scope.cancel() async def run_kivy(root, nursery): '''Trio-kivy entry point. 
@@ -387,7 +389,7 @@ async def _async_main(name, tickers, brokermod, rate): # setup ticker stream from ..brokers.core import StreamQueue queue = StreamQueue(await trio.open_tcp_stream('127.0.0.1', 1616)) - await queue.put(tickers) # initial request for symbols price streams + await queue.put((brokermod.name, tickers)) # initial request for symbols price streams # get initial symbol data async with brokermod.get_client() as client: @@ -457,4 +459,4 @@ async def _async_main(name, tickers, brokermod, rate): } nursery.start_soon(run_kivy, widgets['root'], nursery) nursery.start_soon( - update_quotes, brokermod, widgets, queue, sd, quotes) + update_quotes, nursery, brokermod, widgets, queue, sd, quotes) From 02a71c51badb3f36977a7f52b7936680710de7e2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Apr 2018 01:29:33 -0400 Subject: [PATCH 09/23] Make .quoter() a simple factory func --- piker/brokers/questrade.py | 5 ++--- piker/brokers/robinhood.py | 3 +-- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index f88f3f0b..323cbab6 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -284,7 +284,6 @@ async def get_client() -> Client: write_conf(client) -@asynccontextmanager async def quoter(client: Client, tickers: [str]): """Quoter context. """ @@ -328,7 +327,7 @@ async def quoter(client: Client, tickers: [str]): quotes[quote['symbol']] = quote if quote.get('delay', 0) > 0: - log.warning(f"Delayed quote:\n{quote}") + log.warn(f"Delayed quote:\n{quote}") return quotes @@ -338,7 +337,7 @@ async def quoter(client: Client, tickers: [str]): # re-save symbol ids cache ids = ','.join(map(str, t2ids.values())) - yield get_quote + return get_quote # Questrade key conversion / column order diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 42ba2982..5a220251 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -72,11 +72,10 @@ async def get_client() -> Client: yield Client() -@asynccontextmanager async def quoter(client: Client, tickers: [str]): """Quoter context. """ - yield client.quote + return client.quote # Robinhood key conversion / column order From 030ecdcce8e45bbd4771ae92f6796313ce0b1ab4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Apr 2018 01:30:22 -0400 Subject: [PATCH 10/23] Filter symbols and push initial quote in stream handler Filter out bad symbols by processing an initial batch quote and pushing to the subscribing client before spawning a quoter task. This also avoids exposing the quoter task to anything but the broker module and a `get_quotes()` routine. --- piker/brokers/core.py | 185 +++++++++++++++++++++++------------------- 1 file changed, 100 insertions(+), 85 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index b70f3a23..2dc1f809 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -7,7 +7,7 @@ import json from functools import partial import socket from types import ModuleType -from typing import AsyncContextManager +from typing import Coroutine import trio @@ -84,7 +84,7 @@ class StreamQueue: self._agen = self._iter_packets() async def _iter_packets(self): - """Get a packet from the underlying stream. + """Yield packets from the underlying stream. 
""" delim = self._delim buff = b'' @@ -128,6 +128,7 @@ class StreamQueue: async def poll_tickers( brokermod: ModuleType, + get_quotes: Coroutine, tickers2qs: {str: StreamQueue}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue @@ -146,91 +147,68 @@ async def poll_tickers( rate = broker_limit log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") - tickers = list(tickers2qs.keys()) + while True: # use an event here to trigger exit? + prequote_start = time.time() - async with brokermod.get_client() as client: - async with brokermod.quoter(client, tickers) as get_quotes: - # run a first quote smoke test filtering out any bad tickers - first_quotes_dict = await get_quotes(tickers) - valid_symbols = list(first_quotes_dict.keys())[:] + tickers = list(tickers2qs.keys()) + with trio.move_on_after(3) as cancel_scope: + quotes = await get_quotes(tickers) - for ticker in set(tickers) - set(valid_symbols): - tickers2qs.pop(ticker) + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn("Quote query timed out after 3 seconds, retrying...") + # handle network outages by idling until response is received + quotes = await wait_for_network(partial(get_quotes, tickers)) - # push intial quotes - q_payloads = {} - for symbol, quote in first_quotes_dict.items(): - if quote is None: - tickers2qs.pop(symbol) - continue + postquote_start = time.time() + q_payloads = {} + for symbol, quote in quotes.items(): + # FIXME: None is returned if a symbol can't be found. + # Consider filtering out such symbols before starting poll loop + if quote is None: + continue + + if diff_cached: + # if cache is enabled then only deliver "new" changes + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote + else: for queue in tickers2qs[symbol]: q_payloads.setdefault(queue, {})[symbol] = quote - if q_payloads: - for queue, payload in q_payloads.items(): - await queue.put(payload) + # deliver to each subscriber + if q_payloads: + for queue, payload in q_payloads.items(): + await queue.put(payload) - # assign valid symbol set - tickers = list(tickers2qs.keys()) - - while True: # use an event here to trigger exit? - prequote_start = time.time() - - with trio.move_on_after(3) as cancel_scope: - quotes = await get_quotes(valid_symbols) - - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.warn("Quote query timed out after 3 seconds, retrying...") - # handle network outages by idling until response is received - quotes = await wait_for_network(partial(get_quotes, tickers)) - - postquote_start = time.time() - q_payloads = {} - for symbol, quote in quotes.items(): - # FIXME: None is returned if a symbol can't be found. 
- # Consider filtering out such symbols before starting poll loop - if quote is None: - continue - - if diff_cached: - # if cache is enabled then only deliver "new" changes - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - for queue in tickers2qs[symbol]: - q_payloads.setdefault(queue, {})[symbol] = quote - else: - for queue in tickers2qs[symbol]: - q_payloads[queue] = {symbol: quote} - - # deliver to each subscriber - if q_payloads: - for queue, payload in q_payloads.items(): - await queue.put(payload) - - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {tot}") - delay = sleeptime - tot - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) " - f"= {tot} secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) " + f"= {tot} secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) async def start_quoter(stream): """Handle per-broker quote stream subscriptions. + + Spawns new quoter tasks for each broker backend on-demand. """ broker2tickersubs = {} tickers2qs = {} + clients = {} queue = StreamQueue(stream) # wrap in a shabby queue-like api log.debug(f"Accepted new connection from {queue.peer}") @@ -240,27 +218,64 @@ async def start_quoter(stream): log.info( f"{queue.peer} subscribed to {broker} for tickers {tickers}") - if broker not in broker2tickersubs: # spawn quote streamer - tickers2qs = broker2tickersubs.setdefault(broker, {}) + if broker not in broker2tickersubs: + tickers2qs = broker2tickersubs.setdefault( + broker, {}.fromkeys(tickers, {queue,})) brokermod = get_brokermod(broker) log.info(f"Spawning quote streamer for broker {broker}") + + # TODO: move to AsyncExitStack in 3.7 + client = await brokermod.get_client().__aenter__() + get_quotes = await brokermod.quoter(client, tickers) + else: + brokermod, client, get_quotes = clients[broker] + tickers2qs = broker2tickersubs[broker] + # update map from each symbol to requesting client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + # remove stale ticker subscriptions + for ticker in set(tickers2qs) - set(tickers): + tickers2qs[ticker].remove(queue) + + # run a single quote filtering out any bad tickers + quotes = await get_quotes(tickers) + # pop any tickers that aren't returned in the first quote + for ticker in set(tickers) - set(quotes): + log.warn( + f"Symbol `{ticker}` not found by broker `{brokermod.name}`") + tickers2qs.pop(ticker) + + # pop any tickers that return "empty" quotes + payload = {} + for symbol, quote in quotes.items(): + if quote is None: + log.warn( + f"Symbol `{symbol}` not found by broker" + f" `{brokermod.name}`") + tickers2qs.pop(symbol, None) + continue + payload[symbol] = quote + + if broker not in clients: # no quoter task yet + clients[broker] = (brokermod, client, get_quotes) + # push initial quotes response for client initialization + await queue.put(payload) + # task 
should begin on the next checkpoint/iteration - nursery.start_soon(poll_tickers, brokermod, tickers2qs) - - # create map from each symbol to consuming client queues - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) - - # remove queue from any ticker subscriptions it no longer wants - for ticker in set(tickers2qs) - set(tickers): - tickers2qs[ticker].remove(queue) + log.info(f"Spawning quoter task for {brokermod.name}") + nursery.start_soon( + poll_tickers, brokermod, get_quotes, tickers2qs) else: log.info(f"{queue.peer} was disconnected") nursery.cancel_scope.cancel() + # TODO: move to AsyncExitStack in 3.7 + for _, client, _ in clients.values(): + await client.__aexit__() + async def _daemon_main(brokermod): - """Entry point for the piker daemon. + """Entry point for the broker daemon. """ async with trio.open_nursery() as nursery: listeners = await nursery.start( From dd5e1e7ea7ad9c4b1666f28a7823359c91f4231a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Apr 2018 13:31:07 -0400 Subject: [PATCH 11/23] Doh, set sleeptime after adjusting the rate limit --- piker/brokers/core.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 2dc1f809..5a4c9140 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -139,14 +139,14 @@ async def poll_tickers( A broker-client ``quoter`` async context manager must be provided which returns an async quote function. """ - sleeptime = round(1. / rate, 3) - _cache = {} # ticker to quote caching - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) if broker_limit < rate: rate = broker_limit log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + sleeptime = round(1. / rate, 3) + _cache = {} # ticker to quote caching + while True: # use an event here to trigger exit? prequote_start = time.time() From 51b44cf236ad882c747b8e94d1fa7919c5cd3492 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Apr 2018 13:56:56 -0400 Subject: [PATCH 12/23] Use msgpack for quote-packet serialization --- piker/brokers/core.py | 23 +++++++---------------- setup.py | 2 +- 2 files changed, 8 insertions(+), 17 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 5a4c9140..511c216c 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -3,12 +3,12 @@ Core broker-daemon tasks and API. 
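    The switch to ``msgpack`` below leans on its streaming ``Unpacker``
    for the packet framing that the json/newline protocol handled by
    hand; a minimal standalone sketch of that pattern (quote values are
    made up):

        import msgpack

        unpacker = msgpack.Unpacker(raw=False)
        wire = msgpack.dumps({'SPY': {'last': 280.0}}, use_bin_type=True)
        # feed arbitrary byte chunks; whole packets pop out once complete
        unpacker.feed(wire[:7])
        unpacker.feed(wire[7:])
        for packet in unpacker:
            print(packet)  # -> {'SPY': {'last': 280.0}}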
""" import time import inspect -import json from functools import partial import socket from types import ModuleType from typing import Coroutine +import msgpack import trio from ..log import get_logger @@ -88,6 +88,7 @@ class StreamQueue: """ delim = self._delim buff = b'' + unpacker = msgpack.Unpacker(raw=False) while True: packets = [] try: @@ -101,23 +102,13 @@ class StreamQueue: log.debug("Stream connection was closed") return - if buff: # last received packet was segmented - data = buff + data - - # if last packet has not fully arrived it will - # be a truncated byte-stream - packets = data.split(delim) - buff = packets.pop() - - for packet in packets: - try: - yield json.loads(packet) - except json.decoder.JSONDecodeError: - log.exception(f"Failed to process JSON packet: {buff}") - continue + unpacker.feed(data) + for packet in unpacker: + yield packet async def put(self, data): - return await self.stream.send_all(json.dumps(data).encode() + b'\n') + return await self.stream.send_all( + msgpack.dumps(data, use_bin_type=True)) async def get(self): return await self._agen.asend(None) diff --git a/setup.py b/setup.py index 3f9f7416..3ba37c91 100755 --- a/setup.py +++ b/setup.py @@ -36,7 +36,7 @@ setup( }, install_requires=[ 'click', 'colorlog', 'trio', 'attrs', 'async_generator', - 'pygments', 'cython', 'asks', 'pandas', + 'pygments', 'cython', 'asks', 'pandas', 'msgpack', #'kivy', see requirement.txt; using a custom branch atm ], extras_require={ From 4d4c04cd11e1edcdff15896d4b6925dad3b4f614 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 18 Apr 2018 14:03:59 -0400 Subject: [PATCH 13/23] Document daemon usage --- README.rst | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/README.rst b/README.rst index 10d45005..04b71f4a 100644 --- a/README.rst +++ b/README.rst @@ -34,6 +34,15 @@ broker quote query ``rate`` with ``-r``:: piker watch indexes -l info -r 10 +It is also possible to run the broker-client micro service as a daemon:: + + pikerd -l info + +Then start the client app as normal:: + + piker watch indexes -l info + + .. _trio: https://github.com/python-trio/trio .. _pipenv: https://docs.pipenv.org/ From 17feb175357ab7d7875cfc939ed40b6d9b77eae2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Apr 2018 00:17:36 -0400 Subject: [PATCH 14/23] Add a reliable `Client` API In order to start working toward a HA distributed architecture make apps use a `Client` type to talk to daemons. The `Client` provides fault-tolerance for connection failures such that the app will continue running until a connection to the original service can be made or the process is killed. This will make it easier to simply spawn up new daemon child processes when faults are detected. --- piker/brokers/core.py | 116 +++++++++++++++++++++++++++++++++--------- 1 file changed, 92 insertions(+), 24 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 511c216c..8b823808 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -6,7 +6,7 @@ import inspect from functools import partial import socket from types import ModuleType -from typing import Coroutine +from typing import Coroutine, Callable import msgpack import trio @@ -48,14 +48,14 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict: return results -async def wait_for_network(get_quotes, sleep=1): +async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: """Wait until the network comes back up. 
""" down = False while True: try: with trio.move_on_after(1) as cancel_scope: - quotes = await get_quotes() + quotes = await net_func() if down: log.warn("Network is back up") return quotes @@ -69,35 +69,22 @@ async def wait_for_network(get_quotes, sleep=1): await trio.sleep(sleep) -class Disconnect(trio.Cancelled): - "Stream was closed" - - class StreamQueue: - """Stream wrapped as a queue that delivers json serialized "packets" - delimited by ``delim``. + """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. """ - def __init__(self, stream, delim=b'\n'): + def __init__(self, stream): self.stream = stream - self._delim = delim self.peer = stream.socket.getpeername() self._agen = self._iter_packets() async def _iter_packets(self): """Yield packets from the underlying stream. """ - delim = self._delim - buff = b'' unpacker = msgpack.Unpacker(raw=False) while True: - packets = [] - try: - data = await self.stream.receive_some(2**10) - except trio.BrokenStreamError as err: - log.debug("Stream connection was broken") - return - + data = await self.stream.receive_some(2**10) log.trace(f"Data is {data}") + if data == b'': log.debug("Stream connection was closed") return @@ -117,7 +104,88 @@ class StreamQueue: return self._agen -async def poll_tickers( +class Client: + """The most basic client. + + Use this to talk to any micro-service daemon or other client(s) over a + TCP socket managed by ``trio``. + """ + def __init__( + self, sockaddr: tuple, + startup_seq: Coroutine, + auto_reconnect: bool = True, + ): + self._sockaddr = sockaddr + self._startup_seq = startup_seq + self._autorecon = auto_reconnect + self.stream = None + self.squeue = None + + async def connect(self, sockaddr: tuple = None, **kwargs): + sockaddr = sockaddr or self._sockaddr + stream = await trio.open_tcp_stream(*sockaddr, **kwargs) + self.squeue = StreamQueue(stream) + await self._startup_seq(self) + return stream + + async def send(self, item): + await self.squeue.put(item) + + async def recv(self): + try: + return await self.squeue.get() + except trio.BrokenStreamError as err: + if self._autorecon: + await self._reconnect() + return await self.recv() + + async def __aenter__(self): + await self.connect(self._sockaddr) + return self + + async def __aexit__(self, *args): + await self.squeue.stream.__aexit__() + self.stream = None + + async def _reconnect(self): + """Handle connection failures by polling until a reconnect can be + established. + """ + down = False + while True: + try: + with trio.move_on_after(3) as cancel_scope: + await self.connect() + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn("Reconnect timed out after 3 seconds, retrying...") + continue + else: + log.warn("Stream connection re-established!") + break + except OSError: + if not down: + down = True + log.warn( + "Connection went down, waiting for re-establishment") + await trio.sleep(1) + + async def aiter_recv(self): + """Async iterate items from underlying stream. 
+ """ + try: + async for item in self.squeue: + yield item + except trio.BrokenStreamError as err: + if not self._autorecon: + raise + if self._autorecon: # attempt reconnect + await self._reconnect() + async for item in self.aiter_recv(): + yield item + + +async def stream_quotes( brokermod: ModuleType, get_quotes: Coroutine, tickers2qs: {str: StreamQueue}, @@ -192,7 +260,7 @@ async def poll_tickers( await trio.sleep(delay) -async def start_quoter(stream): +async def start_quoter(stream: trio.SocketStream) -> None: """Handle per-broker quote stream subscriptions. Spawns new quoter tasks for each broker backend on-demand. @@ -255,7 +323,7 @@ async def start_quoter(stream): # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}") nursery.start_soon( - poll_tickers, brokermod, get_quotes, tickers2qs) + stream_quotes, brokermod, get_quotes, tickers2qs) else: log.info(f"{queue.peer} was disconnected") nursery.cancel_scope.cancel() @@ -265,7 +333,7 @@ async def start_quoter(stream): await client.__aexit__() -async def _daemon_main(brokermod): +async def _daemon_main(brokermod: ModuleType) -> None: """Entry point for the broker daemon. """ async with trio.open_nursery() as nursery: From 412313975084718e5618a009dd19cfab55d8b9f1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Apr 2018 00:27:04 -0400 Subject: [PATCH 15/23] Use `Client` in watchlist app --- piker/ui/watchlist.py | 135 ++++++++++++++++++++++-------------------- 1 file changed, 70 insertions(+), 65 deletions(-) diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 7ce726f1..69e6e350 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -319,7 +319,7 @@ async def update_quotes( nursery: 'Nursery', brokermod: ModuleType, widgets: dict, - queue: 'StreamQueue', + client: 'Client', symbol_data: dict, first_quotes: dict ): @@ -359,7 +359,7 @@ async def update_quotes( grid.render_rows(cache) # core cell update loop - async for quotes in queue: # new quotes data only + async for quotes in client.aiter_recv(): # new quotes data only for symbol, quote in quotes.items(): record, displayable = brokermod.format_quote( quote, symbol_data=symbol_data) @@ -374,6 +374,7 @@ async def update_quotes( log.warn("Server connection dropped") nursery.cancel_scope.cancel() + async def run_kivy(root, nursery): '''Trio-kivy entry point. ''' @@ -387,76 +388,80 @@ async def _async_main(name, tickers, brokermod, rate): This is started with cli command `piker watch`. 
''' # setup ticker stream - from ..brokers.core import StreamQueue - queue = StreamQueue(await trio.open_tcp_stream('127.0.0.1', 1616)) - await queue.put((brokermod.name, tickers)) # initial request for symbols price streams + from ..brokers.core import Client - # get initial symbol data - async with brokermod.get_client() as client: - # get long term data including last days close price - sd = await client.symbol_data(tickers) + async def subscribe(client): + # initial request for symbols price streams + await client.send((brokermod.name, tickers)) - async with trio.open_nursery() as nursery: - # get first quotes response - quotes = await queue.get() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + async with Client(('127.0.0.1', 1616), subscribe) as client: - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + # get initial symbol data + async with brokermod.get_client() as bclient: + # get long term data including last days close price + sd = await bclient.symbol_data(tickers) - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + async with trio.open_nursery() as nursery: + # get first quotes response + quotes = await client.recv() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? 
for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon( - update_quotes, nursery, brokermod, widgets, queue, sd, quotes) + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell + + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) + + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon(run_kivy, widgets['root'], nursery) + nursery.start_soon( + update_quotes, nursery, brokermod, widgets, client, sd, quotes) From 90e8dd911c7043a0c0e33785d6b09ac951582f4a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Apr 2018 01:29:14 -0400 Subject: [PATCH 16/23] Daemon main doesn't require brokermod anymore --- piker/brokers/core.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 8b823808..d3c6e6c0 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -333,8 +333,9 @@ async def start_quoter(stream: trio.SocketStream) -> None: await client.__aexit__() -async def _daemon_main(brokermod: ModuleType) -> None: - """Entry point for the broker daemon. +async def _daemon_main() -> None: + """Entry point for the broker daemon which waits for connections + before spawning micro-services. 
""" async with trio.open_nursery() as nursery: listeners = await nursery.start( From 2973b40946d165922312aa5f45818ef41801c5ba Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 19 Apr 2018 01:32:21 -0400 Subject: [PATCH 17/23] Allow wl app to spawn a broker daemon in a subprocess --- piker/cli.py | 69 ++++++++++++++++++---- piker/ui/watchlist.py | 131 ++++++++++++++++++++---------------------- 2 files changed, 118 insertions(+), 82 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index a8f9b924..906a89c8 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -1,20 +1,23 @@ """ Console interface to broker client/daemons. """ +from collections import defaultdict from functools import partial from importlib import import_module -import os -from collections import defaultdict +from multiprocessing import Process import json +import os +import signal +import time import click -import trio import pandas as pd +import trio - -from .log import get_console_log, colorize_json, get_logger from . import watchlists as wl from .brokers import core, get_brokermod +from .brokers.core import _daemon_main +from .log import get_console_log, colorize_json, get_logger log = get_logger('cli') DEFAULT_BROKER = 'robinhood' @@ -36,15 +39,11 @@ def run(main, loglevel='info'): @click.command() -@click.option('--broker', '-b', default=DEFAULT_BROKER, - help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') -def pikerd(broker, loglevel): +def pikerd(loglevel): """Spawn the piker daemon. """ - from piker.brokers.core import _daemon_main - brokermod = get_brokermod(broker) - run(partial(_daemon_main, brokermod), loglevel) + run(_daemon_main, loglevel) @click.group() @@ -134,7 +133,53 @@ def watch(loglevel, broker, rate, name): brokermod = get_brokermod(broker) watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - trio.run(_async_main, name, watchlists[name], brokermod, rate) + tickers = watchlists[name] + + # setup ticker stream + from .brokers.core import Client + + async def main(timeout=1): + async def subscribe(client): + # initial request for symbols price streams + await client.send((brokermod.name, tickers)) + + client = Client(('127.0.0.1', 1616), subscribe) + start = time.time() + while True: + try: + await client.connect() + break + except OSError as oserr: + log.info("Waiting on daemon to come up...") + await trio.sleep(0.1) + if time.time() - start > timeout: + raise + continue + + async with trio.open_nursery() as nursery: + nursery.start_soon( + _async_main, name, client, tickers, + brokermod, rate + ) + + try: + trio.run(main) + except OSError as oserr: + log.exception(oserr) + answer = input( + "\nWould you like to spawn a broker daemon locally? [Y/n]") + if answer is not 'n': + child = Process( + target=run, + args=(_daemon_main, loglevel), + daemon=True, + ) + child.daemon = True + child.start() + trio.run(main, 5) + # trio dies with a keyboard interrupt + os.kill(child.pid, signal.SIGINT) + child.join() @cli.group() diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 69e6e350..be21dd85 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -382,86 +382,77 @@ async def run_kivy(root, nursery): nursery.cancel_scope.cancel() # cancel all other tasks that may be running -async def _async_main(name, tickers, brokermod, rate): +async def _async_main(name, client, tickers, brokermod, rate): '''Launch kivy app + all other related tasks. 
This is started with cli command `piker watch`. ''' - # setup ticker stream - from ..brokers.core import Client + # get initial symbol data + async with brokermod.get_client() as bclient: + # get long term data including last days close price + sd = await bclient.symbol_data(tickers) - async def subscribe(client): - # initial request for symbols price streams - await client.send((brokermod.name, tickers)) + async with trio.open_nursery() as nursery: + # get first quotes response + quotes = await client.recv() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - async with Client(('127.0.0.1', 1616), subscribe) as client: + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - # get initial symbol data - async with brokermod.get_client() as bclient: - # get long term data including last days close price - sd = await bclient.symbol_data(tickers) + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - async with trio.open_nursery() as nursery: - # get first quotes response - quotes = await client.recv() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? 
for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid - - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell - - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) - - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon( - update_quotes, nursery, brokermod, widgets, client, sd, quotes) + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon(run_kivy, widgets['root'], nursery) + nursery.start_soon( + update_quotes, nursery, brokermod, widgets, client, sd, quotes) From 0add443e8bab84a998f04f1b7799bda9a2b11bdf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Apr 2018 00:49:34 -0400 Subject: [PATCH 18/23] Spawn broker-daemon without asking --- piker/cli.py | 35 ++++++++++++++++++----------------- 1 file changed, 18 insertions(+), 17 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index 906a89c8..79bbcb61 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -126,7 +126,7 @@ def quote(loglevel, broker, tickers, df_output): @click.option('--rate', '-r', default=5, help='Logging level') @click.argument('name', nargs=1, required=True) def watch(loglevel, broker, rate, name): - """Spawn a watchlist. + """Spawn a real-time watchlist. 
""" from .ui.watchlist import _async_main log = get_console_log(loglevel) # activate console logging @@ -145,12 +145,15 @@ def watch(loglevel, broker, rate, name): client = Client(('127.0.0.1', 1616), subscribe) start = time.time() + down = False while True: try: await client.connect() break except OSError as oserr: - log.info("Waiting on daemon to come up...") + if not down: + log.info("Waiting on daemon to come up...") + down = True await trio.sleep(0.1) if time.time() - start > timeout: raise @@ -162,24 +165,22 @@ def watch(loglevel, broker, rate, name): brokermod, rate ) + # signal exit of stream handler task + await client.aclose() + try: trio.run(main) except OSError as oserr: - log.exception(oserr) - answer = input( - "\nWould you like to spawn a broker daemon locally? [Y/n]") - if answer is not 'n': - child = Process( - target=run, - args=(_daemon_main, loglevel), - daemon=True, - ) - child.daemon = True - child.start() - trio.run(main, 5) - # trio dies with a keyboard interrupt - os.kill(child.pid, signal.SIGINT) - child.join() + log.error(oserr) + log.info("Spawning local broker-daemon...") + child = Process( + target=run, + args=(_daemon_main, loglevel), + daemon=True, + ) + child.start() + trio.run(main, 5) + child.join() @cli.group() From 4f387ea2bee8a3e7895d692c942dc126ee9f5d09 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Apr 2018 01:06:14 -0400 Subject: [PATCH 19/23] Fix subscriptions and connection handling Oh boy where to start. - Handle broken streams in the `StreamQueue` gracefully; terminate the async generator. - When a stream queue connection is unwritable discard its subscriptions inside the quoter task - If all subscriptions are discarded for a broker then tear down its quoter task - Use listener parent nursery for spawning quoter tasks - Make broker subs data structures global/shared between conn handler tasks - Register the `tickers2qs` entry *after* instantiating broker client(s) (avoids race condition when mulitple client connections are coming online simultaneously) - Push smoke quotes to every client not just the first that connects - Track quoter tasks in a cross-task set - Handle unsubscriptions more correctly --- piker/brokers/core.py | 195 +++++++++++++++++++++++++++--------------- 1 file changed, 124 insertions(+), 71 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index d3c6e6c0..7cb16002 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -82,8 +82,12 @@ class StreamQueue: """ unpacker = msgpack.Unpacker(raw=False) while True: - data = await self.stream.receive_some(2**10) - log.trace(f"Data is {data}") + try: + data = await self.stream.receive_some(2**10) + log.trace(f"Data is {data}") + except trio.BrokenStreamError: + log.error(f"Stream connection {self.peer} broke") + return if data == b'': log.debug("Stream connection was closed") @@ -118,7 +122,6 @@ class Client: self._sockaddr = sockaddr self._startup_seq = startup_seq self._autorecon = auto_reconnect - self.stream = None self.squeue = None async def connect(self, sockaddr: tuple = None, **kwargs): @@ -139,13 +142,15 @@ class Client: await self._reconnect() return await self.recv() + async def aclose(self, *args): + await self.squeue.stream.aclose() + async def __aenter__(self): await self.connect(self._sockaddr) return self async def __aexit__(self, *args): - await self.squeue.stream.__aexit__() - self.stream = None + await self.aclose(*args) async def _reconnect(self): """Handle connection failures by polling until a reconnect can be @@ 
-163,11 +168,12 @@ class Client: else: log.warn("Stream connection re-established!") break - except OSError: + except (OSError, ConnectionRefusedError): if not down: down = True log.warn( - "Connection went down, waiting for re-establishment") + f"Connection to {self._sockaddr} went down, waiting" + " for re-establishment") await trio.sleep(1) async def aiter_recv(self): @@ -244,7 +250,16 @@ async def stream_quotes( # deliver to each subscriber if q_payloads: for queue, payload in q_payloads.items(): - await queue.put(payload) + try: + await queue.put(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warn(f"{queue.peer} went down?") + for qset in tickers2qs.values(): + qset.discard(queue) req_time = round(postquote_start - prequote_start, 3) proc_time = round(time.time() - postquote_start, 3) @@ -259,86 +274,124 @@ async def stream_quotes( log.debug(f"Sleeping for {delay}") await trio.sleep(delay) + if not any(tickers2qs.values()): + log.warn( + f"No subscriptions left, tearing down {brokermod.name} daemon") + break -async def start_quoter(stream: trio.SocketStream) -> None: + +async def start_quoter( + broker2tickersubs: dict, + clients: dict, + nursery: "Nusery", + stream: trio.SocketStream, +) -> None: """Handle per-broker quote stream subscriptions. Spawns new quoter tasks for each broker backend on-demand. + Since most brokers seems to support batch quote requests we + limit to one task per process for now. """ - broker2tickersubs = {} - tickers2qs = {} - clients = {} - + daemons = set() queue = StreamQueue(stream) # wrap in a shabby queue-like api - log.debug(f"Accepted new connection from {queue.peer}") - async with trio.open_nursery() as nursery: - async with queue.stream: - async for (broker, tickers) in queue: - log.info( - f"{queue.peer} subscribed to {broker} for tickers {tickers}") + log.info(f"Accepted new connection from {queue.peer}") + async with queue.stream: + async for broker, tickers in queue: + log.info( + f"{queue.peer} subscribed to {broker} for tickers {tickers}") - if broker not in broker2tickersubs: - tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue,})) - brokermod = get_brokermod(broker) - log.info(f"Spawning quote streamer for broker {broker}") + if broker not in broker2tickersubs: + brokermod = get_brokermod(broker) + log.info(f"Spawning quote streamer for broker {broker}") - # TODO: move to AsyncExitStack in 3.7 - client = await brokermod.get_client().__aenter__() - get_quotes = await brokermod.quoter(client, tickers) - else: - brokermod, client, get_quotes = clients[broker] - tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting client's queue - for ticker in tickers: - tickers2qs.setdefault(ticker, set()).add(queue) - # remove stale ticker subscriptions - for ticker in set(tickers2qs) - set(tickers): - tickers2qs[ticker].remove(queue) - - # run a single quote filtering out any bad tickers - quotes = await get_quotes(tickers) - # pop any tickers that aren't returned in the first quote - for ticker in set(tickers) - set(quotes): - log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`") - tickers2qs.pop(ticker) - - # pop any tickers that return "empty" quotes - payload = {} - for symbol, quote in quotes.items(): - if quote is None: - log.warn( - f"Symbol `{symbol}` not found by broker" - f" `{brokermod.name}`") - tickers2qs.pop(symbol, None) - continue - payload[symbol] = quote - - 
if broker not in clients: # no quoter task yet - clients[broker] = (brokermod, client, get_quotes) - # push initial quotes response for client initialization - await queue.put(payload) - - # task should begin on the next checkpoint/iteration - log.info(f"Spawning quoter task for {brokermod.name}") - nursery.start_soon( - stream_quotes, brokermod, get_quotes, tickers2qs) + # TODO: move to AsyncExitStack in 3.7 + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, tickers) + clients[broker] = ( + brokermod, client, client_cntxmng, get_quotes) + tickers2qs = broker2tickersubs.setdefault( + broker, {}.fromkeys(tickers, {queue,})) else: - log.info(f"{queue.peer} was disconnected") - nursery.cancel_scope.cancel() + log.info(f"Subscribing with existing `{broker}` daemon") + brokermod, client, _, get_quotes = clients[broker] + tickers2qs = broker2tickersubs[broker] + # update map from each symbol to requesting new client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) - # TODO: move to AsyncExitStack in 3.7 - for _, client, _ in clients.values(): - await client.__aexit__() + # beginning of section to be trimmed out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for {queue.peer}") + quotes = await get_quotes(tickers) + # pop any tickers that aren't returned in the first quote + tickers = set(tickers) - set(quotes) + for ticker in tickers: + log.warn( + f"Symbol `{ticker}` not found by broker `{brokermod.name}`") + tickers2qs.pop(ticker, None) + + # pop any tickers that return "empty" quotes + payload = {} + for symbol, quote in quotes.items(): + if quote is None: + log.warn( + f"Symbol `{symbol}` not found by broker" + f" `{brokermod.name}`") + tickers2qs.pop(symbol, None) + continue + payload[symbol] = quote + + # push initial quotes response for client initialization + await queue.put(payload) + + # end of section to be trimmed out with #37 + ########################################### + + if broker not in daemons: # no quoter task yet + # task should begin on the next checkpoint/iteration + log.info(f"Spawning quoter task for {brokermod.name}") + nursery.start_soon( + stream_quotes, brokermod, get_quotes, tickers2qs) + daemons.add(broker) + + log.debug("Waiting on subscription request") + else: + log.info(f"client @ {queue.peer} disconnected") + # drop any lingering subscriptions + for ticker, qset in tickers2qs.items(): + qset.discard(queue) + + # if there are no more subscriptions with this broker + # drop from broker subs dict + if not any(tickers2qs.values()): + log.info(f"No more subscriptions for {broker}") + broker2tickersubs.pop(broker) + + # TODO: move to AsyncExitStack in 3.7 + for _, _, cntxmng, _ in clients.values(): + # FIXME: yes I know it's totally wrong... + await cntxmng.__aexit__(None, None, None) async def _daemon_main() -> None: """Entry point for the broker daemon which waits for connections before spawning micro-services. 
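The cross-task state threaded through the handlers above has a simple shape. A sketch with made up symbols; the plain objects stand in for `StreamQueue` instances:

    queue_a = object()  # stands in for the StreamQueue of one TCP client
    queue_b = object()

    # per broker: ticker -> set of subscriber queues; a disconnecting
    # client is discarded from every set, and once all sets are empty the
    # broker's quoter task tears itself down
    broker2tickersubs = {
        'questrade': {
            'SPY': {queue_a, queue_b},
            'AAPL': {queue_a},
        },
    }

    # per broker: the machinery built on first subscription, i.e.
    # clients['questrade'] == (brokermod, client, client_cntxmng, get_quotes)
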
""" + # global space for broker-daemon subscriptions + broker2tickersubs = {} + clients = {} + async with trio.open_nursery() as nursery: listeners = await nursery.start( - partial(trio.serve_tcp, start_quoter, 1616, host='127.0.0.1') + partial( + trio.serve_tcp, + partial(start_quoter, broker2tickersubs, clients, nursery), + 1616, host='127.0.0.1' + ) ) log.debug(f"Spawned {listeners}") From 063dfad5b4fdc8ed6c832818d368a78ff2073531 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Apr 2018 11:40:45 -0400 Subject: [PATCH 20/23] Make daemon registry cross-task --- piker/brokers/core.py | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 7cb16002..6797ead4 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -215,6 +215,10 @@ async def stream_quotes( while True: # use an event here to trigger exit? prequote_start = time.time() + if not any(tickers2qs.values()): + log.warn(f"No subs left for broker {brokermod.name}, exiting task") + break + tickers = list(tickers2qs.keys()) with trio.move_on_after(3) as cancel_scope: quotes = await get_quotes(tickers) @@ -274,15 +278,11 @@ async def stream_quotes( log.debug(f"Sleeping for {delay}") await trio.sleep(delay) - if not any(tickers2qs.values()): - log.warn( - f"No subscriptions left, tearing down {brokermod.name} daemon") - break - async def start_quoter( broker2tickersubs: dict, clients: dict, + dtasks: set, # daemon task registry nursery: "Nusery", stream: trio.SocketStream, ) -> None: @@ -292,7 +292,6 @@ async def start_quoter( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - daemons = set() queue = StreamQueue(stream) # wrap in a shabby queue-like api log.info(f"Accepted new connection from {queue.peer}") async with queue.stream: @@ -352,12 +351,12 @@ async def start_quoter( # end of section to be trimmed out with #37 ########################################### - if broker not in daemons: # no quoter task yet + if broker not in dtasks: # no quoter task yet # task should begin on the next checkpoint/iteration log.info(f"Spawning quoter task for {brokermod.name}") nursery.start_soon( stream_quotes, brokermod, get_quotes, tickers2qs) - daemons.add(broker) + dtasks.add(broker) log.debug("Waiting on subscription request") else: @@ -370,7 +369,8 @@ async def start_quoter( # drop from broker subs dict if not any(tickers2qs.values()): log.info(f"No more subscriptions for {broker}") - broker2tickersubs.pop(broker) + broker2tickersubs.pop(broker, None) + dtasks.discard(broker) # TODO: move to AsyncExitStack in 3.7 for _, _, cntxmng, _ in clients.values(): @@ -385,12 +385,16 @@ async def _daemon_main() -> None: # global space for broker-daemon subscriptions broker2tickersubs = {} clients = {} + dtasks = set() async with trio.open_nursery() as nursery: listeners = await nursery.start( partial( trio.serve_tcp, - partial(start_quoter, broker2tickersubs, clients, nursery), + partial( + start_quoter, broker2tickersubs, clients, + dtasks, nursery + ), 1616, host='127.0.0.1' ) ) From 6a6f773477b6c19e0bbd17d5a097df5363edaae0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 20 Apr 2018 11:41:23 -0400 Subject: [PATCH 21/23] Adjust some log levels --- piker/brokers/questrade.py | 2 +- piker/cli.py | 2 +- piker/ui/watchlist.py | 1 + 3 files changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 323cbab6..3f0ebbd7 100644 --- 
a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -307,7 +307,7 @@ async def quoter(client: Client, tickers: [str]): new, current = set(tickers), set(t2ids.keys()) if new != current: # update ticker ids cache - log.info(f"Tickers set changed {new - current}") + log.debug(f"Tickers set changed {new - current}") t2ids = await client.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) diff --git a/piker/cli.py b/piker/cli.py index 79bbcb61..a383eb35 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -171,7 +171,7 @@ def watch(loglevel, broker, rate, name): try: trio.run(main) except OSError as oserr: - log.error(oserr) + log.warn(oserr) log.info("Spawning local broker-daemon...") child = Process( target=run, diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index be21dd85..a0944ffb 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -394,6 +394,7 @@ async def _async_main(name, client, tickers, brokermod, rate): async with trio.open_nursery() as nursery: # get first quotes response + log.debug("Waiting on first quote...") quotes = await client.recv() first_quotes = [ brokermod.format_quote(quote, symbol_data=sd)[0] From a2c4f0c80bc088e4619da270c5dd709a9f0be532 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 22 Apr 2018 12:48:35 -0400 Subject: [PATCH 22/23] Don't recurse in Client.aiter_recv() --- piker/brokers/core.py | 46 +++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 24 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6797ead4..6d62d14a 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -119,13 +119,13 @@ class Client: startup_seq: Coroutine, auto_reconnect: bool = True, ): - self._sockaddr = sockaddr + self.sockaddr = sockaddr self._startup_seq = startup_seq self._autorecon = auto_reconnect self.squeue = None async def connect(self, sockaddr: tuple = None, **kwargs): - sockaddr = sockaddr or self._sockaddr + sockaddr = sockaddr or self.sockaddr stream = await trio.open_tcp_stream(*sockaddr, **kwargs) self.squeue = StreamQueue(stream) await self._startup_seq(self) @@ -146,7 +146,7 @@ class Client: await self.squeue.stream.aclose() async def __aenter__(self): - await self.connect(self._sockaddr) + await self.connect(self.sockaddr) return self async def __aexit__(self, *args): @@ -163,7 +163,8 @@ class Client: await self.connect() cancelled = cancel_scope.cancelled_caught if cancelled: - log.warn("Reconnect timed out after 3 seconds, retrying...") + log.warn( + "Reconnect timed out after 3 seconds, retrying...") continue else: log.warn("Stream connection re-established!") @@ -172,23 +173,25 @@ class Client: if not down: down = True log.warn( - f"Connection to {self._sockaddr} went down, waiting" + f"Connection to {self.sockaddr} went down, waiting" " for re-establishment") await trio.sleep(1) async def aiter_recv(self): """Async iterate items from underlying stream. 
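The `aiter_recv()` hunk just below swaps recursion for a loop. The same shape in isolation, as a generic sketch rather than piker's API (all names here are hypothetical):

    async def iter_with_reconnect(open_iter, reconnect):
        # loop instead of recursing so repeated reconnects don't stack a
        # new generator frame per outage
        while True:
            try:
                async for item in open_iter():
                    yield item
            except ConnectionError:
                await reconnect()
                continue
            else:
                # the underlying stream finished cleanly
                return
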
""" - try: - async for item in self.squeue: - yield item - except trio.BrokenStreamError as err: - if not self._autorecon: - raise - if self._autorecon: # attempt reconnect - await self._reconnect() - async for item in self.aiter_recv(): - yield item + while True: + try: + async for item in self.squeue: + yield item + except trio.BrokenStreamError as err: + if not self._autorecon: + raise + if self._autorecon: # attempt reconnect + await self._reconnect() + continue + else: + return async def stream_quotes( @@ -232,11 +235,6 @@ async def stream_quotes( postquote_start = time.time() q_payloads = {} for symbol, quote in quotes.items(): - # FIXME: None is returned if a symbol can't be found. - # Consider filtering out such symbols before starting poll loop - if quote is None: - continue - if diff_cached: # if cache is enabled then only deliver "new" changes last = _cache.setdefault(symbol, {}) @@ -301,7 +299,6 @@ async def start_quoter( if broker not in broker2tickersubs: brokermod = get_brokermod(broker) - log.info(f"Spawning quote streamer for broker {broker}") # TODO: move to AsyncExitStack in 3.7 client_cntxmng = brokermod.get_client() @@ -310,12 +307,12 @@ async def start_quoter( clients[broker] = ( brokermod, client, client_cntxmng, get_quotes) tickers2qs = broker2tickersubs.setdefault( - broker, {}.fromkeys(tickers, {queue,})) + broker, {}.fromkeys(tickers, {queue, })) else: log.info(f"Subscribing with existing `{broker}` daemon") brokermod, client, _, get_quotes = clients[broker] tickers2qs = broker2tickersubs[broker] - # update map from each symbol to requesting new client's queue + # update map from each symbol to requesting client's queue for ticker in tickers: tickers2qs.setdefault(ticker, set()).add(queue) @@ -331,7 +328,8 @@ async def start_quoter( tickers = set(tickers) - set(quotes) for ticker in tickers: log.warn( - f"Symbol `{ticker}` not found by broker `{brokermod.name}`") + f"Symbol `{ticker}` not found by broker `{brokermod.name}`" + ) tickers2qs.pop(ticker, None) # pop any tickers that return "empty" quotes From 482f9531caf3ab43045db46b9f69c21064a3de3c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 22 Apr 2018 13:27:41 -0400 Subject: [PATCH 23/23] Try to connect to daemon once on startup; don't poll --- piker/cli.py | 33 ++++++++++----------------------- 1 file changed, 10 insertions(+), 23 deletions(-) diff --git a/piker/cli.py b/piker/cli.py index a383eb35..c971ed3d 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -1,14 +1,10 @@ """ Console interface to broker client/daemons. """ -from collections import defaultdict from functools import partial -from importlib import import_module from multiprocessing import Process import json import os -import signal -import time import click import pandas as pd @@ -16,7 +12,7 @@ import trio from . 
import watchlists as wl from .brokers import core, get_brokermod -from .brokers.core import _daemon_main +from .brokers.core import _daemon_main, Client from .log import get_console_log, colorize_json, get_logger log = get_logger('cli') @@ -135,29 +131,19 @@ def watch(loglevel, broker, rate, name): watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) tickers = watchlists[name] - # setup ticker stream - from .brokers.core import Client - async def main(timeout=1): + async def subscribe(client): # initial request for symbols price streams await client.send((brokermod.name, tickers)) client = Client(('127.0.0.1', 1616), subscribe) - start = time.time() - down = False - while True: - try: - await client.connect() - break - except OSError as oserr: - if not down: - log.info("Waiting on daemon to come up...") - down = True - await trio.sleep(0.1) - if time.time() - start > timeout: - raise - continue + try: + await client.connect() + except OSError as oserr: + await trio.sleep(0.5) + # will raise indicating child proc should be spawned + await client.connect() async with trio.open_nursery() as nursery: nursery.start_soon( @@ -171,8 +157,9 @@ def watch(loglevel, broker, rate, name): try: trio.run(main) except OSError as oserr: + log.warn("No broker daemon could be found") log.warn(oserr) - log.info("Spawning local broker-daemon...") + log.warning("Spawning local broker-daemon...") child = Process( target=run, args=(_daemon_main, loglevel),