diff --git a/README.rst b/README.rst index 10d45005..04b71f4a 100644 --- a/README.rst +++ b/README.rst @@ -34,6 +34,15 @@ broker quote query ``rate`` with ``-r``:: piker watch indexes -l info -r 10 +It is also possible to run the broker-client micro service as a daemon:: + + pikerd -l info + +Then start the client app as normal:: + + piker watch indexes -l info + + .. _trio: https://github.com/python-trio/trio .. _pipenv: https://docs.pipenv.org/ diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 8ae3f1dc..2e12b3ea 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -13,7 +13,10 @@ __brokers__ = [ def get_brokermod(brokername: str) -> ModuleType: """Return the imported broker module by name. """ - return import_module('.' + brokername, 'piker.brokers') + module = import_module('.' + brokername, 'piker.brokers') + # we only allows monkeys because it's for internal keying + module.name = module.__name__.split('.')[-1] + return module def iter_brokermods(): diff --git a/piker/brokers/core.py b/piker/brokers/core.py index fac9d72c..6d62d14a 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -6,11 +6,13 @@ import inspect from functools import partial import socket from types import ModuleType -from typing import AsyncContextManager +from typing import Coroutine, Callable +import msgpack import trio from ..log import get_logger +from . import get_brokermod log = get_logger('broker.core') @@ -46,14 +48,14 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict: return results -async def wait_for_network(get_quotes, sleep=1): +async def wait_for_network(net_func: Callable, sleep: int = 1) -> dict: """Wait until the network comes back up. """ down = False while True: try: with trio.move_on_after(1) as cancel_scope: - quotes = await get_quotes() + quotes = await net_func() if down: log.warn("Network is back up") return quotes @@ -67,11 +69,135 @@ async def wait_for_network(get_quotes, sleep=1): await trio.sleep(sleep) -async def poll_tickers( - client: 'Client', - quoter: AsyncContextManager, - tickers: [str], - q: trio.Queue, +class StreamQueue: + """Stream wrapped as a queue that delivers ``msgpack`` serialized objects. + """ + def __init__(self, stream): + self.stream = stream + self.peer = stream.socket.getpeername() + self._agen = self._iter_packets() + + async def _iter_packets(self): + """Yield packets from the underlying stream. + """ + unpacker = msgpack.Unpacker(raw=False) + while True: + try: + data = await self.stream.receive_some(2**10) + log.trace(f"Data is {data}") + except trio.BrokenStreamError: + log.error(f"Stream connection {self.peer} broke") + return + + if data == b'': + log.debug("Stream connection was closed") + return + + unpacker.feed(data) + for packet in unpacker: + yield packet + + async def put(self, data): + return await self.stream.send_all( + msgpack.dumps(data, use_bin_type=True)) + + async def get(self): + return await self._agen.asend(None) + + async def __aiter__(self): + return self._agen + + +class Client: + """The most basic client. + + Use this to talk to any micro-service daemon or other client(s) over a + TCP socket managed by ``trio``. + """ + def __init__( + self, sockaddr: tuple, + startup_seq: Coroutine, + auto_reconnect: bool = True, + ): + self.sockaddr = sockaddr + self._startup_seq = startup_seq + self._autorecon = auto_reconnect + self.squeue = None + + async def connect(self, sockaddr: tuple = None, **kwargs): + sockaddr = sockaddr or self.sockaddr + stream = await trio.open_tcp_stream(*sockaddr, **kwargs) + self.squeue = StreamQueue(stream) + await self._startup_seq(self) + return stream + + async def send(self, item): + await self.squeue.put(item) + + async def recv(self): + try: + return await self.squeue.get() + except trio.BrokenStreamError as err: + if self._autorecon: + await self._reconnect() + return await self.recv() + + async def aclose(self, *args): + await self.squeue.stream.aclose() + + async def __aenter__(self): + await self.connect(self.sockaddr) + return self + + async def __aexit__(self, *args): + await self.aclose(*args) + + async def _reconnect(self): + """Handle connection failures by polling until a reconnect can be + established. + """ + down = False + while True: + try: + with trio.move_on_after(3) as cancel_scope: + await self.connect() + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn( + "Reconnect timed out after 3 seconds, retrying...") + continue + else: + log.warn("Stream connection re-established!") + break + except (OSError, ConnectionRefusedError): + if not down: + down = True + log.warn( + f"Connection to {self.sockaddr} went down, waiting" + " for re-establishment") + await trio.sleep(1) + + async def aiter_recv(self): + """Async iterate items from underlying stream. + """ + while True: + try: + async for item in self.squeue: + yield item + except trio.BrokenStreamError as err: + if not self._autorecon: + raise + if self._autorecon: # attempt reconnect + await self._reconnect() + continue + else: + return + + +async def stream_quotes( + brokermod: ModuleType, + get_quotes: Coroutine, + tickers2qs: {str: StreamQueue}, rate: int = 5, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -81,54 +207,193 @@ async def poll_tickers( A broker-client ``quoter`` async context manager must be provided which returns an async quote function. """ + broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + if broker_limit < rate: + rate = broker_limit + log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching - async with quoter(client, tickers) as get_quotes: - while True: # use an event here to trigger exit? - prequote_start = time.time() + while True: # use an event here to trigger exit? + prequote_start = time.time() - with trio.move_on_after(3) as cancel_scope: - quotes = await get_quotes(tickers) + if not any(tickers2qs.values()): + log.warn(f"No subs left for broker {brokermod.name}, exiting task") + break - cancelled = cancel_scope.cancelled_caught - if cancelled: - log.warn("Quote query timed out after 3 seconds, retrying...") - # handle network outages by idling until response is received - quotes = await wait_for_network(partial(get_quotes, tickers)) + tickers = list(tickers2qs.keys()) + with trio.move_on_after(3) as cancel_scope: + quotes = await get_quotes(tickers) - postquote_start = time.time() + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn("Quote query timed out after 3 seconds, retrying...") + # handle network outages by idling until response is received + quotes = await wait_for_network(partial(get_quotes, tickers)) + + postquote_start = time.time() + q_payloads = {} + for symbol, quote in quotes.items(): + if diff_cached: + # if cache is enabled then only deliver "new" changes + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info( + f"New quote {quote['symbol']}:\n{new}") + _cache[symbol] = quote + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote + else: + for queue in tickers2qs[symbol]: + q_payloads.setdefault(queue, {})[symbol] = quote + + # deliver to each subscriber + if q_payloads: + for queue, payload in q_payloads.items(): + try: + await queue.put(payload) + except ( + # That's right, anything you can think of... + trio.ClosedStreamError, ConnectionResetError, + ConnectionRefusedError, + ): + log.warn(f"{queue.peer} went down?") + for qset in tickers2qs.values(): + qset.discard(queue) + + req_time = round(postquote_start - prequote_start, 3) + proc_time = round(time.time() - postquote_start, 3) + tot = req_time + proc_time + log.debug(f"Request + processing took {tot}") + delay = sleeptime - tot + if delay <= 0: + log.warn( + f"Took {req_time} (request) + {proc_time} (processing) " + f"= {tot} secs (> {sleeptime}) for processing quotes?") + else: + log.debug(f"Sleeping for {delay}") + await trio.sleep(delay) + + +async def start_quoter( + broker2tickersubs: dict, + clients: dict, + dtasks: set, # daemon task registry + nursery: "Nusery", + stream: trio.SocketStream, +) -> None: + """Handle per-broker quote stream subscriptions. + + Spawns new quoter tasks for each broker backend on-demand. + Since most brokers seems to support batch quote requests we + limit to one task per process for now. + """ + queue = StreamQueue(stream) # wrap in a shabby queue-like api + log.info(f"Accepted new connection from {queue.peer}") + async with queue.stream: + async for broker, tickers in queue: + log.info( + f"{queue.peer} subscribed to {broker} for tickers {tickers}") + + if broker not in broker2tickersubs: + brokermod = get_brokermod(broker) + + # TODO: move to AsyncExitStack in 3.7 + client_cntxmng = brokermod.get_client() + client = await client_cntxmng.__aenter__() + get_quotes = await brokermod.quoter(client, tickers) + clients[broker] = ( + brokermod, client, client_cntxmng, get_quotes) + tickers2qs = broker2tickersubs.setdefault( + broker, {}.fromkeys(tickers, {queue, })) + else: + log.info(f"Subscribing with existing `{broker}` daemon") + brokermod, client, _, get_quotes = clients[broker] + tickers2qs = broker2tickersubs[broker] + # update map from each symbol to requesting client's queue + for ticker in tickers: + tickers2qs.setdefault(ticker, set()).add(queue) + + # beginning of section to be trimmed out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for {queue.peer}") + quotes = await get_quotes(tickers) + # pop any tickers that aren't returned in the first quote + tickers = set(tickers) - set(quotes) + for ticker in tickers: + log.warn( + f"Symbol `{ticker}` not found by broker `{brokermod.name}`" + ) + tickers2qs.pop(ticker, None) + + # pop any tickers that return "empty" quotes payload = {} for symbol, quote in quotes.items(): - # FIXME: None is returned if a symbol can't be found. - # Consider filtering out such symbols before starting poll loop if quote is None: + log.warn( + f"Symbol `{symbol}` not found by broker" + f" `{brokermod.name}`") + tickers2qs.pop(symbol, None) continue + payload[symbol] = quote - if diff_cached: - # if cache is enabled then only deliver "new" changes - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.info( - f"New quote {quote['symbol']}:\n{new}") - _cache[symbol] = quote - payload[symbol] = quote - else: - payload[symbol] = quote + # push initial quotes response for client initialization + await queue.put(payload) - if payload: - q.put_nowait(payload) + # end of section to be trimmed out with #37 + ########################################### - req_time = round(postquote_start - prequote_start, 3) - proc_time = round(time.time() - postquote_start, 3) - tot = req_time + proc_time - log.debug(f"Request + processing took {tot}") - delay = sleeptime - tot - if delay <= 0: - log.warn( - f"Took {req_time} (request) + {proc_time} (processing) " - f"= {tot} secs (> {sleeptime}) for processing quotes?") - else: - log.debug(f"Sleeping for {delay}") - await trio.sleep(delay) + if broker not in dtasks: # no quoter task yet + # task should begin on the next checkpoint/iteration + log.info(f"Spawning quoter task for {brokermod.name}") + nursery.start_soon( + stream_quotes, brokermod, get_quotes, tickers2qs) + dtasks.add(broker) + + log.debug("Waiting on subscription request") + else: + log.info(f"client @ {queue.peer} disconnected") + # drop any lingering subscriptions + for ticker, qset in tickers2qs.items(): + qset.discard(queue) + + # if there are no more subscriptions with this broker + # drop from broker subs dict + if not any(tickers2qs.values()): + log.info(f"No more subscriptions for {broker}") + broker2tickersubs.pop(broker, None) + dtasks.discard(broker) + + # TODO: move to AsyncExitStack in 3.7 + for _, _, cntxmng, _ in clients.values(): + # FIXME: yes I know it's totally wrong... + await cntxmng.__aexit__(None, None, None) + + +async def _daemon_main() -> None: + """Entry point for the broker daemon which waits for connections + before spawning micro-services. + """ + # global space for broker-daemon subscriptions + broker2tickersubs = {} + clients = {} + dtasks = set() + + async with trio.open_nursery() as nursery: + listeners = await nursery.start( + partial( + trio.serve_tcp, + partial( + start_quoter, broker2tickersubs, clients, + dtasks, nursery + ), + 1616, host='127.0.0.1' + ) + ) + log.debug(f"Spawned {listeners}") diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index ee0886cb..3f0ebbd7 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -284,16 +284,33 @@ async def get_client() -> Client: write_conf(client) -@asynccontextmanager async def quoter(client: Client, tickers: [str]): """Quoter context. """ - t2ids = await client.tickers2ids(tickers) - ids = ','.join(map(str, t2ids.values())) + t2ids = {} + ids = '' + + def filter_symbols(quotes_dict): + nonlocal t2ids + for symbol, quote in quotes_dict.items(): + if quote['low52w'] is None: + log.warn( + f"{symbol} seems to be defunct discarding from tickers") + t2ids.pop(symbol) async def get_quote(tickers): """Query for quotes using cached symbol ids. """ + if not tickers: + return {} + nonlocal ids, t2ids + new, current = set(tickers), set(t2ids.keys()) + if new != current: + # update ticker ids cache + log.debug(f"Tickers set changed {new - current}") + t2ids = await client.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + try: quotes_resp = await client.api.quotes(ids=ids) except QuestradeError as qterr: @@ -310,20 +327,17 @@ async def quoter(client: Client, tickers: [str]): quotes[quote['symbol']] = quote if quote.get('delay', 0) > 0: - log.warning(f"Delayed quote:\n{quote}") + log.warn(f"Delayed quote:\n{quote}") return quotes first_quotes_dict = await get_quote(tickers) - for symbol, quote in first_quotes_dict.items(): - if quote['low52w'] is None: - log.warn(f"{symbol} seems to be defunct discarding from tickers") - t2ids.pop(symbol) + filter_symbols(first_quotes_dict) # re-save symbol ids cache ids = ','.join(map(str, t2ids.values())) - yield get_quote + return get_quote # Questrade key conversion / column order diff --git a/piker/brokers/robinhood.py b/piker/brokers/robinhood.py index 42ba2982..5a220251 100644 --- a/piker/brokers/robinhood.py +++ b/piker/brokers/robinhood.py @@ -72,11 +72,10 @@ async def get_client() -> Client: yield Client() -@asynccontextmanager async def quoter(client: Client, tickers: [str]): """Quoter context. """ - yield client.quote + return client.quote # Robinhood key conversion / column order diff --git a/piker/cli.py b/piker/cli.py index 171a71d8..c971ed3d 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -2,19 +2,18 @@ Console interface to broker client/daemons. """ from functools import partial -from importlib import import_module -import os -from collections import defaultdict +from multiprocessing import Process import json +import os import click -import trio import pandas as pd +import trio - -from .log import get_console_log, colorize_json, get_logger from . import watchlists as wl from .brokers import core, get_brokermod +from .brokers.core import _daemon_main, Client +from .log import get_console_log, colorize_json, get_logger log = get_logger('cli') DEFAULT_BROKER = 'robinhood' @@ -35,6 +34,14 @@ def run(main, loglevel='info'): log.debug("Exiting piker") +@click.command() +@click.option('--loglevel', '-l', default='warning', help='Logging level') +def pikerd(loglevel): + """Spawn the piker daemon. + """ + run(_daemon_main, loglevel) + + @click.group() def cli(): pass @@ -115,20 +122,52 @@ def quote(loglevel, broker, tickers, df_output): @click.option('--rate', '-r', default=5, help='Logging level') @click.argument('name', nargs=1, required=True) def watch(loglevel, broker, rate, name): - """Spawn a watchlist. + """Spawn a real-time watchlist. """ from .ui.watchlist import _async_main log = get_console_log(loglevel) # activate console logging brokermod = get_brokermod(broker) watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - broker_limit = getattr(brokermod, '_rate_limit', float('inf')) + tickers = watchlists[name] - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {brokermod.__name__} query rate to {rate}/sec") + async def main(timeout=1): - trio.run(_async_main, name, watchlists[name], brokermod, rate) + async def subscribe(client): + # initial request for symbols price streams + await client.send((brokermod.name, tickers)) + + client = Client(('127.0.0.1', 1616), subscribe) + try: + await client.connect() + except OSError as oserr: + await trio.sleep(0.5) + # will raise indicating child proc should be spawned + await client.connect() + + async with trio.open_nursery() as nursery: + nursery.start_soon( + _async_main, name, client, tickers, + brokermod, rate + ) + + # signal exit of stream handler task + await client.aclose() + + try: + trio.run(main) + except OSError as oserr: + log.warn("No broker daemon could be found") + log.warn(oserr) + log.warning("Spawning local broker-daemon...") + child = Process( + target=run, + args=(_daemon_main, loglevel), + daemon=True, + ) + child.start() + trio.run(main, 5) + child.join() @cli.group() diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index 9f57d708..a0944ffb 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -7,7 +7,6 @@ Launch with ``piker watch ``. """ from itertools import chain from types import ModuleType -from functools import partial import trio from kivy.uix.boxlayout import BoxLayout @@ -21,7 +20,6 @@ from kivy.core.window import Window from ..log import get_logger from .pager import PagerView -from ..brokers.core import poll_tickers log = get_logger('watchlist') @@ -49,7 +47,7 @@ _kv = (f''' #:kivy 1.10.0 - font_size: 18 + font_size: 20 # text_size: self.size size: self.texture_size color: {colorcode('gray')} @@ -318,9 +316,10 @@ class TickerTable(GridLayout): async def update_quotes( + nursery: 'Nursery', brokermod: ModuleType, widgets: dict, - queue: trio.Queue, + client: 'Client', symbol_data: dict, first_quotes: dict ): @@ -360,9 +359,7 @@ async def update_quotes( grid.render_rows(cache) # core cell update loop - while True: - log.debug("Waiting on quotes") - quotes = await queue.get() # new quotes data only + async for quotes in client.aiter_recv(): # new quotes data only for symbol, quote in quotes.items(): record, displayable = brokermod.format_quote( quote, symbol_data=symbol_data) @@ -372,6 +369,10 @@ async def update_quotes( color_row(row, record) grid.render_rows(cache) + log.debug("Waiting on quotes") + + log.warn("Server connection dropped") + nursery.cancel_scope.cancel() async def run_kivy(root, nursery): @@ -381,82 +382,78 @@ async def run_kivy(root, nursery): nursery.cancel_scope.cancel() # cancel all other tasks that may be running -async def _async_main(name, tickers, brokermod, rate): +async def _async_main(name, client, tickers, brokermod, rate): '''Launch kivy app + all other related tasks. This is started with cli command `piker watch`. ''' - queue = trio.Queue(1000) - async with brokermod.get_client() as client: - async with trio.open_nursery() as nursery: - # get long term data including last days close price - sd = await client.symbol_data(tickers) + # get initial symbol data + async with brokermod.get_client() as bclient: + # get long term data including last days close price + sd = await bclient.symbol_data(tickers) - nursery.start_soon( - partial(poll_tickers, client, brokermod.quoter, tickers, queue, - rate=rate) - ) + async with trio.open_nursery() as nursery: + # get first quotes response + log.debug("Waiting on first quote...") + quotes = await client.recv() + first_quotes = [ + brokermod.format_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - # get first quotes response - quotes = await queue.get() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + nursery.cancel_scope.cancel() + return - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + # build out UI + Window.set_title(f"watchlist: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', padding=5, spacing=5) - # build out UI - Window.set_title(f"watchlist: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', padding=5, spacing=5) + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._bidasks - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header_row=True, + size_hint=(1, None), + ) + box.add_widget(header) - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header_row=True, - size_hint=(1, None), - ) - box.add_widget(header) + # build grid + grid = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + grid.append_row(ticker_record, bidasks=bidasks) + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = grid - # build grid - grid = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - grid.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = grid + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(grid.sort_key) + sort_cell.bold = sort_cell.underline = True + grid.last_clicked_col_cell = sort_cell - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(grid.sort_key) - sort_cell.bold = sort_cell.underline = True - grid.last_clicked_col_cell = sort_cell + # set up a pager view for large ticker lists + grid.bind(minimum_height=grid.setter('height')) + pager = PagerView(box, grid, nursery) + box.add_widget(pager) - # set up a pager view for large ticker lists - grid.bind(minimum_height=grid.setter('height')) - pager = PagerView(box, grid, nursery) - box.add_widget(pager) - - widgets = { - # 'anchor': anchor, - 'root': box, - 'grid': grid, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon(run_kivy, widgets['root'], nursery) - nursery.start_soon( - update_quotes, brokermod, widgets, queue, sd, quotes) + widgets = { + # 'anchor': anchor, + 'root': box, + 'grid': grid, + 'box': box, + 'header': header, + 'pager': pager, + } + nursery.start_soon(run_kivy, widgets['root'], nursery) + nursery.start_soon( + update_quotes, nursery, brokermod, widgets, client, sd, quotes) diff --git a/setup.py b/setup.py index 07d86eb4..3ba37c91 100755 --- a/setup.py +++ b/setup.py @@ -31,11 +31,12 @@ setup( entry_points={ 'console_scripts': [ 'piker = piker.cli:cli', + 'pikerd = piker.cli:pikerd', ] }, install_requires=[ 'click', 'colorlog', 'trio', 'attrs', 'async_generator', - 'pygments', 'cython', 'asks', 'pandas', + 'pygments', 'cython', 'asks', 'pandas', 'msgpack', #'kivy', see requirement.txt; using a custom branch atm ], extras_require={