diff --git a/piker/brokers/data.py b/piker/brokers/data.py index e35b0dec..8f321659 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -90,6 +90,7 @@ async def stream_quotes( new_quotes.append(quote) else: new_quotes = quotes + log.info(f"Delivering quotes:\n{quotes}") yield new_quotes @@ -108,6 +109,9 @@ async def stream_quotes( await trio.sleep(delay) +# TODO: at this point probably just just make this a class and +# a lot of these functions should be methods. It will definitely +# make stateful UI apps easier to implement class DataFeed(typing.NamedTuple): """A per broker "data feed" container. @@ -116,8 +120,9 @@ class DataFeed(typing.NamedTuple): """ mod: ModuleType client: object + exit_stack: contextlib.AsyncExitStack quoter_keys: List[str] = ['stock', 'option'] - tasks: Dict[str, trio._core._run.Task] = dict.fromkeys( + tasks: Dict[str, trio.Event] = dict.fromkeys( quoter_keys, False) quoters: Dict[str, typing.Coroutine] = {} subscriptions: Dict[str, Dict[str, set]] = {'option': {}, 'stock': {}} @@ -141,7 +146,9 @@ async def fan_out_to_chans( async def request(): """Get quotes for current symbol subscription set. """ - return await get_quotes(list(symbols2chans.keys())) + symbols = list(symbols2chans.keys()) + # subscription can be changed at any time + return await get_quotes(symbols) if symbols else () async for quotes in stream_quotes( feed.mod, request, rate, @@ -149,18 +156,16 @@ async def fan_out_to_chans( ): chan_payloads = {} for quote in quotes: - # is this too QT specific? - symbol = quote['symbol'] - # set symbol quotes for each subscriber + packet = {quote['symbol']: quote} for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( - chan, + (chan, cid), {'yield': {}, 'cid': cid} - )['yield'][symbol] = quote + )['yield'].update(packet) # deliver to each subscriber (fan out) if chan_payloads: - for chan, payload in chan_payloads.items(): + for (chan, cid), payload in chan_payloads.items(): try: await chan.send(payload) except ( @@ -233,13 +238,19 @@ async def smoke_quote(get_quotes, tickers, broker): ########################################### -async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None): +def modify_quote_stream(broker, feed_type, symbols, chan, cid): """Absolute symbol subscription list for each quote stream. Effectively a symbol subscription api. """ log.info(f"{chan} changed symbol subscription to {symbols}") - feed = await get_cached_feed(broker) + ss = tractor.current_actor().statespace + feed = ss['feeds'].get(broker) + if feed is None: + raise RuntimeError( + "`get_cached_feed()` must be called before modifying its stream" + ) + symbols2chans = feed.subscriptions[feed_type] # update map from each symbol to requesting client's chan for ticker in symbols: @@ -254,7 +265,7 @@ async def modify_quote_stream(broker, feed_type, symbols, chan=None, cid=None): chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call for item in chanset.copy(): - if chan in item: + if (chan, cid) == item: chanset.discard(item) if not chanset: @@ -271,8 +282,6 @@ async def get_cached_feed( ss = tractor.current_actor().statespace feeds = ss.setdefault('feeds', {'_lock': trio.Lock()}) lock = feeds['_lock'] - feed_stacks = ss.setdefault('feed_stacks', {}) - feed_stack = feed_stacks.setdefault(brokername, contextlib.AsyncExitStack()) async with lock: try: feed = feeds[brokername] @@ -281,11 +290,13 @@ async def get_cached_feed( except KeyError: log.info(f"Creating new client for broker {brokername}") brokermod = get_brokermod(brokername) - client = await feed_stack.enter_async_context( + exit_stack = contextlib.AsyncExitStack() + client = await exit_stack.enter_async_context( brokermod.get_client()) feed = DataFeed( mod=brokermod, client=client, + exit_stack=exit_stack, ) feeds[brokername] = feed return feed @@ -298,6 +309,7 @@ async def start_quote_stream( diff_cached: bool = True, chan: tractor.Channel = None, cid: str = None, + rate: int = 3, ) -> None: """Handle per-broker quote stream subscriptions using a "lazy" pub-sub pattern. @@ -306,12 +318,10 @@ async def start_quote_stream( Since most brokers seems to support batch quote requests we limit to one task per process for now. """ - actor = tractor.current_actor() # set log level after fork get_console_log(actor.loglevel) # pull global vars from local actor - ss = actor.statespace symbols = list(symbols) log.info( f"{chan.uid} subscribed to {broker} for symbols {symbols}") @@ -337,38 +347,71 @@ async def start_quote_stream( 'option', await feed.mod.option_quoter(feed.client, symbols) ) - - # update map from each symbol to requesting client's chan - await modify_quote_stream(broker, feed_type, symbols, chan, cid) - + payload = { + quote['symbol']: quote + for quote in await get_quotes(symbols) + } + # push initial smoke quote response for client initialization + await chan.send({'yield': payload, 'cid': cid}) try: - if not feed.tasks.get(feed_type): - # no data feeder task yet; so start one - respawn = True + # update map from each symbol to requesting client's chan + modify_quote_stream(broker, feed_type, symbols, chan, cid) + + # event indicating that task was started and then killed + task_is_dead = feed.tasks.get(feed_type) + if task_is_dead is False: + task_is_dead = trio.Event() + task_is_dead.set() + feed.tasks[feed_type] = task_is_dead + + if not task_is_dead.is_set(): + # block and let existing feed task deliver + # stream data until it is cancelled in which case + # we'll take over and spawn it again + await task_is_dead.wait() + # client channel was likely disconnected + # but we still want to keep the broker task + # alive if there are other consumers (including + # ourselves) + if any(symbols2chans.values()): + log.warn( + f"Data feed task for {feed.mod.name} was cancelled but" + f" there are still active clients, respawning") + + # no data feeder task yet; so start one + respawn = True + while respawn: + respawn = False log.info(f"Spawning data feed task for {feed.mod.name}") - while respawn: - respawn = False - try: - async with trio.open_nursery() as nursery: - nursery.start_soon( - partial( - fan_out_to_chans, feed, get_quotes, - symbols2chans, - diff_cached=diff_cached, - cid=cid - ) + try: + async with trio.open_nursery() as nursery: + nursery.start_soon( + partial( + fan_out_to_chans, feed, get_quotes, + symbols2chans, + diff_cached=diff_cached, + cid=cid, + rate=rate, ) - feed.tasks[feed_type] = True - except trio.BrokenResourceError: - log.exception("Respawning failed data feed task") - respawn = True - # unblocks when no more symbols subscriptions exist and the - # quote streamer task terminates (usually because another call - # was made to `modify_quoter` to unsubscribe from streaming - # symbols) + ) + # it's alive! + task_is_dead.clear() + + except trio.BrokenResourceError: + log.exception("Respawning failed data feed task") + respawn = True + + # unblocks when no more symbols subscriptions exist and the + # quote streamer task terminates (usually because another call + # was made to `modify_quoter` to unsubscribe from streaming + # symbols) finally: log.info(f"Terminated {feed_type} quoter task for {feed.mod.name}") - feed.tasks.pop(feed_type) + task_is_dead.set() + + # if we're cancelled externally unsubscribe our quote feed + modify_quote_stream(broker, feed_type, [], chan, cid) + # if there are truly no more subscriptions with this broker # drop from broker subs dict if not any(symbols2chans.values()): @@ -376,7 +419,7 @@ async def start_quote_stream( # broker2symbolsubs.pop(broker, None) # destroy the API client - await feed_stack.aclose() + await feed.exit_stack.aclose() async def stream_to_file( diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 330580af..c3fb7586 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -5,7 +5,6 @@ import time from datetime import datetime from functools import partial import configparser -from operator import itemgetter from typing import List, Tuple, Dict, Any, Iterator, NamedTuple import trio @@ -25,7 +24,10 @@ log = get_logger(__name__) _refresh_token_ep = 'https://login.questrade.com/oauth2/' _version = 'v1' -_rate_limit = 4 # queries/sec + +# stock queries/sec +# it seems 4 rps is best we can do total +_rate_limit = 4 class QuestradeError(Exception): @@ -90,7 +92,7 @@ class _API: async def option_quotes( self, - contracts: Dict[ContractsKey, Dict[int, dict]], + contracts: Dict[ContractsKey, Dict[int, dict]] = {}, option_ids: List[int] = [], # if you don't want them all ) -> dict: """Retrieve option chain quotes for all option ids or by filter(s). @@ -105,6 +107,8 @@ class _API: ] resp = await self._sess.post( path=f'/markets/quotes/options', + # XXX: b'{"code":1024,"message":"The size of the array requested is not valid: optionIds"}' + # ^ what I get when trying to use too many ids manually... json={'filters': filters, 'optionIds': option_ids} ) return resproc(resp, log)['optionQuotes'] @@ -123,7 +127,8 @@ class Client: self.access_data = {} self._reload_config(config) self._symbol_cache: Dict[str, int] = {} - self._contracts2expiries = {} + self._optids2contractinfo = {} + self._contract2ids = {} def _reload_config(self, config=None, **kwargs): log.warn("Reloading access config data") @@ -312,17 +317,44 @@ class Client: contracts.items(), key=lambda item: item[0].expiry ): - by_key[ - ContractsKey( - key.symbol, - key.id, - # converting back - maybe just do this initially? - key.expiry.isoformat(timespec='microseconds'), - ) - ] = { - item['strikePrice']: item for item in - byroot['chainPerRoot'][0]['chainPerStrikePrice'] - } + for chain in byroot['chainPerRoot']: + optroot = chain['optionRoot'] + + # handle QTs "adjusted contracts" (aka adjusted for + # the underlying in some way; usually has a '(1)' in + # the expiry key in their UI) + adjusted_contracts = optroot not in key.symbol + tail = optroot[len(key.symbol):] + suffix = '-' + tail if adjusted_contracts else '' + + by_key[ + ContractsKey( + key.symbol + suffix, + key.id, + # converting back - maybe just do this initially? + key.expiry.isoformat(timespec='microseconds'), + ) + ] = { + item['strikePrice']: item for item in + chain['chainPerStrikePrice'] + } + + # fill out contract id to strike expiry map + for tup, bystrikes in by_key.items(): + for strike, ids in bystrikes.items(): + for key, contract_type in ( + ('callSymbolId', 'call'), ('putSymbolId', 'put') + ): + contract_int_id = ids[key] + self._optids2contractinfo[contract_int_id] = { + 'strike': strike, + 'expiry': tup.expiry, + 'contract_type': contract_type, + 'contract_key': tup, + } + # store ids per contract + self._contract2ids.setdefault( + tup, set()).add(contract_int_id) return by_key async def option_chains( @@ -332,16 +364,31 @@ class Client: ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return option chain snap quote for each ticker in ``symbols``. """ - batch = [] - for key, bystrike in contracts.items(): - quotes = await self.api.option_quotes({key: bystrike}) - for quote in quotes: - # index by .symbol, .expiry since that's what - # a subscriber (currently) sends initially - quote['key'] = (key[0], key[2]) - batch.extend(quotes) + quotes = await self.api.option_quotes(contracts=contracts) + # XXX the below doesn't work so well due to the symbol count + # limit per quote request + # quotes = await self.api.option_quotes(option_ids=list(contract_ids)) + for quote in quotes: + id = quote['symbolId'] + contract_info = self._optids2contractinfo[id].copy() + key = contract_info.pop('contract_key') - return batch + # XXX TODO: this currently doesn't handle adjusted contracts + # (i.e. ones that we stick a '(1)' after) + + # index by .symbol, .expiry since that's what + # a subscriber (currently) sends initially + quote['key'] = (key.symbol, key.expiry) + + # update with expiry and strike (Obviously the + # QT api designers are using some kind of severely + # stupid disparate table system where they keep + # contract info in a separate table from the quote format + # keys. I'm really not surprised though - windows shop..) + # quote.update(self._optids2contractinfo[quote['symbolId']]) + quote.update(contract_info) + + return quotes async def token_refresher(client): @@ -394,7 +441,8 @@ async def get_client() -> Client: try: log.debug("Check time to ensure access token is valid") try: - await client.api.time() + # await client.api.time() + await client.quote(['RY.TO']) except Exception: # access token is likely no good log.warn(f"Access token {client.access_data['access_token']} seems" @@ -471,18 +519,17 @@ async def option_quoter(client: Client, tickers: List[str]): if isinstance(tickers[0], tuple): datetime.fromisoformat(tickers[0][1]) else: - log.warn(f"Ignoring option quoter call with {tickers}") - # TODO make caller always check that a quoter has been set - return + raise ValueError(f'Option subscription format is (symbol, expiry)') @async_lifo_cache(maxsize=128) - async def get_contract_by_date(sym_date_pairs: Tuple[Tuple[str, str]]): + async def get_contract_by_date( + sym_date_pairs: Tuple[Tuple[str, str]], + ): """For each tuple, ``(symbol_date_1, symbol_date_2, ... , symbol_date_n)`` return a contract dict. """ - symbols = map(itemgetter(0), sym_date_pairs) - dates = map(itemgetter(1), sym_date_pairs) + symbols, dates = zip(*sym_date_pairs) contracts = await client.get_all_contracts(symbols) selected = {} for key, val in contracts.items(): @@ -536,11 +583,11 @@ _qt_stock_keys = { 'bidSize': 'bsize', 'askSize': 'asize', 'VWAP': ('VWAP', partial(round, ndigits=3)), - 'mktcap': ('mktcap', humanize), + 'MC': ('MC', humanize), '$ vol': ('$ vol', humanize), 'volume': ('vol', humanize), - 'close': 'close', - 'openPrice': 'open', + # 'close': 'close', + # 'openPrice': 'open', 'lowPrice': 'low', 'highPrice': 'high', # 'low52w': 'low52w', # put in info widget @@ -556,15 +603,15 @@ _qt_stock_keys = { # BidAskLayout columns which will contain three cells the first stacked on top # of the other 2 -_bidasks = { +_stock_bidasks = { 'last': ['bid', 'ask'], 'size': ['bsize', 'asize'], 'VWAP': ['low', 'high'], - 'mktcap': ['vol', '$ vol'], + 'vol': ['MC', '$ vol'], } -def format_quote( +def format_stock_quote( quote: dict, symbol_data: dict, keymap: dict = _qt_stock_keys, @@ -586,7 +633,7 @@ def format_quote( computed = { 'symbol': quote['symbol'], '%': round(change, 3), - 'mktcap': mktcap, + 'MC': mktcap, # why QT do you have to be an asshole shipping null values!!! '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), 'close': previous, @@ -609,3 +656,100 @@ def format_quote( displayable[new_key] = display_value return new, displayable + + +_qt_option_keys = { + "lastTradePrice": 'last', + "askPrice": 'ask', + "bidPrice": 'bid', + "lastTradeSize": 'size', + "bidSize": 'bsize', + "askSize": 'asize', + 'VWAP': ('VWAP', partial(round, ndigits=3)), + "lowPrice": 'low', + "highPrice": 'high', + # "expiry": "expiry", + # "delay": 0, + "delta": ('delta', partial(round, ndigits=3)), + # "gamma": ('gama', partial(round, ndigits=3)), + # "rho": ('rho', partial(round, ndigits=3)), + # "theta": ('theta', partial(round, ndigits=3)), + # "vega": ('vega', partial(round, ndigits=3)), + '$ vol': ('$ vol', humanize), + 'volume': ('vol', humanize), + # "2021-01-15T00:00:00.000000-05:00", + # "isHalted": false, + # "key": [ + # "APHA.TO", + # "2021-01-15T00:00:00.000000-05:00" + # ], + # "lastTradePriceTrHrs": null, + # "lastTradeTick": 'tick', + "lastTradeTime": 'time', + "openInterest": 'oi', + "openPrice": 'open', + # "strike": 'strike', + # "symbol": "APHA15Jan21P8.00.MX", + # "symbolId": 23881868, + # "underlying": "APHA.TO", + # "underlyingId": 8297492, + "symbol": 'symbol', + "contract_type": 'contract_type', + "volatility": ( + 'IV %', + lambda v: '{}'.format(round(v, ndigits=2)) + ), + "strike": 'strike', +} + +_option_bidasks = { + 'last': ['bid', 'ask'], + 'size': ['bsize', 'asize'], + 'VWAP': ['low', 'high'], + 'vol': ['oi', '$ vol'], +} + + +def format_option_quote( + quote: dict, + symbol_data: dict, + keymap: dict = _qt_option_keys, +) -> Tuple[dict, dict]: + """Remap a list of quote dicts ``quotes`` using the mapping of old keys + -> new keys ``keymap`` returning 2 dicts: one with raw data and the other + for display. + + Returns 2 dicts: first is the original values mapped by new keys, + and the second is the same but with all values converted to a + "display-friendly" string format. + """ + # TODO: need historical data.. + # (cause why would QT keep their quote structure consistent across + # assets..) + # previous = symbol_data[symbol]['prevDayClosePrice'] + # change = percent_change(previous, last) + computed = { + # why QT do you have to be an asshole shipping null values!!! + '$ vol': round((quote['VWAP'] or 0) * (quote['volume'] or 0), 3), + # '%': round(change, 3), + # 'close': previous, + } + new = {} + displayable = {} + + # structuring and normalization + for key, new_key in keymap.items(): + display_value = value = computed.get(key) or quote.get(key) + + # API servers can return `None` vals when markets are closed (weekend) + value = 0 if value is None else value + + # convert values to a displayble format using available formatting func + if isinstance(new_key, tuple): + new_key, func = new_key + display_value = func(value) if value else value + + new[new_key] = value + displayable[new_key] = display_value + + return new, displayable diff --git a/piker/cli.py b/piker/cli.py index 6dd3b327..d980dea3 100644 --- a/piker/cli.py +++ b/piker/cli.py @@ -22,6 +22,10 @@ DEFAULT_BROKER = 'robinhood' _config_dir = click.get_app_dir('piker') _watchlists_data_path = os.path.join(_config_dir, 'watchlists.json') +_data_mods = [ + 'piker.brokers.core', + 'piker.brokers.data', +] @click.command() @@ -33,7 +37,7 @@ def pikerd(loglevel, host, tl): """ get_console_log(loglevel) tractor.run_daemon( - rpc_module_paths=['piker.brokers.data'], + rpc_module_paths=_data_mods, name='brokerd', loglevel=loglevel if tl else None, ) @@ -133,7 +137,7 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): "No broker daemon could be found, spawning brokerd..") portal = await nursery.start_actor( 'brokerd', - rpc_module_paths=['piker.brokers.data'], + rpc_module_paths=_data_mods, loglevel=loglevel, ) yield portal @@ -144,7 +148,7 @@ async def maybe_spawn_brokerd_as_subactor(sleep=0.5, tries=10, loglevel=None): help='Broker backend to use') @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging') -@click.option('--rate', '-r', default=5, help='Quote rate limit') +@click.option('--rate', '-r', default=3, help='Quote rate limit') @click.option('--test', '-t', help='Test quote stream file') @click.option('--dhost', '-dh', default='127.0.0.1', help='Daemon host address to connect to') @@ -174,8 +178,9 @@ def monitor(loglevel, broker, rate, name, dhost, test, tl): tractor.run( partial(main, tries=1), - name='kivy-monitor', + name='monitor', loglevel=loglevel if tl else None, + rpc_module_paths=['piker.ui.monitor'], ) @@ -358,3 +363,38 @@ def optsquote(loglevel, broker, symbol, df_output, date): click.echo(df) else: click.echo(colorize_json(quotes)) + + +@cli.command() +@click.option('--broker', '-b', default=DEFAULT_BROKER, + help='Broker backend to use') +@click.option('--loglevel', '-l', default='warning', help='Logging level') +@click.option('--tl', is_flag=True, help='Enable tractor logging') +@click.option('--date', '-d', help='Contracts expiry date') +@click.option('--test', '-t', help='Test quote stream file') +@click.option('--rate', '-r', default=1, help='Logging level') +@click.argument('symbol', required=True) +def optschain(loglevel, broker, symbol, date, tl, rate, test): + """Start the real-time option chain UI. + """ + from .ui.option_chain import _async_main + log = get_console_log(loglevel) # activate console logging + brokermod = get_brokermod(broker) + + async def main(tries): + async with maybe_spawn_brokerd_as_subactor( + tries=tries, loglevel=loglevel + ) as portal: + # run app "main" + await _async_main( + symbol, portal, + brokermod, + rate=rate, + test=test, + ) + + tractor.run( + partial(main, tries=1), + name='kivy-options-chain', + loglevel=loglevel if tl else None, + ) diff --git a/piker/ui/kivy/mouse_over.py b/piker/ui/kivy/mouse_over.py index 80a67d63..c9ad149b 100644 --- a/piker/ui/kivy/mouse_over.py +++ b/piker/ui/kivy/mouse_over.py @@ -89,14 +89,16 @@ class MouseOverBehavior(object): def __init__(self, **kwargs): self.register_event_type('on_enter') self.register_event_type('on_leave') - MouseOverBehavior._widgets.append(self) super().__init__(**kwargs) Window.bind(mouse_pos=self._on_mouse_pos) + self._widgets.append(self) + + def __del__(self): + MouseOverBehavior.remove(self) @classmethod - # try throttling to 1ms latency (doesn't seem to work - # best I can get is 0.01...) - @triggered(timeout=0.001, interval=False) + # throttle at 10ms latency + @triggered(timeout=0.01, interval=False) def _on_mouse_pos(cls, *args): log.debug(f"{cls} time since last call: {time.time() - cls._last_time}") cls._last_time = time.time() @@ -107,10 +109,11 @@ class MouseOverBehavior(object): pos = args[1] # Next line to_widget allow to compensate for relative layout - for widget in cls._widgets.copy(): + for widget in cls._widgets: w_coords = widget.to_widget(*pos) inside = widget.collide_point(*w_coords) if inside and widget.hovered: + log.debug('already hovered') return elif inside: # un-highlight the last highlighted diff --git a/piker/ui/monitor.py b/piker/ui/monitor.py index d9568135..537227ef 100644 --- a/piker/ui/monitor.py +++ b/piker/ui/monitor.py @@ -5,419 +5,38 @@ Launch with ``piker monitor ``. (Currently there's a bunch of questrade specific stuff in here) """ -from itertools import chain from types import ModuleType, AsyncGeneratorType -from typing import List +from typing import List, Callable import trio import tractor from kivy.uix.boxlayout import BoxLayout -from kivy.uix.gridlayout import GridLayout -from kivy.uix.stacklayout import StackLayout -from kivy.uix.button import Button from kivy.lang import Builder -from kivy import utils from kivy.app import async_runTouchApp from kivy.core.window import Window -from async_generator import aclosing +from .tabular import ( + Row, TickerTable, _kv, _black_rgba, colorcode, +) from ..log import get_logger from .pager import PagerView -from .kivy.mouse_over import new_mouse_over_group -HoverBehavior = new_mouse_over_group() log = get_logger('monitor') -_colors2hexs = { - 'darkgray': 'a9a9a9', - 'gray': '808080', - 'green': '008000', - 'forestgreen': '228b22', - 'red2': 'ff3333', - 'red': 'ff0000', - 'firebrick': 'b22222', -} - -_colors = {key: utils.rgba(val) for key, val in _colors2hexs.items()} - - -def colorcode(name): - return _colors[name if name else 'gray'] - - -_bs = 0.75 # border size - -# medium shade of gray that seems to match the -# default i3 window borders -_i3_rgba = [0.14]*3 + [1] - -# slightly off black like the jellybean bg from -# vim colorscheme -_cell_rgba = [0.07]*3 + [1] -_black_rgba = [0]*4 - -_kv = (f''' -#:kivy 1.10.0 - - - font_size: 21 - # make text wrap to botom - text_size: self.size - halign: 'center' - valign: 'middle' - size: self.texture_size - # color: {colorcode('gray')} - # font_color: {colorcode('gray')} - font_name: 'Roboto-Regular' - background_color: [0]*4 # by default transparent; use row color - # background_color: {_cell_rgba} - # spacing: 0, 0 - # padding: [0]*4 - - - font_size: 21 - background_color: [0]*4 # by default transparent; use row color - # background_color: {_cell_rgba} - # canvas.before: - # Color: - # rgba: [0.13]*4 - # BorderImage: # use a fixed size border - # pos: self.pos - # size: [self.size[0] - {_bs}, self.size[1]] - # # 0s are because the containing TickerTable already has spacing - # # border: [0, {_bs} , 0, {_bs}] - # border: [0, {_bs} , 0, 0] - - - spacing: [{_bs}] - # row_force_default: True - row_default_height: 62 - cols: 1 - canvas.before: - Color: - # i3 style gray as background - rgba: {_i3_rgba} - # rgba: {_cell_rgba} - Rectangle: - # scale with container self here refers to the widget i.e BoxLayout - pos: self.pos - size: self.size - - - spacing: [{_bs}, 0] - - - # minimum_height: 200 # should be pulled from Cell text size - # minimum_width: 200 - # row_force_default: True - # row_default_height: 61 # determines the header row size - padding: [0]*4 - spacing: [0] - canvas.before: - Color: - # rgba: [0]*4 - rgba: {_cell_rgba} - Rectangle: - # self here refers to the widget i.e Row(GridLayout) - pos: self.pos - size: self.size - # row higlighting on mouse over - Color: - rgba: {_i3_rgba} - RoundedRectangle: - size: self.width, self.height if self.hovered else 1 - pos: self.pos - radius: (10,) - - - -# part of the `PagerView` - - size_hint: 1, None - # static size of 51 px - height: 51 - font_size: 25 - background_color: {_i3_rgba} -''') - - -class Cell(Button): - """Data cell: the fundemental widget. - - ``key`` is the column name index value. - """ - def __init__(self, key=None, **kwargs): - super(Cell, self).__init__(**kwargs) - self.key = key - - -class HeaderCell(Cell): - """Column header cell label. - """ - def on_press(self, value=None): - """Clicking on a col header indicates to sort rows by this column - in `update_quotes()`. - """ - table = self.row.table - # if this is a row header cell then sort by the clicked field - if self.row.is_header: - table.sort_key = self.key - - last = table.last_clicked_col_cell - if last and last is not self: - last.underline = False - last.bold = False - - # outline the header text to indicate it's been the last clicked - self.underline = True - self.bold = True - # mark this cell as the last selected - table.last_clicked_col_cell = self - # sort and render the rows immediately - self.row.table.render_rows(table.quote_cache) - - # allow highlighting of row headers for tracking - elif self.is_header: - if self.background_color == self.color: - self.background_color = _black_rgba - else: - self.background_color = self.color - - -class BidAskLayout(StackLayout): - """Cell which houses three buttons containing a last, bid, and ask in a - single unit oriented with the last 2 under the first. - """ - def __init__(self, values, header=False, **kwargs): - # uncomment to get vertical stacked bid-ask - # super(BidAskLayout, self).__init__(orientation='bt-lr', **kwargs) - super(BidAskLayout, self).__init__(orientation='lr-tb', **kwargs) - assert len(values) == 3, "You can only provide 3 values: last,bid,ask" - self._keys2cells = {} - cell_type = HeaderCell if header else Cell - top_size = cell_type().font_size - small_size = top_size - 4 - top_prop = 0.5 # proportion of size used by top cell - bottom_prop = 1 - top_prop - for (key, size_hint, font_size), value in zip( - [('last', (1, top_prop), top_size), - ('bid', (0.5, bottom_prop), small_size), - ('ask', (0.5, bottom_prop), small_size)], - # uncomment to get vertical stacked bid-ask - # [('last', (top_prop, 1), top_size), - # ('bid', (bottom_prop, 0.5), small_size), - # ('ask', (bottom_prop, 0.5), small_size)], - values - ): - cell = cell_type( - text=str(value), - size_hint=size_hint, - # width=self.width/2 - 3, - font_size=font_size - ) - self._keys2cells[key] = cell - cell.key = value - cell.is_header = header - setattr(self, key, cell) - self.add_widget(cell) - - # should be assigned by referrer - self.row = None - - def get_cell(self, key): - return self._keys2cells[key] - - @property - def row(self): - return self.row - - @row.setter - def row(self, row): - # so hideous - for cell in self.cells: - cell.row = row - - @property - def cells(self): - return [self.last, self.bid, self.ask] - - -class Row(GridLayout, HoverBehavior): - """A grid for displaying a row of ticker quote data. - - The row fields can be updated using the ``fields`` property which will in - turn adjust the text color of the values based on content changes. - """ - def __init__( - self, record, headers=(), bidasks=None, table=None, - is_header=False, - **kwargs - ): - super(Row, self).__init__(cols=len(record), **kwargs) - self._cell_widgets = {} - self._last_record = record - self.table = table - self.is_header = is_header - - # selection state - self.mouse_over = False - - # create `BidAskCells` first - layouts = {} - bidasks = bidasks or {} - ba_cells = {} - for key, children in bidasks.items(): - layout = BidAskLayout( - [record[key]] + [record[child] for child in children], - header=is_header - ) - layout.row = self - layouts[key] = layout - for i, child in enumerate([key] + children): - ba_cells[child] = layout.cells[i] - - children_flat = list(chain.from_iterable(bidasks.values())) - self._cell_widgets.update(ba_cells) - - # build out row using Cell labels - for (key, val) in record.items(): - header = key in headers - - # handle bidask cells - if key in layouts: - self.add_widget(layouts[key]) - elif key in children_flat: - # these cells have already been added to the `BidAskLayout` - continue - else: - cell = self._append_cell(val, key, header=header) - cell.key = key - self._cell_widgets[key] = cell - - def get_cell(self, key): - return self._cell_widgets[key] - - def _append_cell(self, text, key, header=False): - if not len(self._cell_widgets) < self.cols: - raise ValueError(f"Can not append more then {self.cols} cells") - - # header cells just have a different colour - celltype = HeaderCell if header else Cell - cell = celltype(text=str(text), key=key) - cell.is_header = header - cell.row = self - self.add_widget(cell) - return cell - - def update(self, record, displayable): - """Update this row's cells with new values from a quote ``record``. - - Return all cells that changed in a ``dict``. - """ - # color changed field values - cells = {} - gray = colorcode('gray') - fgreen = colorcode('forestgreen') - red = colorcode('red2') - for key, val in record.items(): - # logic for cell text coloring: up-green, down-red - if self._last_record[key] < val: - color = fgreen - elif self._last_record[key] > val: - color = red - else: - color = gray - - cell = self.get_cell(key) - cell.text = str(displayable[key]) - cell.color = color - if color != gray: - cells[key] = cell - - self._last_record = record - return cells - - # mouse over handlers - def on_enter(self): - """Highlight layout on enter. - """ - log.debug( - f"Entered row {type(self)} through {self.border_point}") - # don't highlight header row - if getattr(self, 'is_header', None): - self.hovered = False - - def on_leave(self): - """Un-highlight layout on exit. - """ - log.debug( - f"Left row {type(self)} through {self.border_point}") - - -class TickerTable(GridLayout): - """A grid for displaying ticker quote records as a table. - """ - def __init__(self, sort_key='%', quote_cache={}, **kwargs): - super(TickerTable, self).__init__(**kwargs) - self.symbols2rows = {} - self.sort_key = sort_key - self.quote_cache = quote_cache - self.row_filter = lambda item: item - # for tracking last clicked column header cell - self.last_clicked_col_cell = None - self._last_row_toggle = 0 - - def append_row(self, record, bidasks=None): - """Append a `Row` of `Cell` objects to this table. - """ - row = Row(record, headers=('symbol',), bidasks=bidasks, table=self) - # store ref to each row - self.symbols2rows[row._last_record['symbol']] = row - self.add_widget(row) - return row - - def render_rows( - self, pairs: {str: (dict, Row)}, sort_key: str = None, - row_filter=None, - ): - """Sort and render all rows on the ticker grid from ``pairs``. - """ - self.clear_widgets() - sort_key = sort_key or self.sort_key - for data, row in filter( - row_filter or self.row_filter, - reversed( - sorted(pairs.values(), key=lambda item: item[0][sort_key]) - ) - ): - self.add_widget(row) # row append - - def ticker_search(self, patt): - """Return sequence of matches when pattern ``patt`` is in a - symbol name. Most naive algo possible for the moment. - """ - for symbol, row in self.symbols2rows.items(): - if patt in symbol: - yield symbol, row - - def search(self, patt): - """Search bar api compat. - """ - return dict(self.ticker_search(patt)) or {} - - async def update_quotes( nursery: trio._core._run.Nursery, - brokermod: ModuleType, + formatter: Callable, widgets: dict, agen: AsyncGeneratorType, symbol_data: dict, - first_quotes: dict + first_quotes: dict, + task_status: trio._core._run._TaskStatus = trio.TASK_STATUS_IGNORED, ): """Process live quotes by updating ticker rows. """ + log.debug("Initializing UI update loop") table = widgets['table'] flash_keys = {'low', 'high'} @@ -426,24 +45,27 @@ async def update_quotes( for cell in cells: cell.background_color = _black_rgba - def color_row(row, data, cells): + def color_row(row, record, cells): hdrcell = row.get_cell('symbol') chngcell = row.get_cell('%') # determine daily change color - daychange = float(data['%']) - if daychange < 0.: - color = colorcode('red2') - elif daychange > 0.: - color = colorcode('forestgreen') - else: - color = colorcode('gray') + color = colorcode('gray') + percent_change = record.get('%') + if percent_change: + daychange = float(record['%']) + if daychange < 0.: + color = colorcode('red2') + elif daychange > 0.: + color = colorcode('forestgreen') # update row header and '%' cell text color - chngcell.color = hdrcell.color = color - # if the cell has been "highlighted" make sure to change its color - if hdrcell.background_color != [0]*4: - hdrcell.background_color = color + if chngcell: + chngcell.color = color + hdrcell.color = color + # if the cell has been "highlighted" make sure to change its color + if hdrcell.background_color != [0]*4: + hdrcell.background_color = color # briefly highlight bg of certain cells on each trade execution unflash = set() @@ -478,36 +100,62 @@ async def update_quotes( # revert flash state momentarily nursery.start_soon(revert_cells_color, unflash) - cache = {} - table.quote_cache = cache - # initial coloring + to_sort = set() for sym, quote in first_quotes.items(): - row = table.symbols2rows[sym] - record, displayable = brokermod.format_quote( + row = table.get_row(sym) + record, displayable = formatter( quote, symbol_data=symbol_data) row.update(record, displayable) color_row(row, record, {}) - cache[sym] = (record, row) + to_sort.add(row.widget) - # render all rows once up front - table.render_rows(cache) + table.render_rows(to_sort) + log.debug("Finished initializing update loop") + task_status.started() # real-time cell update loop async for quotes in agen: # new quotes data only + to_sort = set() for symbol, quote in quotes.items(): - record, displayable = brokermod.format_quote( + row = table.get_row(symbol) + record, displayable = formatter( quote, symbol_data=symbol_data) - row = table.symbols2rows[symbol] - cache[symbol] = (record, row) + + # determine if sorting should happen + sort_key = table.sort_key + new = record[sort_key] + last = row.get_field(sort_key) + if new != last: + to_sort.add(row.widget) + + # update and color cells = row.update(record, displayable) color_row(row, record, cells) - table.render_rows(cache) + if to_sort: + table.render_rows(to_sort) + log.debug("Waiting on quotes") log.warn("Data feed connection dropped") - nursery.cancel_scope.cancel() + # XXX: if we're cancelled this should never get called + # nursery.cancel_scope.cancel() + + +async def stream_symbol_selection(): + """An RPC async gen for streaming the symbol corresponding + value corresponding to the last clicked row. + """ + widgets = tractor.current_actor().statespace['widgets'] + table = widgets['table'] + q = trio.Queue(1) + table._click_queues.append(q) + try: + async for symbol in q: + yield symbol + finally: + table._click_queues.remove(q) async def _async_main( @@ -515,7 +163,7 @@ async def _async_main( portal: tractor._portal.Portal, tickers: List[str], brokermod: ModuleType, - rate: int, + rate: int = 3, test: bool = False ) -> None: '''Launch kivy app + all other related tasks. @@ -528,11 +176,18 @@ async def _async_main( "piker.brokers.data", 'stream_from_file', filename=test ) + # TODO: need a set of test packets to make this work + # seriously fu QT + # sd = {} else: # start live streaming from broker daemon quote_gen = await portal.run( - "piker.brokers.data", 'start_quote_stream', - broker=brokermod.name, symbols=tickers) + "piker.brokers.data", + 'start_quote_stream', + broker=brokermod.name, + symbols=tickers, + rate=3, + ) # subscribe for tickers (this performs a possible filtering # where invalid symbols are discarded) @@ -540,85 +195,94 @@ async def _async_main( "piker.brokers.data", 'symbol_data', broker=brokermod.name, tickers=tickers) - async with trio.open_nursery() as nursery: - # get first quotes response - log.debug("Waiting on first quote...") - quotes = await quote_gen.__anext__() - first_quotes = [ - brokermod.format_quote(quote, symbol_data=sd)[0] - for quote in quotes.values()] + # get first quotes response + log.debug("Waiting on first quote...") + quotes = await quote_gen.__anext__() + first_quotes = [ + brokermod.format_stock_quote(quote, symbol_data=sd)[0] + for quote in quotes.values()] - if first_quotes[0].get('last') is None: - log.error("Broker API is down temporarily") - nursery.cancel_scope.cancel() - return + if first_quotes[0].get('last') is None: + log.error("Broker API is down temporarily") + return - # build out UI - Window.set_title(f"monitor: {name}\t(press ? for help)") - Builder.load_string(_kv) - box = BoxLayout(orientation='vertical', spacing=0) + # build out UI + Window.set_title(f"monitor: {name}\t(press ? for help)") + Builder.load_string(_kv) + box = BoxLayout(orientation='vertical', spacing=0) - # define bid-ask "stacked" cells - # (TODO: needs some rethinking and renaming for sure) - bidasks = brokermod._bidasks + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._stock_bidasks - # add header row - headers = first_quotes[0].keys() - header = Row( - {key: key for key in headers}, - headers=headers, - bidasks=bidasks, - is_header=True, - size_hint=(1, None), + # add header row + headers = first_quotes[0].keys() + header = Row( + {key: key for key in headers}, + headers=headers, + bidasks=bidasks, + is_header=True, + size_hint=(1, None), + ) + box.add_widget(header) + + # build table + table = TickerTable( + cols=1, + size_hint=(1, None), + ) + for ticker_record in first_quotes: + table.append_row( + ticker_record['symbol'], + Row(ticker_record, headers=('symbol',), + bidasks=bidasks, table=table) ) - box.add_widget(header) - # build table - table = TickerTable( - cols=1, - size_hint=(1, None), - ) - for ticker_record in first_quotes: - table.append_row(ticker_record, bidasks=bidasks) - # associate the col headers row with the ticker table even though - # they're technically wrapped separately in containing BoxLayout - header.table = table + # associate the col headers row with the ticker table even though + # they're technically wrapped separately in containing BoxLayout + header.table = table - # mark the initial sorted column header as bold and underlined - sort_cell = header.get_cell(table.sort_key) - sort_cell.bold = sort_cell.underline = True - table.last_clicked_col_cell = sort_cell + # mark the initial sorted column header as bold and underlined + sort_cell = header.get_cell(table.sort_key) + sort_cell.bold = sort_cell.underline = True + table.last_clicked_col_cell = sort_cell - # set up a pager view for large ticker lists - table.bind(minimum_height=table.setter('height')) - pager = PagerView(box, table, nursery) - box.add_widget(pager) + # set up a pager view for large ticker lists + table.bind(minimum_height=table.setter('height')) - widgets = { - # 'anchor': anchor, - 'root': box, - 'table': table, - 'box': box, - 'header': header, - 'pager': pager, - } - nursery.start_soon( - update_quotes, nursery, brokermod, widgets, quote_gen, sd, quotes) + ss = tractor.current_actor().statespace + try: + async with trio.open_nursery() as nursery: + pager = PagerView( + container=box, + contained=table, + nursery=nursery + ) + box.add_widget(pager) + + widgets = { + 'root': box, + 'table': table, + 'box': box, + 'header': header, + 'pager': pager, + } + ss['widgets'] = widgets + nursery.start_soon( + update_quotes, + nursery, + brokermod.format_stock_quote, + widgets, + quote_gen, + sd, + quotes + ) - try: # Trio-kivy entry point. await async_runTouchApp(widgets['root']) # run kivy - finally: - await quote_gen.aclose() # cancel aysnc gen call - # un-subscribe from symbols stream (cancel if brokerd - # was already torn down - say by SIGINT) - with trio.move_on_after(0.2): - await portal.run( - "piker.brokers.data", 'modify_quote_stream', - broker=brokermod.name, - feed_type='stock', - symbols=[] - ) - # cancel GUI update task nursery.cancel_scope.cancel() + finally: + with trio.open_cancel_scope(shield=True): + # cancel aysnc gen call + await quote_gen.aclose() diff --git a/piker/ui/option_chain.py b/piker/ui/option_chain.py new file mode 100644 index 00000000..237c8431 --- /dev/null +++ b/piker/ui/option_chain.py @@ -0,0 +1,561 @@ +""" +options: a real-time option chain. + +Launch with ``piker options ``. +""" +import types +from functools import partial +from typing import Dict, List + +import trio +from async_generator import asynccontextmanager +import tractor +from kivy.uix.boxlayout import BoxLayout +from kivy.lang import Builder +from kivy.app import async_runTouchApp +from kivy.core.window import Window +from kivy.uix.label import Label + +from ..log import get_logger +from ..brokers.core import contracts +from .pager import PagerView + +from .tabular import Row, HeaderCell, Cell, TickerTable +from .monitor import update_quotes + + +log = get_logger('option_chain') + + +async def modify_symbol(symbol): + pass + + +class StrikeCell(HeaderCell): + """Strike cell""" + + +_no_display = ['symbol', 'contract_type', 'strike', 'time', 'open'] +_strike_row_cache = {} +_strike_cell_cache = {} +_no_contracts_msg = "No contracts available for symbol" + + +class StrikeRow(BoxLayout): + """A 'row' composed of two ``Row``s sandwiching a + ``StrikeCell`. + """ + _row_cache = {} + + def __init__(self, strike, **kwargs): + super().__init__(orientation='horizontal', **kwargs) + self.strike = strike + # store 2 rows: 1 for call, 1 for put + self._sub_rows = {} + self._widgets_added = False + + def append_sub_row( + self, + record: dict, + displayable: dict, + bidasks=None, + headers=(), + table=None, + **kwargs, + ) -> None: + # the 'contract_type' determines whether this + # is a put or call row + contract_type = record['contract_type'] + + # We want to only create a few ``Row`` widgets as possible to + # speed up rendering; we cache sub rows after creation. + row = self._row_cache.get((self.strike, contract_type)) + if not row: + # reverse order of call side cells + if contract_type == 'call': + record = dict(list(reversed(list(record.items())))) + + row = Row( + record, + bidasks=bidasks, + headers=headers, + table=table, + no_cell=_no_display, + **kwargs + ) + self._row_cache[(self.strike, contract_type)] = row + else: + # must update the internal cells + row.update(record, displayable) + + # reassign widget for when rendered in the update loop + row.widget = self + self._sub_rows[contract_type] = row + + if self.is_populated() and not self._widgets_added: + # calls on the left + self.add_widget(self._sub_rows['call']) + strike_cell = _strike_cell_cache.setdefault( + self.strike, StrikeCell( + key=self.strike, + text=str(self.strike), + # is_header=True, + # make centre strike cell nice and small + size_hint=(1/10., 1), + ) + ) + # strikes in the middle + self.add_widget(strike_cell) + # puts on the right + self.add_widget(self._sub_rows['put']) + self._widgets_added = True + + def is_populated(self): + """Bool determing if both a put and call subrow have beed appended. + """ + return len(self._sub_rows) == 2 + + def has_widgets(self): + return self._widgets_added + + def update(self, record, displayable): + self._sub_rows[record['contract_type']].update( + record, displayable) + + def get_field(self, key): + """Always sort on the lone field, the strike price. + """ + return int(self.strike) + + def rowsitems(self): + return self._sub_rows.items() + + +class ExpiryButton(Cell): + # must be set to allow 'plain bg colors' since default texture is grey + background_normal = '' + + def on_press(self, value=None): + last = self.chain._last_expiry + if last: + last.click_toggle = False + self.chain._last_expiry = self + + log.info(f"Clicked {self}") + self.click_toggle = True + self.chain.start_displaying(self.chain.symbol, self.key) + + +class DataFeed(object): + """Data feed client for streaming symbol data from a (remote) + ``brokerd`` data daemon. + """ + def __init__(self, portal, brokermod): + self.portal = portal + self.brokermod = brokermod + self._symbols = None + self.quote_gen = None + self._mutex = trio.StrictFIFOLock() + + async def open_stream(self, symbols, rate=1, test=None): + async with self._mutex: + try: + if self.quote_gen is not None and symbols != self._symbols: + log.info( + f"Stopping existing subscription for {self._symbols}") + await self.quote_gen.aclose() + self._symbols = symbols + + if test: + # stream from a local test file + quote_gen = await self.portal.run( + "piker.brokers.data", 'stream_from_file', + filename=test + ) + else: + log.info(f"Starting new stream for {self._symbols}") + # start live streaming from broker daemon + quote_gen = await self.portal.run( + "piker.brokers.data", + 'start_quote_stream', + broker=self.brokermod.name, + symbols=symbols, + feed_type='option', + rate=rate, + ) + + # get first quotes response + log.debug(f"Waiting on first quote for {symbols}...") + quotes = {} + # with trio.move_on_after(5): + quotes = await quote_gen.__anext__() + + self.quote_gen = quote_gen + self.first_quotes = quotes + return quote_gen, quotes + except Exception: + if self.quote_gen: + await self.quote_gen.aclose() + self.quote_gen = None + raise + + def format_quotes(self, quotes): + records, displayables = zip(*[ + self.brokermod.format_option_quote(quote, {}) + for quote in quotes.values() + ]) + return records, displayables + + +@asynccontextmanager +async def find_local_monitor(): + """Establish a portal to a local monitor for triggering + symbol changes. + """ + async with tractor.find_actor('monitor') as portal: + if not portal: + log.warn( + "No monitor app could be found, no symbol link established..") + yield portal + + +class OptionChain(object): + """A real-time options chain UI. + """ + _title = "option chain: {symbol}\t(press ? for help)" + + def __init__( + self, + widgets: dict, + bidasks: Dict[str, List[str]], + feed: DataFeed, + rate: int, + ): + self.symbol = None + self.expiry = None + self.widgets = widgets + self.bidasks = bidasks + self._strikes2rows = {} + self._nursery = None + self._update_nursery = None + self.feed = feed + self._quote_gen = None + # TODO: this should be moved down to the data feed layer + # right now it's only needed for the UI update loop to cancel itself + self._update_cs = None + self._first_quotes = None + self._last_expiry = None + # flag to determine if one-time widgets have been generated + self._static_widgets_initialized = False + self._no_opts_label = None + + @property + def no_opts_label(self): + if self._no_opts_label is None: + label = self._no_opts_label = Label(text=_no_contracts_msg) + label.font_size = 30 + return self._no_opts_label + + async def _rx_symbols(self): + async with find_local_monitor() as portal: + if not portal: + log.warn("No local monitor could be found") + return + async for symbol in await portal.run( + 'piker.ui.monitor', + 'stream_symbol_selection', + ): + log.info(f"Changing symbol subscriptions to {symbol}") + self.start_displaying(symbol, self.expiry) + + @asynccontextmanager + async def open_rt_display(self, nursery, symbol, expiry=None): + """Open an internal update task scope required to allow + for dynamic real-time operation. + """ + self._parent_nursery = nursery + async with trio.open_nursery() as n: + self._nursery = n + # fill out and start updatingn strike table + n.start_soon( + partial(self._start_displaying, symbol, expiry=expiry) + ) + # listen for undlerlying symbol changes from a local monitor app + n.start_soon(self._rx_symbols) + yield self + n.cancel_scope.cancel() + + self._nursery = None + # make sure we always tear down our existing data feed + await self.feed.quote_gen.aclose() + + def clear_strikes(self): + """Clear the strike rows from the internal table. + """ + table = self.widgets['table'] + table.clear() + self._strikes2rows.clear() + + def render_rows(self, records, displayables): + """Render all strike rows in the internal table. + """ + log.debug("Rendering rows") + table = self.widgets['table'] + for record, display in zip( + sorted(records, key=lambda q: q['strike']), + displayables + ): + strike = record['strike'] + strike_row = _strike_row_cache.setdefault( + strike, StrikeRow(strike)) + strike_row.append_sub_row( + record, + display, + bidasks=self.bidasks, + table=table, + ) + if strike_row.is_populated(): + # We must fill out the the table's symbol2rows manually + # using each contracts "symbol" so that the quote updater + # task can look up the right row to update easily + # See update_quotes() and ``Row`` internals for details. + for contract_type, row in strike_row.rowsitems(): + symbol = row._last_record['symbol'] + table.symbols2rows[symbol] = row + + if strike not in self._strikes2rows: + # re-adding widgets is an error + self._strikes2rows[strike] = strike_row + + log.debug("Finished rendering rows!") + + def _init_static_widgets(self, displayables): + assert self._static_widgets_initialized is False + container = self.widgets['container'] + + # calls / puts header + type_header = BoxLayout( + orientation='horizontal', + size_hint=(1, 1/30.), + ) + calls = Label(text='calls', font_size='20') + puts = Label(text='puts', font_size='20') + type_header.add_widget(calls) + type_header.add_widget(puts) + container.add_widget(type_header) + + # figure out header fields for each table based on quote keys + headers = displayables[0].keys() + header_row = StrikeRow(strike='strike', size_hint=(1, None)) + header_record = {key: key for key in headers} + header_record['contract_type'] = 'put' + header_row.append_sub_row( + header_record, + header_record, + headers=headers, + bidasks=self.bidasks, + is_header=True, + size_hint=(1, None), + ) + header_record['contract_type'] = 'call' + header_row.append_sub_row( + header_record, + header_record, + headers=headers, + bidasks=self.bidasks, + is_header=True, + size_hint=(1, None), + ) + container.add_widget(header_row) + + # build out chain tables + table = TickerTable( + sort_key='strike', + cols=1, + size_hint=(1, None), + ) + header_row.table = table + table.bind(minimum_height=table.setter('height')) + pager = PagerView( + container=container, + contained=table, + nursery=self._nursery + ) + container.add_widget(pager) + + self.widgets.update({ + 'table': table, + 'type_header': type_header, + 'table': table, + 'pager': pager, + }) + + async def _start_displaying(self, symbol, expiry=None): + """Main routine to start displaying the real time updated strike + table. + + Clear any existing data feed subscription that is no longer needed + (eg. when clicking a new expiry button) spin up a new subscription, + populate the table and start updating it. + """ + table = self.widgets.get('table') + if table: + self.clear_strikes() + + if self._update_cs: + log.warn("Cancelling existing update task") + self._update_cs.cancel() + await trio.sleep(0) + + if self._quote_gen: + await self._quote_gen.aclose() + + # redraw any symbol specific UI components + if self.symbol != symbol or expiry is None: + # set window title + self.widgets['window'].set_title( + self._title.format(symbol=symbol) + ) + + # retreive all contracts to populate expiry row + all_contracts = await contracts(self.feed.brokermod, symbol) + + if not all_contracts: + label = self.no_opts_label + label.symbol = symbol + if table: + table.add_widget(label) + return + + # start streaming soonest contract by default if not provided + expiry = next(iter(all_contracts)).expiry if not expiry else expiry + + # TODO: figure out how to compact these buttons + expiries = { + key.expiry: key.expiry[:key.expiry.find('T')] + for key in all_contracts + } + expiry_row = self.widgets['expiry_row'] + expiry_row.clear_widgets() + + for expiry, justdate in expiries.items(): + button = ExpiryButton(text=str(justdate), key=expiry) + # assign us to each expiry button + button.chain = self + expiry_row.add_widget(button) + + if self._nursery is None: + raise RuntimeError( + "You must call open this chain's update scope first!") + + log.debug(f"Waiting on first_quotes for {symbol}:{expiry}") + self._quote_gen, first_quotes = await self.feed.open_stream( + [(symbol, expiry)] + ) + log.debug(f"Got first_quotes for {symbol}:{expiry}") + records, displayables = self.feed.format_quotes(first_quotes) + + # draw static widgets only once + if self._static_widgets_initialized is False: + self._init_static_widgets(displayables) + self._static_widgets_initialized = True + + self.render_rows(records, displayables) + + with trio.open_cancel_scope() as cs: + self._update_cs = cs + await self._nursery.start( + partial( + update_quotes, + self._nursery, + self.feed.brokermod.format_option_quote, + self.widgets, + self._quote_gen, + symbol_data={}, + first_quotes=first_quotes, + ) + ) + # always keep track of current subscription + self.symbol, self.expiry = symbol, expiry + + def start_displaying(self, symbol, expiry): + if self.symbol == symbol and self.expiry == expiry: + log.info(f"Clicked {symbol}:{expiry} is already selected") + return + + log.info(f"Subscribing for {symbol}:{expiry}") + self._nursery.start_soon( + partial(self._start_displaying, symbol, expiry=expiry) + ) + + +async def new_chain_ui( + portal: tractor._portal.Portal, + symbol: str, + brokermod: types.ModuleType, + nursery: trio._core._run.Nursery, + rate: int = 1, +) -> None: + """Create and return a new option chain UI. + """ + # use `monitor` styling for now + from .monitor import _kv + Builder.load_string(_kv) + + # the master container + container = BoxLayout(orientation='vertical', spacing=0) + + # expiry buttons row (populated later once contracts are retreived) + expiry_row = BoxLayout( + orientation='horizontal', + size_hint=(1, None), + ) + container.add_widget(expiry_row) + + widgets = { + 'window': Window, + 'root': container, + 'container': container, + 'expiry_row': expiry_row, + } + # define bid-ask "stacked" cells + # (TODO: needs some rethinking and renaming for sure) + bidasks = brokermod._option_bidasks + + feed = DataFeed(portal, brokermod) + chain = OptionChain( + widgets, + bidasks, + feed, + rate=rate, + ) + return chain + + +async def _async_main( + symbol: str, + portal: tractor._portal.Portal, + brokermod: types.ModuleType, + rate: int = 1, + test: bool = False +) -> None: + '''Launch kivy app + all other related tasks. + + This is started with cli cmd `piker options`. + ''' + async with trio.open_nursery() as nursery: + # set up a pager view for large ticker lists + chain = await new_chain_ui( + portal, + symbol, + brokermod, + nursery, + rate=rate, + ) + async with chain.open_rt_display(nursery, symbol): + try: + # trio-kivy entry point. + await async_runTouchApp(chain.widgets['root']) # run kivy + finally: + # cancel GUI update task + nursery.cancel_scope.cancel() diff --git a/piker/ui/pager.py b/piker/ui/pager.py index 5b38aa76..d70175d8 100644 --- a/piker/ui/pager.py +++ b/piker/ui/pager.py @@ -184,5 +184,5 @@ class PagerView(ScrollView): _, yscale = self.convert_distance_to_scroll(0, pxs) new = self.scroll_y + (yscale * {'u': 1, 'd': -1}[direction]) # bound to near [0, 1] to avoid "over-scrolling" - limited = max(-0.01, min(new, 1.01)) + limited = max(-0.001, min(new, 1.001)) self.scroll_y = limited diff --git a/piker/ui/tabular.py b/piker/ui/tabular.py new file mode 100644 index 00000000..7a9aca2f --- /dev/null +++ b/piker/ui/tabular.py @@ -0,0 +1,470 @@ +""" +Real-time table components +""" +from itertools import chain +from typing import List +from bisect import bisect + +import trio +from kivy.uix.gridlayout import GridLayout +from kivy.uix.stacklayout import StackLayout +from kivy.uix.button import Button +from kivy import utils +from kivy.properties import BooleanProperty + +from ..log import get_logger +from .kivy.mouse_over import new_mouse_over_group + + +HoverBehavior = new_mouse_over_group() +log = get_logger('monitor') + +_colors2hexs = { + 'darkgray': 'a9a9a9', + 'gray': '808080', + 'green': '008000', + 'forestgreen': '228b22', + 'red2': 'ff3333', + 'red': 'ff0000', + 'firebrick': 'b22222', +} + +_colors = {key: utils.rgba(val) for key, val in _colors2hexs.items()} + + +def colorcode(name): + return _colors[name if name else 'gray'] + + +_bs = 0.75 # border size +_fs = 20 # font size + +# medium shade of gray that seems to match the +# default i3 window borders +_i3_rgba = [0.14]*3 + [1] + +# slightly off black like the jellybean bg from +# vim colorscheme +_cell_rgba = [0.07]*3 + [1] +_black_rgba = [0]*4 + +_kv = (f''' +#:kivy 1.10.0 + + + font_size: {_fs} + + # make text wrap to botom + text_size: self.size + halign: 'center' + valign: 'middle' + size: self.texture_size + + # don't set these as the update loop already does it + # color: {colorcode('gray')} + # font_color: {colorcode('gray')} + # font_name: 'Hack-Regular' + + # if `highlight` is set use i3 color by default transparent; use row color + # this is currently used for expiry cells on the options chain + background_color: {_i3_rgba} if self.click_toggle else {_black_rgba} + # must be set to allow 'plain bg colors' since default texture is grey + # but right now is only set for option chain expiry buttons + # background_normal: '' + # spacing: 0, 0 + # padding: 3, 3 + + + + font_size: {_fs} + # canvas.before: + # Color: + # rgba: [0.13]*4 + # BorderImage: # use a fixed size border + # pos: self.pos + # size: [self.size[0] - {_bs}, self.size[1]] + # # 0s are because the containing TickerTable already has spacing + # # border: [0, {_bs} , 0, {_bs}] + # border: [0, {_bs} , 0, 0] + + + + spacing: [{_bs}] + # row_force_default: True + row_default_height: 56 + cols: 1 + canvas.before: + Color: + # i3 style gray as background + rgba: {_i3_rgba} + Rectangle: + # scale with container self here refers to the widget i.e BoxLayout + pos: self.pos + size: self.size + + + + spacing: [{_bs}, 0] + + + + # minimum_height: 200 # should be pulled from Cell text size + # minimum_width: 200 + # row_force_default: True + # row_default_height: 61 # determines the header row size + padding: [0]*4 + spacing: [0] + canvas.before: + Color: + rgba: {_cell_rgba} + Rectangle: + # self here refers to the widget i.e Row(GridLayout) + pos: self.pos + size: self.size + # row higlighting on mouse over + Color: + rgba: {_i3_rgba} + # RoundedRectangle: + Rectangle: + size: self.width, self.height if self.hovered else 1 + pos: self.pos + # radius: (0,) + + +# part of the `PagerView` + + size_hint: 1, None + # static size of 51 px + height: 51 + font_size: 25 + background_color: {_i3_rgba} +''') + + +class Cell(Button): + """Data cell: the fundemental widget. + + ``key`` is the column name index value. + """ + click_toggle = BooleanProperty(False) + + def __init__(self, key=None, is_header=False, **kwargs): + super(Cell, self).__init__(**kwargs) + self.key = key + self.row = None + self.is_header = is_header + + def on_press(self, value=None): + self.row.on_press() + + +class HeaderCell(Cell): + """Column header cell label. + """ + def on_press(self, value=None): + """Clicking on a col header indicates to sort rows by this column + in `update_quotes()`. + """ + table = self.row.table + # if this is a row header cell then sort by the clicked field + if self.row.is_header: + table.sort_key = self.key + + last = table.last_clicked_col_cell + if last and last is not self: + last.underline = False + last.bold = False + + # outline the header text to indicate it's been the last clicked + self.underline = True + self.bold = True + # mark this cell as the last selected + table.last_clicked_col_cell = self + # sort and render the rows immediately + self.row.table.render_rows(table.symbols2rows.values()) + + # TODO: make this some kind of small geometry instead + # (maybe like how trading view does it). + # allow highlighting of row headers for tracking + elif self.is_header: + if self.background_color == self.color: + self.background_color = _black_rgba + else: + self.background_color = self.color + + +class BidAskLayout(StackLayout): + """Cell which houses three buttons containing a last, bid, and ask in a + single unit oriented with the last 2 under the first. + """ + def __init__(self, values, header=False, **kwargs): + # uncomment to get vertical stacked bid-ask + # super(BidAskLayout, self).__init__(orientation='bt-lr', **kwargs) + super(BidAskLayout, self).__init__(orientation='lr-tb', **kwargs) + assert len(values) == 3, "You can only provide 3 values: last,bid,ask" + self._keys2cells = {} + cell_type = HeaderCell if header else Cell + top_size = cell_type().font_size + small_size = top_size - 4 + top_prop = 0.5 # proportion of size used by top cell + bottom_prop = 1 - top_prop + for (key, size_hint, font_size), value in zip( + [('last', (1, top_prop), top_size), + ('bid', (0.5, bottom_prop), small_size), + ('ask', (0.5, bottom_prop), small_size)], + # uncomment to get vertical stacked bid-ask + # [('last', (top_prop, 1), top_size), + # ('bid', (bottom_prop, 0.5), small_size), + # ('ask', (bottom_prop, 0.5), small_size)], + values + ): + cell = cell_type( + text=str(value), + size_hint=size_hint, + # width=self.width/2 - 3, + font_size=font_size + ) + self._keys2cells[key] = cell + cell.key = value + cell.is_header = header + setattr(self, key, cell) + self.add_widget(cell) + + # should be assigned by referrer + self.row = None + + def get_cell(self, key): + return self._keys2cells.get(key) + + @property + def row(self): + return self.row + + @row.setter + def row(self, row): + # so hideous + for cell in self.cells: + cell.row = row + + @property + def cells(self): + return [self.last, self.bid, self.ask] + + +class Row(HoverBehavior, GridLayout): + """A grid for displaying a row of ticker quote data. + """ + def __init__( + self, + record, + headers=(), + no_cell=(), + bidasks=None, + table=None, + is_header=False, + cell_type=None, + **kwargs + ): + super().__init__(cols=len(record), **kwargs) + self._cell_widgets = {} + self._last_record = record + self.table = table + self.is_header = is_header + self._cell_type = cell_type + self.widget = self + + # Create `BidAskCells` first. + # bid/ask cells are just 3 cells grouped in a + # ``BidAskLayout`` which just stacks the parent cell + # on top of 2 children. + layouts = {} + bidasks = bidasks or {} + ba_cells = {} + for key, children in bidasks.items(): + layout = BidAskLayout( + [record[key]] + [record[child] for child in children], + header=is_header + ) + layout.row = self + layouts[key] = layout + for i, child in enumerate([key] + children): + ba_cells[child] = layout.cells[i] + + children_flat = list(chain.from_iterable(bidasks.values())) + self._cell_widgets.update(ba_cells) + + # build out row using Cell labels + for (key, val) in record.items(): + header = key in headers + + # handle bidask cells + if key in layouts: + self.add_widget(layouts[key]) + elif key in children_flat: + # these cells have already been added to the `BidAskLayout` + continue + elif key not in no_cell: + cell = self._append_cell(val, key, header=header) + cell.key = key + self._cell_widgets[key] = cell + + def iter_cells(self): + return self._cell_widgets.items() + + def get_cell(self, key): + return self._cell_widgets.get(key) + + def get_field(self, key): + return self._last_record[key] + + def _append_cell(self, text, key, header=False): + if not len(self._cell_widgets) < self.cols: + raise ValueError(f"Can not append more then {self.cols} cells") + + # header cells just have a different colour + celltype = self._cell_type or (HeaderCell if header else Cell) + cell = celltype(text=str(text), key=key) + cell.is_header = header + cell.row = self + self.add_widget(cell) + return cell + + def update(self, record, displayable): + """Update this row's cells with new values from a quote + ``record``. + + Return all cells that changed in a ``dict``. + """ + # color changed field values + cells = {} + gray = colorcode('gray') + fgreen = colorcode('forestgreen') + red = colorcode('red2') + for key, val in record.items(): + last = self.get_field(key) + color = gray + try: + # logic for cell text coloring: up-green, down-red + if last < val: + color = fgreen + elif last > val: + color = red + except TypeError: + log.warn(f"wtf QT {val} is not regular?") + + cell = self.get_cell(key) + # some displayable fields might have specifically + # not had cells created as set in the `no_cell` attr + if cell is not None: + cell.text = str(displayable[key]) + cell.color = color + if color != gray: + cells[key] = cell + + self._last_record = record + return cells + + # mouse over handlers + def on_enter(self): + """Highlight layout on enter. + """ + log.debug( + f"Entered row {self} through {self.border_point}") + # don't highlight header row + if self.is_header: + self.hovered = False + + def on_leave(self): + """Un-highlight layout on exit. + """ + log.debug( + f"Left row {self} through {self.border_point}") + + def on_press(self, value=None): + log.info(f"Pressed row for {self._last_record['symbol']}") + if self.table and not self.is_header: + for q in self.table._click_queues: + q.put_nowait(self._last_record['symbol']) + + +class TickerTable(GridLayout): + """A grid for displaying ticker quote records as a table. + """ + def __init__(self, sort_key='%', auto_sort=True, **kwargs): + super(TickerTable, self).__init__(**kwargs) + self.symbols2rows = {} + self.sort_key = sort_key + # for tracking last clicked column header cell + self.last_clicked_col_cell = None + self._auto_sort = auto_sort + self._symbols2index = {} + self._sorted = [] + self._click_queues: List[trio.Queue] = [] + + def append_row(self, key, row): + """Append a `Row` of `Cell` objects to this table. + """ + # store ref to each row + self.symbols2rows[key] = row + self.add_widget(row) + self._sorted.append(row) + return row + + def clear(self): + self.clear_widgets() + self._sorted.clear() + + def render_rows( + self, + changed: set, + sort_key: str = None, + ): + """Sort and render all rows on the ticker grid from ``syms2rows``. + """ + sort_key = sort_key or self.sort_key + key_row_pairs = list(sorted( + [(row.get_field(sort_key), row) for row in self._sorted], + key=lambda item: item[0], + )) + if key_row_pairs: + sorted_keys, sorted_rows = zip(*key_row_pairs) + sorted_keys, sorted_rows = list(sorted_keys), list(sorted_rows) + else: + sorted_keys, sorted_rows = [], [] + + # now remove and re-insert any rows that need to be shuffled + # due to new a new field change + for row in changed: + try: + old_index = sorted_rows.index(row) + except ValueError: + # row is not yet added so nothing to remove + pass + else: + del sorted_rows[old_index] + del sorted_keys[old_index] + self._sorted.remove(row) + self.remove_widget(row) + + for row in changed: + key = row.get_field(sort_key) + index = bisect(sorted_keys, key) + sorted_keys.insert(index, key) + self._sorted.insert(index, row) + self.add_widget(row, index=index) + + def ticker_search(self, patt): + """Return sequence of matches when pattern ``patt`` is in a + symbol name. Most naive algo possible for the moment. + """ + for symbol, row in self.symbols2rows.items(): + if patt in symbol: + yield symbol, row + + def get_row(self, symbol: str) -> Row: + return self.symbols2rows[symbol] + + def search(self, patt): + """Search bar api compat. + """ + return dict(self.ticker_search(patt)) or {} diff --git a/tests/test_questrade.py b/tests/test_questrade.py index 482c1e37..6730884d 100644 --- a/tests/test_questrade.py +++ b/tests/test_questrade.py @@ -51,8 +51,10 @@ _ex_quotes = { 'askSize': 0, 'bidPrice': None, 'bidSize': 0, + 'contract_type': 'call', 'delay': 0, 'delta': -0.212857, + "expiry": "2021-01-15T00:00:00.000000-05:00", 'gamma': 0.003524, 'highPrice': 0, 'isHalted': False, @@ -66,6 +68,7 @@ _ex_quotes = { 'openInterest': 1, 'openPrice': 0, 'rho': -0.891868, + "strike": 8, 'symbol': 'WEED15Jan21P54.00.MX', 'symbolId': 22739148, 'theta': -0.012911, @@ -183,7 +186,7 @@ async def stream_option_chain(portal, symbols): ``symbols`` arg is ignored here. """ - symbol = 'APHA.TO' # your fave greenhouse LP + symbol = symbols[0] async with qt.get_client() as client: contracts = await client.get_all_contracts([symbol]) @@ -198,16 +201,21 @@ async def stream_option_chain(portal, symbols): broker='questrade', symbols=[sub], feed_type='option', + rate=4, diff_cached=False, ) + # latency arithmetic + loops = 8 + rate = 1/3. # 3 rps + timeout = loops / rate + try: # wait on the data streamer to actually start # delivering await agen.__anext__() # it'd sure be nice to have an asyncitertools here... - with trio.fail_after(2.1): - loops = 8 + with trio.fail_after(timeout): count = 0 async for quotes in agen: # print(f'got quotes for {quotes.keys()}') @@ -221,15 +229,33 @@ async def stream_option_chain(portal, symbols): count += 1 if count == loops: break + + # switch the subscription and make sure + # stream is still working + sub = subs_keys[1] + await agen.aclose() + agen = await portal.run( + 'piker.brokers.data', + 'start_quote_stream', + broker='questrade', + symbols=[sub], + feed_type='option', + rate=4, + diff_cached=False, + ) + + await agen.__anext__() + with trio.fail_after(timeout): + count = 0 + async for quotes in agen: + for symbol, quote in quotes.items(): + assert quote['key'] == sub + count += 1 + if count == loops: + break finally: # unsub - await portal.run( - 'piker.brokers.data', - 'modify_quote_stream', - broker='questrade', - feed_type='option', - symbols=[], - ) + await agen.aclose() async def stream_stocks(portal, symbols): @@ -240,6 +266,7 @@ async def stream_stocks(portal, symbols): 'start_quote_stream', broker='questrade', symbols=symbols, + diff_cached=False, ) try: # it'd sure be nice to have an asyncitertools here... @@ -250,13 +277,7 @@ async def stream_stocks(portal, symbols): break finally: # unsub - await portal.run( - 'piker.brokers.data', - 'modify_quote_stream', - broker='questrade', - feed_type='stock', - symbols=[], - ) + await agen.aclose() @pytest.mark.parametrize( @@ -265,8 +286,14 @@ async def stream_stocks(portal, symbols): (stream_stocks,), (stream_option_chain,), (stream_stocks, stream_option_chain), + (stream_stocks, stream_stocks), + (stream_option_chain, stream_option_chain), + ], + ids=[ + 'stocks', 'options', + 'stocks_and_options', 'stocks_and_stocks', + 'options_and_options', ], - ids=['stocks', 'options', 'stocks_and_options'], ) @tractor_test async def test_quote_streaming(tmx_symbols, loglevel, stream_what): @@ -284,9 +311,16 @@ async def test_quote_streaming(tmx_symbols, loglevel, stream_what): 'piker.brokers.core' ], ) - async with trio.open_nursery() as n: - for func in stream_what: - n.start_soon(func, portal, tmx_symbols) + if len(stream_what) > 1: + # stream disparate symbol sets per task + first, *tail = tmx_symbols + symbols = ([first], tail) + else: + symbols = [tmx_symbols] + + async with trio.open_nursery() as n: + for syms, func in zip(symbols, stream_what): + n.start_soon(func, portal, syms) # stop all spawned subactors await nursery.cancel()