diff --git a/README.rst b/README.rst index ce7352f9..b5b71d2f 100644 --- a/README.rst +++ b/README.rst @@ -9,8 +9,8 @@ trading and financial analysis targetted at hardcore Linux users. It tries to use as much bleeding edge tech as possible including (but not limited to): -- Python 3.7+ for glue and business logic -- trio_ and asyncio_ for async +- Python 3.7+ for glue_ and business logic +- trio_ for async - tractor_ as the underlying actor model - marketstore_ for historical and real-time tick data persistence and sharing - techtonicdb_ for L2 book storage @@ -23,6 +23,7 @@ It tries to use as much bleeding edge tech as possible including (but not limite .. _marketstore: https://github.com/alpacahq/marketstore .. _techtonicdb: https://github.com/0b01/tectonicdb .. _Qt: https://www.qt.io/ +.. _glue: https://numpy.org/doc/stable/user/c-info.python-as-glue.html#using-python-as-glue Focus and Features: diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index 370722b6..b18219bf 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -14,7 +14,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from ..brokers.core import maybe_spawn_brokerd_as_subactor +from ..data import maybe_spawn_brokerd from ..brokers import core, get_brokermod, data log = get_logger('cli') @@ -99,7 +99,7 @@ def quote(config, tickers, df_output): @cli.command() @click.option('--df-output', '-df', flag_value=True, help='Output in `pandas.DataFrame` format') -@click.option('--count', '-c', default=100, +@click.option('--count', '-c', default=1000, help='Number of bars to retrieve') @click.argument('symbol', required=True) @click.pass_obj @@ -117,10 +117,11 @@ def bars(config, symbol, count, df_output): brokermod, symbol, count=count, + as_np=df_output ) ) - if not bars: + if not len(bars): log.error(f"No quotes could be found for {symbol}?") return @@ -130,49 +131,6 @@ def bars(config, symbol, count, df_output): click.echo(colorize_json(bars)) -@cli.command() -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--rate', '-r', default=3, help='Quote rate limit') -@click.option('--test', '-t', help='Test quote stream file') -@click.option('--dhost', '-dh', default='127.0.0.1', - help='Daemon host address to connect to') -@click.argument('name', nargs=1, required=True) -@click.pass_obj -def monitor(config, rate, name, dhost, test, tl): - """Start a real-time watchlist UI - """ - # global opts - brokermod = config['brokermod'] - loglevel = config['loglevel'] - log = config['log'] - - watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) - watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) - tickers = watchlists[name] - if not tickers: - log.error(f"No symbols found for watchlist `{name}`?") - return - - from ..ui.monitor import _async_main - - async def main(tries): - async with maybe_spawn_brokerd_as_subactor( - tries=tries, loglevel=loglevel - ) as portal: - # run app "main" - await _async_main( - name, portal, tickers, - brokermod, rate, test=test, - ) - - tractor.run( - partial(main, tries=1), - name='monitor', - loglevel=loglevel if tl else None, - rpc_module_paths=['piker.ui.monitor'], - start_method='forkserver', - ) - @cli.command() @click.option('--rate', '-r', default=5, help='Logging level') @@ -198,7 +156,7 @@ def record(config, rate, name, dhost, filename): return async def main(tries): - async with maybe_spawn_brokerd_as_subactor( + async with maybe_spawn_brokerd( tries=tries, loglevel=loglevel ) as portal: # run app "main" @@ -271,37 +229,40 @@ def optsquote(config, symbol, df_output, date): @cli.command() -@click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--date', '-d', help='Contracts expiry date') -@click.option('--test', '-t', help='Test quote stream file') -@click.option('--rate', '-r', default=1, help='Logging level') -@click.argument('symbol', required=True) +@click.argument('tickers', nargs=-1, required=True) @click.pass_obj -def optschain(config, symbol, date, tl, rate, test): - """Start an option chain UI +def symbol_info(config, tickers): + """Print symbol quotes to the console """ # global opts - loglevel = config['loglevel'] - brokername = config['broker'] + brokermod = config['brokermod'] - from ..ui.option_chain import _async_main + quotes = trio.run(partial(core.symbol_info, brokermod, tickers)) + if not quotes: + log.error(f"No quotes could be found for {tickers}?") + return - async def main(tries): - async with maybe_spawn_brokerd_as_subactor( - tries=tries, loglevel=loglevel - ): - # run app "main" - await _async_main( - symbol, - brokername, - rate=rate, - loglevel=loglevel, - test=test, - ) + if len(quotes) < len(tickers): + syms = tuple(map(itemgetter('symbol'), quotes)) + for ticker in tickers: + if ticker not in syms: + brokermod.log.warn(f"Could not find symbol {ticker}?") - tractor.run( - partial(main, tries=1), - name='kivy-options-chain', - loglevel=loglevel if tl else None, - start_method='forkserver', - ) + click.echo(colorize_json(quotes)) + + +@cli.command() +@click.argument('pattern', required=True) +@click.pass_obj +def search(config, pattern): + """Search for symbols from broker backend(s). + """ + # global opts + brokermod = config['brokermod'] + + quotes = trio.run(partial(core.symbol_search, brokermod, pattern)) + if not quotes: + log.error(f"No matches could be found for {pattern}?") + return + + click.echo(colorize_json(quotes)) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index d2c958e6..2e672c61 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -1,23 +1,18 @@ """ -Broker high level API layer. +Broker high level cross-process API layer. + +This API should be kept "remote service compatible" meaning inputs to +routines should be primitive data types where possible. """ import inspect from types import ModuleType from typing import List, Dict, Any, Optional -from async_generator import asynccontextmanager -import tractor - from ..log import get_logger -from .data import DataFeed from . import get_brokermod -log = get_logger('broker.core') -_data_mods = [ - 'piker.brokers.core', - 'piker.brokers.data', -] +log = get_logger(__name__) async def api(brokername: str, methname: str, **kwargs) -> dict: @@ -25,12 +20,11 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: """ brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: - - meth = getattr(client.api, methname, None) + meth = getattr(client, methname, None) if meth is None: - log.warning( + log.debug( f"Couldn't find API method {methname} looking up on client") - meth = getattr(client, methname, None) + meth = getattr(client.api, methname, None) if meth is None: log.error(f"No api method `{methname}` could be found?") @@ -48,24 +42,6 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: return await meth(**kwargs) -@asynccontextmanager -async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): - """If no ``brokerd`` daemon-actor can be found spawn one in a - local subactor. - """ - async with tractor.open_nursery() as nursery: - async with tractor.find_actor('brokerd') as portal: - if not portal: - log.info( - "No broker daemon could be found, spawning brokerd..") - portal = await nursery.start_actor( - 'brokerd', - rpc_module_paths=_data_mods, - loglevel=loglevel, - ) - yield portal - - async def stocks_quote( brokermod: ModuleType, tickers: List[str] @@ -121,3 +97,26 @@ async def bars( """ async with brokermod.get_client() as client: return await client.bars(symbol, **kwargs) + + +async def symbol_info( + brokermod: ModuleType, + symbol: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + return await client.symbol_info(symbol, **kwargs) + + +async def symbol_search( + brokermod: ModuleType, + symbol: str, + **kwargs, +) -> Dict[str, Dict[str, Dict[str, Any]]]: + """Return symbol info from broker. + """ + async with brokermod.get_client() as client: + # TODO: support multiple asset type concurrent searches. + return await client.search_stocks(symbol, **kwargs) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index 8d03d684..9580add8 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -12,7 +12,7 @@ import typing from typing import ( Coroutine, Callable, Dict, List, Any, Tuple, AsyncGenerator, - Sequence, + Sequence ) import contextlib from operator import itemgetter @@ -25,7 +25,7 @@ from ..log import get_logger, get_console_log from . import get_brokermod -log = get_logger('broker.data') +log = get_logger(__name__) async def wait_for_network( @@ -47,7 +47,7 @@ async def wait_for_network( continue except socket.gaierror: if not down: # only report/log network down once - log.warn(f"Network is down waiting for re-establishment...") + log.warn("Network is down waiting for re-establishment...") down = True await trio.sleep(sleep) @@ -80,24 +80,30 @@ class BrokerFeed: @tractor.msg.pub(tasks=['stock', 'option']) -async def stream_requests( +async def stream_poll_requests( get_topics: typing.Callable, get_quotes: Coroutine, - feed: BrokerFeed, rate: int = 3, # delay between quote requests diff_cached: bool = True, # only deliver "new" quotes to the queue ) -> None: """Stream requests for quotes for a set of symbols at the given ``rate`` (per second). + This routine is built for brokers who support quote polling for multiple + symbols per request. The ``get_topics()`` func is called to retreive the + set of symbols each iteration and ``get_quotes()`` is to retreive + the quotes. + A stock-broker client ``get_quotes()`` async function must be provided which returns an async quote retrieval function. - """ - broker_limit = getattr(feed.mod, '_rate_limit', float('inf')) - if broker_limit < rate: - rate = broker_limit - log.warn(f"Limiting {feed.mod.__name__} query rate to {rate}/sec") + .. note:: + This code is mostly tailored (for now) to the questrade backend. + It is currently the only broker that doesn't support streaming without + paying for data. See the note in the diffing section regarding volume + differentials which needs to be addressed in order to get cross-broker + support. + """ sleeptime = round(1. / rate, 3) _cache = {} # ticker to quote caching @@ -130,17 +136,47 @@ async def stream_requests( for quote in quotes: symbol = quote['symbol'] last = _cache.setdefault(symbol, {}) + last_volume = last.get('volume', 0) + + # find all keys that have match to a new value compared + # to the last quote received new = set(quote.items()) - set(last.items()) if new: log.info( f"New quote {quote['symbol']}:\n{new}") _cache[symbol] = quote + + # only ship diff updates and other required fields + payload = {k: quote[k] for k, v in new} + payload['symbol'] = symbol + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + volume = payload.get('volume') + if volume: + volume_since_last_quote = volume - last_volume + assert volume_since_last_quote > 0 + payload['volume_delta'] = volume_since_last_quote + + # TODO: We can emit 2 ticks here: + # - one for the volume differential + # - one for the last known trade size + # The first in theory can be unwound and + # interpolated assuming the broker passes an + # accurate daily VWAP value. + # To make this work we need a universal ``size`` + # field that is normalized before hitting this logic. + # XXX: very questrade specific + payload['size'] = quote['lastTradeSize'] + # XXX: we append to a list for the options case where the # subscription topic (key) is the same for all # expiries even though this is uncessary for the # stock case (different topic [i.e. symbol] for each # quote). - new_quotes.setdefault(quote['key'], []).append(quote) + new_quotes.setdefault(quote['key'], []).append(payload) else: # log.debug(f"Delivering quotes:\n{quotes}") for quote in quotes: @@ -168,7 +204,7 @@ async def symbol_data(broker: str, tickers: List[str]): """Retrieve baseline symbol info from broker. """ async with get_cached_feed(broker) as feed: - return await feed.client.symbol_data(tickers) + return await feed.client.symbol_info(tickers) async def smoke_quote(get_quotes, tickers, broker): @@ -290,6 +326,7 @@ async def start_quote_stream( # do a smoke quote (note this mutates the input list and filters # out bad symbols for now) payload = await smoke_quote(get_quotes, symbols, broker) + formatter = feed.mod.format_stock_quote elif feed_type == 'option': # FIXME: yeah we need maybe a more general way to specify @@ -304,14 +341,21 @@ async def start_quote_stream( quote['symbol']: quote for quote in await get_quotes(symbols) } + formatter = feed.mod.format_option_quote - def packetizer(topic, quotes): - return {quote['symbol']: quote for quote in quotes} + sd = await feed.client.symbol_info(symbols) + # formatter = partial(formatter, symbol_data=sd) + + packetizer = partial( + feed.mod.packetizer, + formatter=formatter, + symbol_data=sd, + ) # push initial smoke quote response for client initialization await ctx.send_yield(payload) - await stream_requests( + await stream_poll_requests( # ``msg.pub`` required kwargs task_name=feed_type, @@ -320,7 +364,6 @@ async def start_quote_stream( packetizer=packetizer, # actual func args - feed=feed, get_quotes=get_quotes, diff_cached=diff_cached, rate=rate, @@ -378,15 +421,19 @@ class DataFeed: # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) sd = await self.portal.run( - "piker.brokers.data", 'symbol_data', - broker=self.brokermod.name, tickers=symbols) + "piker.brokers.data", + 'symbol_data', + broker=self.brokermod.name, + tickers=symbols + ) self._symbol_data_cache.update(sd) if test: # stream from a local test file quote_gen = await self.portal.run( - "piker.brokers.data", 'stream_from_file', - filename=test + "piker.brokers.data", + 'stream_from_file', + filename=test, ) else: log.info(f"Starting new stream for {symbols}") diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 9d2d3892..de0d1e7a 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -3,14 +3,21 @@ Questrade API backend. """ from __future__ import annotations import inspect +import contextlib import time from datetime import datetime from functools import partial +import itertools import configparser -from typing import List, Tuple, Dict, Any, Iterator, NamedTuple +from typing import ( + List, Tuple, Dict, Any, Iterator, NamedTuple, + AsyncGenerator, + Callable, +) import arrow import trio +import tractor from async_generator import asynccontextmanager import pandas as pd import numpy as np @@ -20,8 +27,9 @@ import asks from ..calc import humanize, percent_change from . import config from ._util import resproc, BrokerError, SymbolNotFound -from ..log import get_logger, colorize_json +from ..log import get_logger, colorize_json, get_console_log from .._async_utils import async_lifo_cache +from . import get_brokermod log = get_logger(__name__) @@ -407,10 +415,10 @@ class Client: return symbols2ids - async def symbol_data(self, tickers: List[str]): - """Return symbol data for ``tickers``. + async def symbol_info(self, symbols: List[str]): + """Return symbol data for ``symbols``. """ - t2ids = await self.tickers2ids(tickers) + t2ids = await self.tickers2ids(symbols) ids = ','.join(t2ids.values()) symbols = {} for pkt in (await self.api.symbols(ids=ids))['symbols']: @@ -418,6 +426,9 @@ class Client: return symbols + # TODO: deprecate + symbol_data = symbol_info + async def quote(self, tickers: [str]): """Return stock quotes for each ticker in ``tickers``. """ @@ -598,6 +609,24 @@ class Client: f"Took {time.time() - start} seconds to retreive {len(bars)} bars") return bars + async def search_stocks( + self, + pattern: str, + # how many contracts to return + upto: int = 10, + ) -> Dict[str, str]: + details = {} + results = await self.api.search(prefix=pattern) + for result in results['symbols']: + sym = result['symbol'] + if '.' not in sym: + sym = f"{sym}.{result['listingExchange']}" + + details[sym] = result + + if len(details) == upto: + return details + # marketstore TSD compatible numpy dtype for bar _qt_bars_dt = [ @@ -839,29 +868,43 @@ def format_stock_quote( and the second is the same but with all values converted to a "display-friendly" string format. """ - last = quote['lastTradePrice'] symbol = quote['symbol'] previous = symbol_data[symbol]['prevDayClosePrice'] - change = percent_change(previous, last) - share_count = symbol_data[symbol].get('outstandingShares', None) - mktcap = share_count * last if (last and share_count) else 0 - computed = { - 'symbol': quote['symbol'], - '%': round(change, 3), - 'MC': mktcap, - # why QT do you have to be an asshole shipping null values!!! - '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), - 'close': previous, - } + + computed = {'symbol': symbol} + last = quote.get('lastTradePrice') + if last: + change = percent_change(previous, last) + share_count = symbol_data[symbol].get('outstandingShares', None) + mktcap = share_count * last if (last and share_count) else 0 + computed.update({ + # 'symbol': quote['symbol'], + '%': round(change, 3), + 'MC': mktcap, + # why questrade do you have to be shipping null values!!! + # '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), + 'close': previous, + }) + + vwap = quote.get('VWAP') + volume = quote.get('volume') + if volume is not None: # could be 0 + # why questrade do you have to be an asshole shipping null values!!! + computed['$ vol'] = round((vwap or 0) * (volume or 0), 3) + new = {} displayable = {} - for key, new_key in keymap.items(): - display_value = value = computed.get(key) or quote.get(key) + for key, value in itertools.chain(quote.items(), computed.items()): + new_key = keymap.get(key) + if not new_key: + continue # API servers can return `None` vals when markets are closed (weekend) value = 0 if value is None else value + display_value = value + # convert values to a displayble format using available formatting func if isinstance(new_key, tuple): new_key, func = new_key @@ -891,7 +934,8 @@ _qt_option_keys = { # "theta": ('theta', partial(round, ndigits=3)), # "vega": ('vega', partial(round, ndigits=3)), '$ vol': ('$ vol', humanize), - 'volume': ('vol', humanize), + # XXX: required key to trigger trade execution datum msg + 'volume': ('volume', humanize), # "2021-01-15T00:00:00.000000-05:00", # "isHalted": false, # "key": [ @@ -939,7 +983,7 @@ def format_option_quote( "display-friendly" string format. """ # TODO: need historical data.. - # (cause why would QT keep their quote structure consistent across + # (cause why would questrade keep their quote structure consistent across # assets..) # previous = symbol_data[symbol]['prevDayClosePrice'] # change = percent_change(previous, last) @@ -968,3 +1012,164 @@ def format_option_quote( displayable[new_key] = display_value return new, displayable + + +@asynccontextmanager +async def get_cached_client( + brokername: str, + *args, + **kwargs, +) -> 'Client': + """Get a cached broker client from the current actor's local vars. + + If one has not been setup do it and cache it. + """ + # check if a cached client is in the local actor's statespace + ss = tractor.current_actor().statespace + clients = ss.setdefault('clients', {'_lock': trio.Lock()}) + lock = clients['_lock'] + client = None + try: + log.info(f"Loading existing `{brokername}` daemon") + async with lock: + client = clients[brokername] + client._consumers += 1 + yield client + except KeyError: + log.info(f"Creating new client for broker {brokername}") + async with lock: + brokermod = get_brokermod(brokername) + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( + brokermod.get_client() + ) + client._consumers = 0 + client._exit_stack = exit_stack + clients[brokername] = client + yield client + finally: + client._consumers -= 1 + if client._consumers <= 0: + # teardown the client + await client._exit_stack.aclose() + + +async def smoke_quote(get_quotes, tickers): # , broker): + """Do an initial "smoke" request for symbols in ``tickers`` filtering + out any symbols not supported by the broker queried in the call to + ``get_quotes()``. + """ + from operator import itemgetter + # TODO: trim out with #37 + ################################################# + # get a single quote filtering out any bad tickers + # NOTE: this code is always run for every new client + # subscription even when a broker quoter task is already running + # since the new client needs to know what symbols are accepted + log.warn(f"Retrieving smoke quote for symbols {tickers}") + quotes = await get_quotes(tickers) + + # report any tickers that aren't returned in the first quote + invalid_tickers = set(tickers) - set(map(itemgetter('key'), quotes)) + for symbol in invalid_tickers: + tickers.remove(symbol) + log.warn( + f"Symbol `{symbol}` not found") # by broker `{broker}`" + # ) + + # pop any tickers that return "empty" quotes + payload = {} + for quote in quotes: + symbol = quote['symbol'] + if quote is None: + log.warn( + f"Symbol `{symbol}` not found") + # XXX: not this mutates the input list (for now) + tickers.remove(symbol) + continue + + # report any unknown/invalid symbols (QT specific) + if quote.get('low52w', False) is None: + log.error( + f"{symbol} seems to be defunct") + + payload[symbol] = quote + + return payload + + # end of section to be trimmed out with #37 + ########################################### + + +# function to format packets delivered to subscribers +def packetizer( + topic: str, + quotes: Dict[str, Any], + formatter: Callable, + symbol_data: Dict[str, Any], +) -> Dict[str, Any]: + """Normalize quotes by name into dicts using broker-specific + processing. + """ + new = {} + for quote in quotes: + new[quote['symbol']], _ = formatter(quote, symbol_data) + + return new + + +@tractor.stream +async def stream_quotes( + ctx: tractor.Context, # marks this as a streaming func + symbols: List[str], + feed_type: str = 'stock', + diff_cached: bool = True, + rate: int = 3, + loglevel: str = None, + # feed_type: str = 'stock', +) -> AsyncGenerator[str, Dict[str, Any]]: + # XXX: why do we need this again? + get_console_log(tractor.current_actor().loglevel) + + async with get_cached_client('questrade') as client: + if feed_type == 'stock': + formatter = format_stock_quote + get_quotes = await stock_quoter(client, symbols) + + # do a smoke quote (note this mutates the input list and filters + # out bad symbols for now) + payload = await smoke_quote(get_quotes, list(symbols)) + else: + formatter = format_option_quote + get_quotes = await option_quoter(client, symbols) + # packetize + payload = { + quote['symbol']: quote + for quote in await get_quotes(symbols) + } + + sd = await client.symbol_info(symbols) + + # push initial smoke quote response for client initialization + await ctx.send_yield(payload) + + from .data import stream_poll_requests + + await stream_poll_requests( + + # ``msg.pub`` required kwargs + task_name=feed_type, + ctx=ctx, + topics=symbols, + packetizer=partial( + packetizer, + formatter=formatter, + symboal_data=sd, + ), + + # actual func args + get_quotes=get_quotes, + diff_cached=diff_cached, + rate=rate, + ) + log.info("Terminating stream quoter task") diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index cc528f8a..ea72b6b6 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -8,7 +8,6 @@ import tractor from ..log import get_console_log, get_logger from ..brokers import get_brokermod, config -from ..brokers.core import _data_mods log = get_logger('cli') DEFAULT_BROKER = 'questrade' @@ -17,6 +16,7 @@ _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') _context_defaults = dict( default_map={ + # Questrade specific quote poll rates 'monitor': { 'rate': 3, }, @@ -34,6 +34,7 @@ _context_defaults = dict( def pikerd(loglevel, host, tl): """Spawn the piker broker-daemon. """ + from ..data import _data_mods get_console_log(loglevel) tractor.run_daemon( rpc_module_paths=_data_mods, @@ -64,7 +65,12 @@ def cli(ctx, broker, loglevel, configdir): }) +def _load_clis() -> None: + from ..data import marketstore as _ + from ..brokers import cli as _ # noqa + from ..ui import cli as _ # noqa + from ..watchlists import cli as _ # noqa + + # load downstream cli modules -from ..brokers import cli as _ -from ..watchlists import cli as _ -from ..data import marketstore as _ +_load_clis() diff --git a/piker/data/__init__.py b/piker/data/__init__.py new file mode 100644 index 00000000..bb8fca8b --- /dev/null +++ b/piker/data/__init__.py @@ -0,0 +1,118 @@ +""" +Data feed apis and infra. + +We provide tsdb integrations for retrieving +and storing data from your brokers as well as +sharing your feeds with other fellow pikers. +""" +from contextlib import asynccontextmanager +from importlib import import_module +from types import ModuleType +from typing import ( + Dict, List, Any, + Sequence, AsyncIterator, Optional +) + +import trio +import tractor + +from ..brokers import get_brokermod +from ..log import get_logger, get_console_log + + +log = get_logger(__name__) + +__ingestors__ = [ + 'marketstore', +] + + +def get_ingestor(name: str) -> ModuleType: + """Return the imported ingestor module by name. + """ + module = import_module('.' + name, 'piker.data') + # we only allow monkeying because it's for internal keying + module.name = module.__name__.split('.')[-1] + return module + + +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', +] + + +@asynccontextmanager +async def maybe_spawn_brokerd( + brokername: str, + sleep: float = 0.5, + loglevel: Optional[str] = None, + expose_mods: List = [], + **tractor_kwargs, +) -> tractor._portal.Portal: + """If no ``brokerd.{brokername}`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + """ + if loglevel: + get_console_log(loglevel) + + tractor_kwargs['loglevel'] = loglevel + + brokermod = get_brokermod(brokername) + dname = f'brokerd.{brokername}' + async with tractor.find_actor(dname) as portal: + # WTF: why doesn't this work? + log.info(f"YOYOYO {__name__}") + if portal is not None: + yield portal + else: + log.info(f"Spawning {brokername} broker daemon") + tractor_kwargs = getattr(brokermod, '_spawn_kwargs', {}) + async with tractor.open_nursery() as nursery: + try: + # spawn new daemon + portal = await nursery.start_actor( + dname, + rpc_module_paths=_data_mods + [brokermod.__name__], + loglevel=loglevel, + **tractor_kwargs + ) + async with tractor.wait_for_actor(dname) as portal: + yield portal + finally: + # client code may block indefinitely so cancel when + # teardown is invoked + await nursery.cancel() + + +@asynccontextmanager +async def open_feed( + name: str, + symbols: Sequence[str], + loglevel: str = 'info', +) -> AsyncIterator[Dict[str, Any]]: + """Open a "data feed" which provides streamed real-time quotes. + """ + try: + mod = get_brokermod(name) + except ImportError: + mod = get_ingestormod(name) + + async with maybe_spawn_brokerd( + mod.name, + loglevel=loglevel, + ) as portal: + stream = await portal.run( + mod.__name__, + 'stream_quotes', + symbols=symbols, + ) + # Feed is required to deliver an initial quote asap. + # TODO: should we timeout and raise a more explicit error? + # with trio.fail_after(5): + with trio.fail_after(float('inf')): + # Retreive initial quote for each symbol + # such that consumer code can know the data layout + first_quote = await stream.__anext__() + log.info(f"Received first quote {first_quote}") + yield (first_quote, stream) diff --git a/piker/data/marketstore.py b/piker/data/marketstore.py new file mode 100644 index 00000000..84e62ecb --- /dev/null +++ b/piker/data/marketstore.py @@ -0,0 +1,326 @@ +""" +``marketstore`` integration. + +- client management routines +- ticK data ingest routines +- websocket client for subscribing to write triggers +- todo: tick sequence stream-cloning for testing +- todo: docker container management automation +""" +from contextlib import asynccontextmanager +from typing import Dict, Any, List, Callable, Tuple +import time +from math import isnan + +import msgpack +import numpy as np +import pandas as pd +import pymarketstore as pymkts +from trio_websocket import open_websocket_url + +from ..log import get_logger, get_console_log +from ..data import open_feed + + +log = get_logger(__name__) + +_tick_tbk_ids: Tuple[str, str] = ('1Sec', 'TICK') +_tick_tbk: str = '{}/' + '/'.join(_tick_tbk_ids) +_url: str = 'http://localhost:5993/rpc' +_quote_dt = [ + # these two are required for as a "primary key" + ('Epoch', 'i8'), + ('Nanoseconds', 'i4'), + + ('Tick', 'i4'), # (-1, 0, 1) = (on bid, same, on ask) + # ('fill_time', 'f4'), + ('Last', 'f4'), + ('Bid', 'f4'), + ('Bsize', 'i8'), + ('Asize', 'i8'), + ('Ask', 'f4'), + ('Size', 'i8'), + ('Volume', 'i8'), + # ('brokerd_ts', 'i64'), + # ('VWAP', 'f4') +] +_quote_tmp = {}.fromkeys(dict(_quote_dt).keys(), np.nan) +_tick_map = { + 'Up': 1, + 'Equal': 0, + 'Down': -1, + None: np.nan, +} + + +class MarketStoreError(Exception): + "Generic marketstore client error" + + +def err_on_resp(response: dict) -> None: + """Raise any errors found in responses from client request. + """ + responses = response['responses'] + if responses is not None: + for r in responses: + err = r['error'] + if err: + raise MarketStoreError(err) + + +def quote_to_marketstore_structarray( + quote: Dict[str, Any], + last_fill: str, +) -> np.array: + """Return marketstore writeable structarray from quote ``dict``. + """ + if last_fill: + # new fill bby + now = timestamp(last_fill) + else: + # this should get inserted upstream by the broker-client to + # subtract from IPC latency + now = time.time_ns() + + secs, ns = now / 10**9, now % 10**9 + + # pack into List[Tuple[str, Any]] + array_input = [] + + # insert 'Epoch' entry first and then 'Nanoseconds'. + array_input.append(int(secs)) + array_input.append(int(ns)) + + # append remaining fields + for name, dt in _quote_dt[2:]: + if 'f' in dt: + none = np.nan + else: + # for ``np.int`` we use 0 as a null value + none = 0 + + # casefold? see https://github.com/alpacahq/marketstore/issues/324 + val = quote.get(name.casefold(), none) + array_input.append(val) + + return np.array([tuple(array_input)], dtype=_quote_dt) + + +def timestamp(datestr: str) -> int: + """Return marketstore compatible 'Epoch' integer in nanoseconds + from a date formatted str. + """ + return int(pd.Timestamp(datestr).value) + + +def mk_tbk(keys: Tuple[str, str, str]) -> str: + """Generate a marketstore table key from a tuple. + + Converts, + ``('SPY', '1Sec', 'TICK')`` -> ``"SPY/1Sec/TICK"``` + """ + return '{}/' + '/'.join(keys) + + +class Client: + """Async wrapper around the alpaca ``pymarketstore`` sync client. + + This will server as the shell for building out a proper async client + that isn't horribly documented and un-tested.. + """ + def __init__(self, url: str): + self._client = pymkts.Client(url) + + async def _invoke( + self, + meth: Callable, + *args, + **kwargs, + ) -> Any: + return err_on_resp(meth(*args, **kwargs)) + + async def destroy( + self, + tbk: Tuple[str, str, str], + ) -> None: + return await self._invoke(self._client.destroy, mk_tbk(tbk)) + + async def list_symbols( + self, + tbk: str, + ) -> List[str]: + return await self._invoke(self._client.list_symbols, mk_tbk(tbk)) + + async def write( + self, + symbol: str, + array: np.ndarray, + ) -> None: + start = time.time() + await self._invoke( + self._client.write, + array, + _tick_tbk.format(symbol), + isvariablelength=True + ) + log.debug(f"{symbol} write time (s): {time.time() - start}") + + def query( + self, + symbol, + tbk: Tuple[str, str] = _tick_tbk_ids, + ) -> pd.DataFrame: + # XXX: causes crash + # client.query(pymkts.Params(symbol, '*', 'OHCLV' + result = self._client.query( + pymkts.Params(symbol, *tbk), + ) + return result.first().df() + + +@asynccontextmanager +async def get_client( + url: str = _url, +) -> Client: + yield Client(url) + + +async def ingest_quote_stream( + symbols: List[str], + brokername: str, + tries: int = 1, + loglevel: str = None, +) -> None: + """Ingest a broker quote stream into marketstore in (sampled) tick format. + """ + async with open_feed( + brokername, + symbols, + loglevel=loglevel, + ) as (first_quotes, qstream): + + quote_cache = first_quotes.copy() + + async with get_client() as ms_client: + + # start ingest to marketstore + async for quotes in qstream: + log.info(quotes) + for symbol, quote in quotes.items(): + + # remap tick strs to ints + quote['tick'] = _tick_map[quote.get('tick', 'Equal')] + + # check for volume update (i.e. did trades happen + # since last quote) + new_vol = quote.get('volume', None) + if new_vol is None: + log.debug(f"No fills for {symbol}") + if new_vol == quote_cache.get('volume'): + # should never happen due to field diffing + # on sender side + log.error( + f"{symbol}: got same volume as last quote?") + + quote_cache.update(quote) + + a = quote_to_marketstore_structarray( + quote, + # TODO: check this closer to the broker query api + last_fill=quote.get('fill_time', '') + ) + await ms_client.write(symbol, a) + + +async def stream_quotes( + symbols: List[str], + host: str = 'localhost', + port: int = 5993, + diff_cached: bool = True, + loglevel: str = None, +) -> None: + """Open a symbol stream from a running instance of marketstore and + log to console. + """ + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) + + tbks: Dict[str, str] = {sym: f"{sym}/*/*" for sym in symbols} + + async with open_websocket_url(f'ws://{host}:{port}/ws') as ws: + # send subs topics to server + resp = await ws.send_message( + msgpack.dumps({'streams': list(tbks.values())}) + ) + log.info(resp) + + async def recv() -> Dict[str, Any]: + return msgpack.loads((await ws.get_message()), encoding='utf-8') + + streams = (await recv())['streams'] + log.info(f"Subscribed to {streams}") + + _cache = {} + + while True: + msg = await recv() + + # unpack symbol and quote data + # key is in format ``//`` + symbol = msg['key'].split('/')[0] + data = msg['data'] + + # calc time stamp(s) + s, ns = data.pop('Epoch'), data.pop('Nanoseconds') + ts = s * 10**9 + ns + data['broker_fill_time_ns'] = ts + + quote = {} + for k, v in data.items(): + if isnan(v): + continue + + quote[k.lower()] = v + + quote['symbol'] = symbol + + quotes = {} + + if diff_cached: + last = _cache.setdefault(symbol, {}) + new = set(quote.items()) - set(last.items()) + if new: + log.info(f"New quote {quote['symbol']}:\n{new}") + + # only ship diff updates and other required fields + payload = {k: quote[k] for k, v in new} + payload['symbol'] = symbol + + # if there was volume likely the last size of + # shares traded is useful info and it's possible + # that the set difference from above will disregard + # a "size" value since the same # of shares were traded + size = quote.get('size') + volume = quote.get('volume') + if size and volume: + new_volume_since_last = max( + volume - last.get('volume', 0), 0) + log.warning( + f"NEW VOLUME {symbol}:{new_volume_since_last}") + payload['size'] = size + payload['last'] = quote.get('last') + + # XXX: we append to a list for the options case where the + # subscription topic (key) is the same for all + # expiries even though this is uncessary for the + # stock case (different topic [i.e. symbol] for each + # quote). + quotes.setdefault(symbol, []).append(payload) + + # update cache + _cache[symbol].update(quote) + else: + quotes = {symbol: [{key.lower(): val for key, val in quote.items()}]} + + if quotes: + yield quotes diff --git a/piker/ui/__init__.py b/piker/ui/__init__.py deleted file mode 100644 index bc6e6ac9..00000000 --- a/piker/ui/__init__.py +++ /dev/null @@ -1,15 +0,0 @@ -""" -Stuff for your eyes. -""" -import os -import sys - -# XXX clear all flags at import to avoid upsetting -# ol' kivy see: https://github.com/kivy/kivy/issues/4225 -# though this is likely a ``click`` problem -sys.argv[1:] = [] - -# use the trio async loop -os.environ['KIVY_EVENTLOOP'] = 'trio' -import kivy -kivy.require('1.10.0') diff --git a/piker/ui/cli.py b/piker/ui/cli.py new file mode 100644 index 00000000..a869e307 --- /dev/null +++ b/piker/ui/cli.py @@ -0,0 +1,105 @@ +""" +Console interface to UI components. +""" +from functools import partial +import os +import click +import tractor + +from ..cli import cli +from .. import watchlists as wl +from ..data import maybe_spawn_brokerd + + +_config_dir = click.get_app_dir('piker') +_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') + + +def _kivy_import_hack(): + # Command line hacks to make it work. + # See the pkg mod. + from .kivy import kivy # noqa + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--rate', '-r', default=3, help='Quote rate limit') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--dhost', '-dh', default='127.0.0.1', + help='Daemon host address to connect to') +@click.argument('name', nargs=1, required=True) +@click.pass_obj +def monitor(config, rate, name, dhost, test, tl): + """Start a real-time watchlist UI + """ + # global opts + brokermod = config['brokermod'] + loglevel = config['loglevel'] + log = config['log'] + + watchlist_from_file = wl.ensure_watchlists(_watchlists_data_path) + watchlists = wl.merge_watchlist(watchlist_from_file, wl._builtins) + tickers = watchlists[name] + if not tickers: + log.error(f"No symbols found for watchlist `{name}`?") + return + + _kivy_import_hack() + from .kivy.monitor import _async_main + + async def main(tries): + async with maybe_spawn_brokerd( + brokername=brokermod.name, + tries=tries, loglevel=loglevel + ) as portal: + # run app "main" + await _async_main( + name, portal, tickers, + brokermod, rate, test=test, + ) + + tractor.run( + partial(main, tries=1), + name='monitor', + loglevel=loglevel if tl else None, + rpc_module_paths=['piker.ui.kivy.monitor'], + start_method='forkserver', + ) + + +@cli.command() +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--date', '-d', help='Contracts expiry date') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--rate', '-r', default=1, help='Logging level') +@click.argument('symbol', required=True) +@click.pass_obj +def optschain(config, symbol, date, tl, rate, test): + """Start an option chain UI + """ + # global opts + loglevel = config['loglevel'] + brokername = config['broker'] + + _kivy_import_hack() + from .kivy.option_chain import _async_main + + async def main(tries): + async with maybe_spawn_brokerd( + tries=tries, loglevel=loglevel + ): + # run app "main" + await _async_main( + symbol, + brokername, + rate=rate, + loglevel=loglevel, + test=test, + ) + + tractor.run( + partial(main, tries=1), + name='kivy-options-chain', + loglevel=loglevel if tl else None, + start_method='forkserver', + ) diff --git a/piker/ui/kivy/__init__.py b/piker/ui/kivy/__init__.py index e69de29b..f0aa8b68 100644 --- a/piker/ui/kivy/__init__.py +++ b/piker/ui/kivy/__init__.py @@ -0,0 +1,15 @@ +""" +Legacy kivy components. +""" +import os +import sys + +# XXX clear all flags at import to avoid upsetting +# ol' kivy see: https://github.com/kivy/kivy/issues/4225 +# though this is likely a ``click`` problem +sys.argv[1:] = [] + +# use the trio async loop +os.environ['KIVY_EVENTLOOP'] = 'trio' +import kivy +kivy.require('1.10.0') diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index a7778fbf..cd73c3fa 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -51,23 +51,25 @@ async def update_quotes( chngcell = row.get_cell('%') # determine daily change color - color = colorcode('gray') percent_change = record.get('%') - if percent_change: - daychange = float(record['%']) + if percent_change is not None and percent_change != chngcell: + daychange = float(percent_change) if daychange < 0.: color = colorcode('red2') elif daychange > 0.: color = colorcode('forestgreen') + else: + color = colorcode('gray') - # update row header and '%' cell text color - if chngcell: - chngcell.color = color - hdrcell.color = color # if the cell has been "highlighted" make sure to change its color if hdrcell.background_color != [0]*4: hdrcell.background_color = color + # update row header and '%' cell text color + chngcell.color = color + hdrcell.color = color + + # briefly highlight bg of certain cells on each trade execution unflash = set() tick_color = None @@ -123,10 +125,13 @@ async def update_quotes( record, displayable = formatter( quote, symbol_data=symbol_data) + # don't red/green the header cell in ``row.update()`` + record.pop('symbol') + # determine if sorting should happen sort_key = table.sort_key - new = record[sort_key] last = row.get_field(sort_key) + new = record.get(sort_key, last) if new != last: to_sort.add(row.widget) diff --git a/piker/ui/tabular.py b/piker/ui/tabular.py index d995848a..27c1e091 100644 --- a/piker/ui/tabular.py +++ b/piker/ui/tabular.py @@ -340,6 +340,7 @@ class Row(HoverBehavior, GridLayout): gray = colorcode('gray') fgreen = colorcode('forestgreen') red = colorcode('red2') + for key, val in record.items(): last = self.get_field(key) color = gray @@ -361,7 +362,7 @@ class Row(HoverBehavior, GridLayout): if color != gray: cells[key] = cell - self._last_record = record + self._last_record.update(record) return cells # mouse over handlers diff --git a/setup.py b/setup.py index 1bb57932..81e3d56a 100755 --- a/setup.py +++ b/setup.py @@ -30,7 +30,6 @@ setup( license='AGPLv3', author='Tyler Goodlet', maintainer='Tyler Goodlet', - maintainer_email='tgoodlet@gmail.com', url='https://github.com/pikers/piker', platforms=['linux'], packages=[ @@ -46,13 +45,27 @@ setup( ] }, install_requires=[ - 'click', 'colorlog', 'trio', 'attrs', 'async_generator', - 'pygments', 'cython', 'asks', 'pandas', 'msgpack', + 'click', + 'colorlog', + 'trio', + 'attrs', + 'async_generator', + 'pygments', + + # brokers + 'asks', + 'ib_insync', + + # numerics + 'arrow', # better datetimes + 'cython', + 'numpy', + 'pandas', + + # tsdbs + 'pymarketstore', #'kivy', see requirement.txt; using a custom branch atm ], - extras_require={ - 'questrade': ['asks'], - }, tests_require=['pytest'], python_requires=">=3.7", # literally for ``datetime.datetime.fromisoformat``... keywords=["async", "trading", "finance", "quant", "charting"], @@ -61,7 +74,7 @@ setup( 'License :: OSI Approved :: Mozilla Public License 2.0 (MPL 2.0)', 'Operating System :: POSIX :: Linux', "Programming Language :: Python :: Implementation :: CPython", - "Programming Language :: Python :: Implementation :: PyPy", + # "Programming Language :: Python :: Implementation :: PyPy", "Programming Language :: Python :: 3 :: Only", "Programming Language :: Python :: 3.5", "Programming Language :: Python :: 3.6",