diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 96bfa1e1..9580add8 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -12,7 +12,7 @@ import typing from typing import ( Coroutine, Callable, Dict, List, Any, Tuple, AsyncGenerator, - Sequence, + Sequence ) import contextlib from operator import itemgetter @@ -47,7 +47,7 @@ async def wait_for_network( continue except socket.gaierror: if not down: # only report/log network down once - log.warn(f"Network is down waiting for re-establishment...") + log.warn("Network is down waiting for re-establishment...") down = True await trio.sleep(sleep) @@ -83,7 +83,6 @@ class BrokerFeed: async def stream_poll_requests( get_topics: typing.Callable, get_quotes: Coroutine, - feed: BrokerFeed, rate: int = 3, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: @@ -93,17 +92,18 @@ async def stream_poll_requests( This routine is built for brokers who support quote polling for multiple symbols per request. The ``get_topics()`` func is called to retreive the set of symbols each iteration and ``get_quotes()`` is to retreive - the quotes. - + the quotes. A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. - """ - broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") + .. note:: + This code is mostly tailored (for now) to the questrade backend. + It is currently the only broker that doesn't support streaming without + paying for data. See the note in the diffing section regarding volume + differentials which needs to be addressed in order to get cross-broker + support. + """ sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching @@ -136,6 +136,7 @@ async def stream_poll_requests( for quote in quotes: symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) + last_volume = last.get('volume', 0) # find all keys that have match to a new value compared # to the last quote received @@ -153,9 +154,22 @@ async def stream_poll_requests( # shares traded is useful info and it's possible # that the set difference from above will disregard # a "size" value since the same # of shares were traded - size = quote.get('size') - if size and 'volume' in payload: - payload['size'] = size + volume = payload.get('volume') + if volume: + volume_since_last_quote = volume - last_volume + assert volume_since_last_quote > 0 + payload['volume_delta'] = volume_since_last_quote + + # TODO: We can emit 2 ticks here: + # - one for the volume differential + # - one for the last known trade size + # The first in theory can be unwound and + # interpolated assuming the broker passes an + # accurate daily VWAP value. + # To make this work we need a universal ``size`` + # field that is normalized before hitting this logic. + # XXX: very questrade specific + payload['size'] = quote['lastTradeSize'] # XXX: we append to a list for the options case where the # subscription topic (key) is the same for all @@ -312,6 +326,7 @@ async def start_quote_stream( # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) + formatter = feed.mod.format_stock_quote elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify @@ -326,9 +341,16 @@ async def start_quote_stream( quote['symbol']: quote for quote in await get_quotes(symbols) } + formatter = feed.mod.format_option_quote - def packetizer(topic, quotes): - return {quote['symbol']: quote for quote in quotes} + sd = await feed.client.symbol_info(symbols) + # formatter = partial(formatter, symbol_data=sd) + + packetizer = partial( + feed.mod.packetizer, + formatter=formatter, + symbol_data=sd, + ) # push initial smoke quote response for client initialization await ctx.send_yield(payload) @@ -342,7 +364,6 @@ async def start_quote_stream( packetizer=packetizer, # actual func args - feed=feed, get_quotes=get_quotes, diff_cached=diff_cached, rate=rate, diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 961190c1..de0d1e7a 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -12,6 +12,7 @@ import configparser from typing import ( List, Tuple, Dict, Any, Iterator, NamedTuple, AsyncGenerator, + Callable, ) import arrow @@ -26,7 +27,7 @@ import asks from ..calc import humanize, percent_change from . import config from ._util import resproc, BrokerError, SymbolNotFound -from ..log import get_logger, colorize_json +from ..log import get_logger, colorize_json, get_console_log from .._async_utils import async_lifo_cache from . import get_brokermod @@ -933,7 +934,8 @@ _qt_option_keys = { # "theta": ('theta', partial(round, ndigits=3)), # "vega": ('vega', partial(round, ndigits=3)), '$ vol': ('$ vol', humanize), - 'volume': ('vol', humanize), + # XXX: required key to trigger trade execution datum msg + 'volume': ('volume', humanize), # "2021-01-15T00:00:00.000000-05:00", # "isHalted": false, # "key": [ @@ -1031,18 +1033,20 @@ async def get_cached_client( log.info(f"Loading existing `{brokername}` daemon") async with lock: client = clients[brokername] + client._consumers += 1 + yield client except KeyError: log.info(f"Creating new client for broker {brokername}") async with lock: brokermod = get_brokermod(brokername) exit_stack = contextlib.AsyncExitStack() client = await exit_stack.enter_async_context( - brokermod.get_client()) + brokermod.get_client() + ) + client._consumers = 0 client._exit_stack = exit_stack clients[brokername] = client - else: - client._consumers += 1 - yield client + yield client finally: client._consumers -= 1 if client._consumers <= 0: @@ -1097,6 +1101,23 @@ async def smoke_quote(get_quotes, tickers): # , broker): ########################################### +# function to format packets delivered to subscribers +def packetizer( + topic: str, + quotes: Dict[str, Any], + formatter: Callable, + symbol_data: Dict[str, Any], +) -> Dict[str, Any]: + """Normalize quotes by name into dicts using broker-specific + processing. + """ + new = {} + for quote in quotes: + new[quote['symbol']], _ = formatter(quote, symbol_data) + + return new + + @tractor.stream async def stream_quotes( ctx: tractor.Context, # marks this as a streaming func @@ -1104,8 +1125,11 @@ async def stream_quotes( feed_type: str = 'stock', diff_cached: bool = True, rate: int = 3, + loglevel: str = None, # feed_type: str = 'stock', ) -> AsyncGenerator[str, Dict[str, Any]]: + # XXX: why do we need this again? + get_console_log(tractor.current_actor().loglevel) async with get_cached_client('questrade') as client: if feed_type == 'stock': @@ -1124,20 +1148,7 @@ async def stream_quotes( for quote in await get_quotes(symbols) } - symbol_data = await client.symbol_info(symbols) - - # function to format packets delivered to subscribers - def packetizer( - topic: str, - quotes: Dict[str, Any] - ) -> Dict[str, Any]: - """Normalize quotes by name into dicts. - """ - new = {} - for quote in quotes: - new[quote['symbol']], _ = formatter(quote, symbol_data) - - return new + sd = await client.symbol_info(symbols) # push initial smoke quote response for client initialization await ctx.send_yield(payload) @@ -1150,7 +1161,11 @@ async def stream_quotes( task_name=feed_type, ctx=ctx, topics=symbols, - packetizer=packetizer, + packetizer=partial( + packetizer, + formatter=formatter, + symboal_data=sd, + ), # actual func args get_quotes=get_quotes, diff --git a/piker/data/__init__.py b/piker/data/__init__.py index af05da36..dd2c6b7b 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -6,19 +6,35 @@ and storing data from your brokers as well as sharing your feeds with other fellow pikers. """ from contextlib import asynccontextmanager +from importlib import import_module +from types import ModuleType from typing import ( Dict, List, Any, Sequence, AsyncIterator, Optional ) +import trio import tractor from ..brokers import get_brokermod -from ..log import get_logger +from ..log import get_logger, get_console_log log = get_logger(__name__) +__ingestors__ = [ + 'marketstore', +] + + +def get_ingestor(name: str) -> ModuleType: + """Return the imported ingestor module by name. + """ + module = import_module('.' + name, 'piker.data') + # we only allow monkeying because it's for internal keying + module.name = module.__name__.split('.')[-1] + return module + _data_mods = [ 'piker.brokers.core', @@ -30,7 +46,6 @@ _data_mods = [ async def maybe_spawn_brokerd( brokername: str, sleep: float = 0.5, - tries: int = 10, loglevel: Optional[str] = None, expose_mods: List = [], **tractor_kwargs, @@ -38,6 +53,11 @@ async def maybe_spawn_brokerd( """If no ``brokerd.{brokername}`` daemon-actor can be found, spawn one in a local subactor and return a portal to it. """ + if loglevel: + get_console_log(loglevel) + + tractor_kwargs['loglevel'] = loglevel + brokermod = get_brokermod(brokername) dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: @@ -69,21 +89,28 @@ async def maybe_spawn_brokerd( async def open_feed( name: str, symbols: Sequence[str], + loglevel: str = 'info', ) -> AsyncIterator[Dict[str, Any]]: try: mod = get_brokermod(name) except ImportError: - # TODO: try to pull up ingest feeds - # - market store - # - influx - raise + mod = get_ingestormod(name) async with maybe_spawn_brokerd( mod.name, + loglevel=loglevel, ) as portal: stream = await portal.run( mod.__name__, 'stream_quotes', symbols=symbols, ) - yield stream + # Feed is required to deliver an initial quote asap. + # TODO: should we timeout and raise a more explicit error? + # with trio.fail_after(5): + with trio.fail_after(float('inf')): + # Retreive initial quote for each symbol + # such that consumer code can know the data layout + first_quote = await stream.__anext__() + log.info(f"Received first quote {first_quote}") + yield (first_quote, stream)