diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 0290e0cf..ae58c837 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -3,6 +3,8 @@ Core broker-daemon tasks and API. """ import time import inspect +from functools import partial +import socket from types import ModuleType from typing import AsyncContextManager @@ -44,6 +46,21 @@ async def quote(brokermod: ModuleType, tickers: [str]) -> dict: return results +async def wait_for_network(get_quotes, sleep=1): + """Wait until the network comes back up. + """ + while True: + try: + with trio.move_on_after(1) as cancel_scope: + return await get_quotes() + if cancel_scope.cancelled_caught: + log.warn("Quote query timed out") + continue + except socket.gaierror: + log.warn(f"Network is down waiting for reestablishment...") + await trio.sleep(sleep) + + async def poll_tickers( client: 'Client', quoter: AsyncContextManager, @@ -64,30 +81,35 @@ async def poll_tickers( async with quoter(client, tickers) as get_quotes: while True: # use an event here to trigger exit? prequote_start = time.time() - quotes = await get_quotes(tickers) + + with trio.move_on_after(3) as cancel_scope: + quotes = await get_quotes(tickers) + + cancelled = cancel_scope.cancelled_caught + if cancelled: + log.warn("Quote query timed out after 3 seconds, retrying...") + # handle network outages by idling until response is received + quotes = await wait_for_network(partial(get_quotes, tickers)) + postquote_start = time.time() - payload = [] + payload = {} for symbol, quote in quotes.items(): # FIXME: None is returned if a symbol can't be found. # Consider filtering out such symbols before starting poll loop if quote is None: continue - if quote.get('delay', 0) > 0: - log.warning(f"Delayed quote:\n{quote}") - if diff_cached: # if cache is enabled then only deliver "new" changes - symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) new = set(quote.items()) - set(last.items()) if new: log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote - payload.append(quote) + payload[symbol] = quote else: - payload.append(quote) + payload[symbol] = quote if payload: q.put_nowait(payload) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 7127ae75..ee0886cb 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -304,7 +304,24 @@ async def quoter(client: Client, tickers: [str]): else: raise - return {quote['symbol']: quote for quote in quotes_resp['quotes']} + # dict packing and post-processing + quotes = {} + for quote in quotes_resp['quotes']: + quotes[quote['symbol']] = quote + + if quote.get('delay', 0) > 0: + log.warning(f"Delayed quote:\n{quote}") + + return quotes + + first_quotes_dict = await get_quote(tickers) + for symbol, quote in first_quotes_dict.items(): + if quote['low52w'] is None: + log.warn(f"{symbol} seems to be defunct discarding from tickers") + t2ids.pop(symbol) + + # re-save symbol ids cache + ids = ','.join(map(str, t2ids.values())) yield get_quote diff --git a/piker/ui/watchlist.py b/piker/ui/watchlist.py index b6bc410c..9f57d708 100644 --- a/piker/ui/watchlist.py +++ b/piker/ui/watchlist.py @@ -349,10 +349,8 @@ async def update_quotes( grid.quote_cache = cache # initial coloring - for quote in first_quotes: - sym = quote['symbol'] + for sym, quote in first_quotes.items(): row = grid.symbols2rows[sym] - # record, displayable = qtconvert(quote, symbol_data=symbol_data) record, displayable = brokermod.format_quote( quote, symbol_data=symbol_data) row.update(record, displayable) @@ -365,12 +363,11 @@ async def update_quotes( while True: log.debug("Waiting on quotes") quotes = await queue.get() # new quotes data only - for quote in quotes: - # record, displayable = qtconvert(quote, symbol_data=symbol_data) + for symbol, quote in quotes.items(): record, displayable = brokermod.format_quote( quote, symbol_data=symbol_data) - row = grid.symbols2rows[record['symbol']] - cache[record['symbol']] = (record, row) + row = grid.symbols2rows[symbol] + cache[symbol] = (record, row) row.update(record, displayable) color_row(row, record) @@ -401,11 +398,10 @@ async def _async_main(name, tickers, brokermod, rate): ) # get first quotes response - pkts = await queue.get() + quotes = await queue.get() first_quotes = [ - # qtconvert(quote, symbol_data=sd)[0] for quote in pkts] brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in pkts] + for quote in quotes.values()] if first_quotes[0].get('last') is None: log.error("Broker API is down temporarily") @@ -463,4 +459,4 @@ async def _async_main(name, tickers, brokermod, rate): } nursery.start_soon(run_kivy, widgets['root'], nursery) nursery.start_soon( - update_quotes, brokermod, widgets, queue, sd, pkts) + update_quotes, brokermod, widgets, queue, sd, quotes)