From 9bbf0e0d7a4dc2a2372584d766f66c024bef8bb7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Aug 2020 00:03:09 -0400 Subject: [PATCH] Add a normalizer routine which emits quote differentials/ticks --- piker/brokers/questrade.py | 115 +++++++++++++++++++++++++++++++------ 1 file changed, 98 insertions(+), 17 deletions(-) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 62fd1773..98989741 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -9,6 +9,7 @@ from datetime import datetime from functools import partial import itertools import configparser +from pprint import pformat from typing import ( List, Tuple, Dict, Any, Iterator, NamedTuple, AsyncGenerator, @@ -837,7 +838,8 @@ _qt_stock_keys = { # 'low52w': 'low52w', # put in info widget # 'high52w': 'high52w', # "lastTradePriceTrHrs": 7.99, - 'lastTradeTime': ('fill_time', datetime.fromisoformat), + # 'lastTradeTime': ('fill_time', datetime.fromisoformat), + 'lastTradeTime': 'fill_time', "lastTradeTick": 'tick', # ("Equal", "Up", "Down") # "symbolId": 3575753, # "tier": "", @@ -913,6 +915,7 @@ def format_stock_quote( new[new_key] = value displayable[new_key] = display_value + new['displayable'] = displayable return new, displayable @@ -973,6 +976,7 @@ def format_option_quote( quote: dict, symbol_data: dict, keymap: dict = _qt_option_keys, + include_displayables: bool = True, ) -> Tuple[dict, dict]: """Remap a list of quote dicts ``quotes`` using the mapping of old keys -> new keys ``keymap`` returning 2 dicts: one with raw data and the other @@ -1060,7 +1064,10 @@ async def get_cached_client( await client._exit_stack.aclose() -async def smoke_quote(get_quotes, tickers): # , broker): +async def smoke_quote( + get_quotes, + tickers +): """Do an initial "smoke" request for symbols in ``tickers`` filtering out any symbols not supported by the broker queried in the call to ``get_quotes()``. @@ -1099,6 +1106,7 @@ async def smoke_quote(get_quotes, tickers): # , broker): log.error( f"{symbol} seems to be defunct") + quote['symbol'] = symbol payload[symbol] = quote return payload @@ -1107,20 +1115,90 @@ async def smoke_quote(get_quotes, tickers): # , broker): ########################################### +# unbounded, shared between streaming tasks +_symbol_info_cache = {} + + # function to format packets delivered to subscribers def packetizer( topic: str, quotes: Dict[str, Any], - formatter: Callable, - symbol_data: Dict[str, Any], ) -> Dict[str, Any]: """Normalize quotes by name into dicts using broker-specific processing. """ - new = {} - for quote in quotes: - new[quote['symbol']], _ = formatter(quote, symbol_data) + # repack into symbol keyed dict + return {q['symbol']: q for q in quotes} + +def normalize( + quotes: Dict[str, Any], + _cache: Dict[str, Any], # dict held in scope of the streaming loop + formatter: Callable, +) -> Dict[str, Any]: + """Deliver normalized quotes by name into dicts using + broker-specific processing; only emit changes differeing from the + last quote sample creating a psuedo-tick type datum. + """ + new = {} + # XXX: this is effectively emitting "sampled ticks" + # useful for polling setups but obviously should be + # disabled if you're already rx-ing per-tick data. + for quote in quotes: + symbol = quote['symbol'] + + # look up last quote from cache + last = _cache.setdefault(symbol, {}) + _cache[symbol] = quote + + # compute volume difference + last_volume = last.get('volume', 0) + current_volume = quote['volume'] + volume_diff = current_volume - last_volume + + # find all keys that have match to a new value compared + # to the last quote received + changed = set(quote.items()) - set(last.items()) + if changed: + log.info(f"New quote {symbol}:\n{changed}") + + # TODO: can we reduce the # of iterations here and in + # called funcs? + payload = {k: quote[k] for k, v in changed} + payload['symbol'] = symbol # required by formatter + + # TODO: we should probaby do the "computed" fields + # processing found inside this func in a downstream actor? + fquote, _ = formatter(payload, _symbol_info_cache) + fquote['key'] = fquote['symbol'] = symbol + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + # volume = payload.get('volume') + if volume_diff: + if volume_diff < 0: + log.error(f"Uhhh {symbol} volume: {volume_diff} ?") + + fquote['volume_delta'] = volume_diff + + # TODO: We can emit 2 ticks here: + # - one for the volume differential + # - one for the last known trade size + # The first in theory can be unwound and + # interpolated assuming the broker passes an + # accurate daily VWAP value. + # To make this work we need a universal ``size`` + # field that is normalized before hitting this logic. + fquote['size'] = quote.get('lastTradeSize', 0) + if 'last' not in fquote: + fquote['last'] = quote.get('lastTradePrice', float('nan')) + + new[symbol] = fquote + + if new: + log.info(f"New quotes:\n{pformat(new)}") return new @@ -1129,13 +1207,12 @@ async def stream_quotes( ctx: tractor.Context, # marks this as a streaming func symbols: List[str], feed_type: str = 'stock', - diff_cached: bool = True, rate: int = 3, loglevel: str = None, # feed_type: str = 'stock', ) -> AsyncGenerator[str, Dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(tractor.current_actor().loglevel) + get_console_log(loglevel) async with get_cached_client('questrade') as client: if feed_type == 'stock': @@ -1144,17 +1221,25 @@ async def stream_quotes( # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) - payload = await smoke_quote(get_quotes, list(symbols)) + first_quotes = await smoke_quote(get_quotes, list(symbols)) else: formatter = format_option_quote get_quotes = await option_quoter(client, symbols) # packetize - payload = { + first_quotes = { quote['symbol']: quote for quote in await get_quotes(symbols) } + # update global symbol data state sd = await client.symbol_info(symbols) + _symbol_info_cache.update(sd) + + # pre-process first set of quotes + payload = {} + for sym, quote in first_quotes.items(): + fquote, _ = formatter(quote, sd) + payload[sym] = fquote # push initial smoke quote response for client initialization await ctx.send_yield(payload) @@ -1167,15 +1252,11 @@ async def stream_quotes( task_name=feed_type, ctx=ctx, topics=symbols, - packetizer=partial( - packetizer, - formatter=formatter, - symbol_data=sd, - ), + packetizer=packetizer, # actual target "streaming func" args get_quotes=get_quotes, - diff_cached=diff_cached, + normalizer=partial(normalize, formatter=formatter), rate=rate, ) log.info("Terminating stream quoter task")