diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index b56efb0d..950b7070 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -158,7 +158,18 @@ class Client: """ t2ids = await self.tickers2ids(tickers) ids = ','.join(map(str, t2ids.values())) - return await self.api.quotes(ids=ids) + return (await self.api.quotes(ids=ids))['quotes'] + + async def symbols(self, tickers): + """Return quotes for each ticker in ``tickers``. + """ + t2ids = await self.tickers2ids(tickers) + ids = ','.join(map(str, t2ids.values())) + symbols = {} + for pkt in (await self.api.symbols(ids=ids))['symbols']: + symbols[pkt['symbol']] = pkt + + return symbols class _API: @@ -282,7 +293,10 @@ async def serve_forever(tasks) -> None: async def poll_tickers( - client: Client, tickers: [str], q: trio.Queue, rate: int = 2, + client: Client, tickers: [str], + q: trio.Queue, + rate: int = 3, + cache: bool = False, # only deliver "new" changes to the queue ) -> None: """Stream quotes for a sequence of tickers at the given ``rate`` per second. @@ -298,15 +312,21 @@ async def poll_tickers( quotes = quotes_resp['quotes'] # log.trace(quotes) - # only push quotes with "new" data payload = [] for quote in quotes: - symbol = quote['symbol'] - last = _cache.setdefault(symbol, {}) - new = set(quote.items()) - set(last.items()) - if new: - log.debug(f"New quote {symbol} data:\n{new}") - _cache[symbol] = quote + + if quote['delay'] > 0: + log.warning(f"Delayed quote:\n{quote}") + + if cache: # if cache is enabled then only deliver "new" changes + symbol = quote['symbol'] + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.debug(f"New quote {symbol} data:\n{new}") + _cache[symbol] = quote + payload.append(quote) + else: payload.append(quote) if payload: