diff --git a/piker/_daemon.py b/piker/_daemon.py index 30fdfec2..07a584c3 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -18,7 +18,6 @@ Structured, daemon tree service management. """ -from functools import partial from typing import Optional, Union, Callable, Any from contextlib import asynccontextmanager, AsyncExitStack from collections import defaultdict @@ -72,7 +71,7 @@ class Services(BaseModel): ctx, first = await self.ctx_stack.enter_async_context( portal.open_context( target, - **kwargs, + **kwargs, ) ) return ctx @@ -143,8 +142,14 @@ async def maybe_open_runtime( Start the ``tractor`` runtime (a root actor) if none exists. """ + settings = _tractor_kwargs + settings.update(kwargs) + if not tractor.current_actor(err_on_no_runtime=False): - async with tractor.open_root_actor(loglevel=loglevel, **kwargs): + async with tractor.open_root_actor( + loglevel=loglevel, + **settings, + ): yield else: yield diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 64f0ad3a..4b4bd1cd 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -32,6 +32,10 @@ class SymbolNotFound(BrokerError): "Symbol not found by broker search" +class NoData(BrokerError): + "Symbol data not permitted" + + def resproc( resp: asks.response_objects.Response, log: logging.Logger, diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 1820b6c6..c2de7ded 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -82,7 +82,7 @@ _ohlc_dtype = [ ohlc_dtype = np.dtype(_ohlc_dtype) _show_wap_in_history = False -_search_conf = {'pause_period': 0.375} +_search_conf = {'pause_period': 0.0616} # https://binance-docs.github.io/apidocs/spot/en/#exchange-information @@ -207,6 +207,25 @@ class Client: return self._pairs + async def search_symbols( + self, + pattern: str, + limit: int = None, + ) -> Dict[str, Any]: + if self._pairs is not None: + data = self._pairs + else: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=50, + ) + # repack in dict form + return {item[0]['symbol']: item[0] + for item in matches} + async def bars( self, symbol: str, @@ -298,7 +317,7 @@ async def stream_messages(ws): if cs.cancelled_caught: timeouts += 1 - if timeouts > 2: + if timeouts > 10: raise trio.TooSlowError("binance feed seems down?") continue diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py index aa3abd24..164a060b 100644 --- a/piker/brokers/cli.py +++ b/piker/brokers/cli.py @@ -30,7 +30,7 @@ import tractor from ..cli import cli from .. import watchlists as wl from ..log import get_console_log, colorize_json, get_logger -from .._daemon import maybe_spawn_brokerd +from .._daemon import maybe_spawn_brokerd, maybe_open_pikerd from ..brokers import core, get_brokermod, data log = get_logger('cli') @@ -50,7 +50,7 @@ def api(config, meth, kwargs, keys): """Make a broker-client API method call """ # global opts - broker = config['broker'] + broker = config['brokers'][0] _kwargs = {} for kwarg in kwargs: @@ -87,7 +87,7 @@ def quote(config, tickers, df_output): """Print symbol quotes to the console """ # global opts - brokermod = config['brokermod'] + brokermod = config['brokermods'][0] quotes = trio.run(partial(core.stocks_quote, brokermod, tickers)) if not quotes: @@ -123,7 +123,7 @@ def bars(config, symbol, count, df_output): """Retreive 1m bars for symbol and print on the console """ # global opts - brokermod = config['brokermod'] + brokermod = config['brokermods'][0] # broker backend should return at the least a # list of candle dictionaries @@ -159,7 +159,7 @@ def record(config, rate, name, dhost, filename): """Record client side quotes to a file on disk """ # global opts - brokermod = config['brokermod'] + brokermod = config['brokermods'][0] loglevel = config['loglevel'] log = config['log'] @@ -222,7 +222,7 @@ def optsquote(config, symbol, df_output, date): """Retreive symbol option quotes on the console """ # global opts - brokermod = config['brokermod'] + brokermod = config['brokermods'][0] quotes = trio.run( partial( @@ -250,7 +250,7 @@ def symbol_info(config, tickers): """Print symbol quotes to the console """ # global opts - brokermod = config['brokermod'] + brokermod = config['brokermods'][0] quotes = trio.run(partial(core.symbol_info, brokermod, tickers)) if not quotes: @@ -273,13 +273,25 @@ def search(config, pattern): """Search for symbols from broker backend(s). """ # global opts - brokermod = config['brokermod'] + brokermods = config['brokermods'] - quotes = tractor.run( - partial(core.symbol_search, brokermod, pattern), - start_method='forkserver', - loglevel='info', + # define tractor entrypoint + async def main(func): + + async with maybe_open_pikerd( + loglevel=config['loglevel'], + ): + return await func() + + quotes = trio.run( + main, + partial( + core.symbol_search, + brokermods, + pattern, + ), ) + if not quotes: log.error(f"No matches could be found for {pattern}?") return diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 5189df85..59621b63 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -24,8 +24,12 @@ import inspect from types import ModuleType from typing import List, Dict, Any, Optional +import trio + from ..log import get_logger from . import get_brokermod +from .._daemon import maybe_spawn_brokerd +from .api import open_cached_client log = get_logger(__name__) @@ -126,13 +130,41 @@ async def symbol_info( return await client.symbol_info(symbol, **kwargs) +async def search_w_brokerd(name: str, pattern: str) -> dict: + + async with open_cached_client(name) as client: + + # TODO: support multiple asset type concurrent searches. + return await client.search_symbols(pattern=pattern) + + async def symbol_search( - brokermod: ModuleType, + brokermods: list[ModuleType], pattern: str, **kwargs, ) -> Dict[str, Dict[str, Dict[str, Any]]]: """Return symbol info from broker. """ - async with brokermod.get_client() as client: - # TODO: support multiple asset type concurrent searches. - return await client.search_stocks(pattern=pattern, **kwargs) + results = [] + + async def search_backend(brokername: str) -> None: + + async with maybe_spawn_brokerd( + brokername, + ) as portal: + + results.append(( + brokername, + await portal.run( + search_w_brokerd, + name=brokername, + pattern=pattern, + ), + )) + + async with trio.open_nursery() as n: + + for mod in brokermods: + n.start_soon(search_backend, mod.name) + + return results diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 17c0ceb2..96398c44 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -45,12 +45,14 @@ from ib_insync.objects import Position import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client +from fuzzywuzzy import process as fuzzy +import numpy as np from ..log import get_logger, get_console_log from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray -from ._util import SymbolNotFound +from ._util import SymbolNotFound, NoData log = get_logger(__name__) @@ -87,7 +89,15 @@ _time_frames = { 'Y': 'OneYear', } -_show_wap_in_history = False +_show_wap_in_history: bool = False + +# optional search config the backend can register for +# it's symbol search handling (in this case we avoid +# accepting patterns before the kb has settled more then +# a quarter second). +_search_conf = { + 'pause_period': 6/16, +} # overrides to sidestep pretty questionable design decisions in @@ -141,11 +151,39 @@ class NonShittyIB(ibis.IB): # map of symbols to contract ids _adhoc_cmdty_data_map = { # https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 - # NOTE: cmdtys don't have trade data: + + # NOTE: some cmdtys/metals don't have trade data like gold/usd: # https://groups.io/g/twsapi/message/44174 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } +_adhoc_futes_set = { + + # equities + 'nq.globex', + 'mnq.globex', + 'es.globex', + 'mes.globex', + + # cypto$ + 'brr.cmecrypto', + 'ethusdrr.cmecrypto', + + # metals + 'xauusd.cmdty', +} + +# exchanges we don't support at the moment due to not knowing +# how to do symbol-contract lookup correctly likely due +# to not having the data feeds subscribed. +_exch_skip_list = { + 'ASX', # aussie stocks + 'MEXI', # mexican stocks + 'VALUE', # no idea +} + +# https://misc.interactivebrokers.com/cstools/contract_info/v3.10/index.php?action=Conid%20Info&wlId=IB&conid=69067924 + _enters = 0 @@ -245,27 +283,45 @@ class Client: """ descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) - futs = [] - for d in descriptions: - con = d.contract - futs.append(self.ib.reqContractDetailsAsync(con)) + if descriptions is not None: - # batch request all details - results = await asyncio.gather(*futs) - - # XXX: if there is more then one entry in the details list - details = {} - for details_set in results: - # then the contract is so called "ambiguous". - for d in details_set: + futs = [] + for d in descriptions: con = d.contract - unique_sym = f'{con.symbol}.{con.primaryExchange}' - details[unique_sym] = asdict(d) if asdicts else d + if con.primaryExchange not in _exch_skip_list: + futs.append(self.ib.reqContractDetailsAsync(con)) - if len(details) == upto: - return details + # batch request all details + results = await asyncio.gather(*futs) - return details + # XXX: if there is more then one entry in the details list + details = {} + for details_set in results: + # then the contract is so called "ambiguous". + for d in details_set: + con = d.contract + unique_sym = f'{con.symbol}.{con.primaryExchange}' + details[unique_sym] = asdict(d) if asdicts else d + + if len(details) == upto: + return details + + return details + + else: + return {} + + async def search_symbols( + self, + pattern: str, + # how many contracts to search "up to" + upto: int = 3, + asdicts: bool = True, + ) -> Dict[str, ContractDetails]: + + # TODO add search though our adhoc-locally defined symbol set + # for futes/cmdtys/ + return await self.search_stocks(pattern, upto, asdicts) async def search_futes( self, @@ -318,7 +374,7 @@ class Client: sym, exch = symbol.upper().rsplit('.', maxsplit=1) except ValueError: # likely there's an embedded `.` for a forex pair - await tractor.breakpoint() + breakpoint() # futes if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): @@ -346,10 +402,13 @@ class Client: if exch in ('PURE', 'TSE'): # non-yankee currency = 'CAD' - if exch in ('PURE', 'TSE'): - # stupid ib... - primaryExchange = exch - exch = 'SMART' + # stupid ib... + primaryExchange = exch + exch = 'SMART' + + else: + exch = 'SMART' + primaryExchange = exch con = ibis.Stock( symbol=sym, @@ -562,7 +621,7 @@ class Client: # default config ports _tws_port: int = 7497 _gw_port: int = 4002 -_try_ports = [_tws_port, _gw_port] +_try_ports = [_gw_port, _tws_port] _client_ids = itertools.count() _client_cache = {} @@ -641,6 +700,8 @@ async def _aio_run_client_method( if to_trio and 'to_trio' in args: kwargs['to_trio'] = to_trio + log.runtime(f'Running {meth}({kwargs})') + return await async_meth(**kwargs) @@ -777,13 +838,76 @@ def normalize( return data +async def get_bars( + sym: str, + end_dt: str = "", +) -> (dict, np.ndarray): + + _err = None + + fails = 0 + for _ in range(2): + try: + + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, + end_dt=end_dt, + ) + + if bars_array is None: + raise SymbolNotFound(sym) + + next_dt = bars[0].date + + return (bars, bars_array, next_dt), fails + + except RequestError as err: + _err = err + + # TODO: retreive underlying ``ib_insync`` error? + if err.code == 162: + + if 'HMDS query returned no data' in err.message: + # means we hit some kind of historical "dead zone" + # and further requests seem to always cause + # throttling despite the rps being low + break + + elif 'No market data permissions for' in err.message: + + # TODO: signalling for no permissions searches + raise NoData(f'Symbol: {sym}') + break + + + else: + log.exception( + "Data query rate reached: Press `ctrl-alt-f`" + "in TWS" + ) + + # TODO: should probably create some alert on screen + # and then somehow get that to trigger an event here + # that restarts/resumes this task? + await tractor.breakpoint() + fails += 1 + continue + + return (None, None) + + # else: # throttle wasn't fixed so error out immediately + # raise _err + + async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa # count: int = 20, # NOTE: any more and we'll overrun underlying buffer - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer + count: int = 6, # NOTE: any more and we'll overrun the underlying buffer task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: """Fill historical bars into shared mem / storage afap. @@ -791,10 +915,7 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 """ - first_bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - ) + (first_bars, bars_array, next_dt), fails = await get_bars(sym) # write historical data to buffer shm.push(bars_array) @@ -803,46 +924,24 @@ async def backfill_bars( task_status.started(cs) - next_dt = first_bars[0].date - i = 0 while i < count: - try: - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, - end_dt=next_dt, - ) + out, fails = await get_bars(sym, end_dt=next_dt) - if bars_array is None: - raise SymbolNotFound(sym) + if fails is None or fails > 1: + break - shm.push(bars_array, prepend=True) - i += 1 - next_dt = bars[0].date + if out is (None, None): + # could be trying to retreive bars over weekend + # TODO: add logic here to handle tradable hours and only grab + # valid bars in the range + log.error(f"Can't grab bars starting at {next_dt}!?!?") + continue - except RequestError as err: - # TODO: retreive underlying ``ib_insync`` error? - - if err.code == 162: - - if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "dead zone" - # and further requests seem to always cause - # throttling despite the rps being low - break - - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f`" - "in TWS" - ) - - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - await tractor.breakpoint() + bars, bars_array, next_dt = out + shm.push(bars_array, prepend=True) + i += 1 asset_type_map = { @@ -990,23 +1089,31 @@ async def stream_quotes( } } + con = first_ticker.contract + + # should be real volume for this contract by default + calc_price = False + # check for special contract types if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - # should be real volume for this contract - calc_price = False + + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + else: # commodities and forex don't have an exchange name and # no real volume so we have to calculate the price - suffix = 'secType' + suffix = con.secType + # no real volume on this tract calc_price = True - # pass first quote asap quote = normalize(first_ticker, calc_price=calc_price) con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() + topic = '.'.join((con['symbol'], suffix)).lower() quote['symbol'] = topic + # pass first quote asap first_quote = {topic: quote} # ugh, clear ticks since we've consumed them @@ -1018,50 +1125,53 @@ async def stream_quotes( task_status.started((init_msgs, first_quote)) if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - calc_price = False # should be real volume for contract + # suffix = 'exchange' + # calc_price = False # should be real volume for contract # wait for real volume on feed (trading might be closed) - async with aclosing(stream): + while True: - async for ticker in stream: - - # for a real volume contract we rait for the first - # "real" trade to take place - if not calc_price and not ticker.rtTime: - # spin consuming tickers until we get a real market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? - break - - # tell caller quotes are now coming in live - feed_is_live.set() - - async for ticker in stream: - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic - - await send_chan.send({topic: quote}) + ticker = await stream.receive() + # for a real volume contract we rait for the first + # "real" trade to take place + if not calc_price and not ticker.rtTime: + # spin consuming tickers until we get a real market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) ticker.ticks = [] + # XXX: this works because we don't use + # ``aclosing()`` above? + break + + # tell caller quotes are now coming in live + feed_is_live.set() + + # last = time.time() + async with aclosing(stream): + async for ticker in stream: + # print(f'ticker rate: {1/(time.time() - last)}') + + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) + + # con = quote['contract'] + # topic = '.'.join((con['symbol'], suffix)).lower() + quote['symbol'] = topic + await send_chan.send({topic: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() + def pack_position(pos: Position) -> Dict[str, Any]: con = pos.contract @@ -1179,3 +1289,63 @@ async def stream_trades( continue yield {'local_trades': (event_name, msg)} + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +) -> None: + # async with open_cached_client('ib') as client: + + # load all symbols locally for fast search + await ctx.started({}) + + async with ctx.open_stream() as stream: + + last = time.time() + + async for pattern in stream: + log.debug(f'received {pattern}') + now = time.time() + + assert pattern, 'IB can not accept blank search pattern' + + # throttle search requests to no faster then 1Hz + diff = now - last + if diff < 1.0: + log.debug('throttle sleeping') + await trio.sleep(diff) + try: + pattern = stream.receive_nowait() + except trio.WouldBlock: + pass + + if not pattern or pattern.isspace(): + log.warning('empty pattern received, skipping..') + + # XXX: this unblocks the far end search task which may + # hold up a multi-search nursery block + await stream.send({}) + + continue + + log.debug(f'searching for {pattern}') + # await tractor.breakpoint() + last = time.time() + results = await _trio_run_client_method( + method='search_stocks', + pattern=pattern, + upto=5, + ) + log.debug(f'got results {results.keys()}') + + log.debug("fuzzy matching") + matches = fuzzy.extractBests( + pattern, + results, + score_cutoff=50, + ) + + matches = {item[2]: item[0] for item in matches} + log.debug(f"sending matches: {matches.keys()}") + await stream.send(matches) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 8f83c960..7e9afd5c 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -21,7 +21,7 @@ Kraken backend. from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple +from typing import List, Dict, Any, Tuple, Optional import json import time @@ -37,6 +37,7 @@ from trio_websocket._impl import ( import arrow import asks +from fuzzywuzzy import process as fuzzy import numpy as np import trio import tractor @@ -56,6 +57,11 @@ log = get_logger(__name__) _url = 'https://api.kraken.com/0' +_search_conf = { + 'pause_period': 0.0616 +} + + # Broker specific ohlc schema which includes a vwap field _ohlc_dtype = [ ('index', int), @@ -147,6 +153,17 @@ class Client: 'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) + self._pairs: list[str] = [] + + @property + def pairs(self) -> Dict[str, Any]: + if self._pairs is None: + raise RuntimeError( + "Make sure to run `cache_symbols()` on startup!" + ) + # retreive and cache all symbols + + return self._pairs async def _public( self, @@ -162,14 +179,51 @@ class Client: async def symbol_info( self, - pair: str = 'all', + pair: Optional[str] = None, ): - resp = await self._public('AssetPairs', {'pair': pair}) + if pair is not None: + pairs = {'pair': pair} + else: + pairs = None # get all pairs + + resp = await self._public('AssetPairs', pairs) err = resp['error'] if err: raise BrokerError(err) - true_pair_key, data = next(iter(resp['result'].items())) - return data + + pairs = resp['result'] + + if pair is not None: + _, data = next(iter(pairs.items())) + return data + else: + return pairs + + async def cache_symbols( + self, + ) -> dict: + if not self._pairs: + self._pairs = await self.symbol_info() + + return self._pairs + + async def search_symbols( + self, + pattern: str, + limit: int = None, + ) -> Dict[str, Any]: + if self._pairs is not None: + data = self._pairs + else: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=50, + ) + # repack in dict form + return {item[0]['altname']: item[0] for item in matches} async def bars( self, @@ -182,6 +236,7 @@ class Client: if since is None: since = arrow.utcnow().floor('minute').shift( minutes=-count).timestamp() + # UTC 2017-07-02 12:53:20 is oldest seconds value since = str(max(1499000000, since)) json = await self._public( @@ -232,7 +287,12 @@ class Client: @asynccontextmanager async def get_client() -> Client: - yield Client() + client = Client() + + # at startup, load all symbols locally for fast search + await client.cache_symbols() + + yield client async def stream_messages(ws): @@ -249,7 +309,7 @@ async def stream_messages(ws): too_slow_count += 1 - if too_slow_count > 10: + if too_slow_count > 20: log.warning( "Heartbeat is too slow, resetting ws connection") @@ -317,7 +377,8 @@ def normalize( # seriously eh? what's with this non-symmetry everywhere # in subscription systems... - topic = quote['pair'].replace('/', '') + # XXX: piker style is always lowercases symbols. + topic = quote['pair'].replace('/', '').lower() # print(quote) return topic, quote @@ -368,10 +429,13 @@ class AutoReconWs: self, tries: int = 10000, ) -> None: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) + while True: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + else: + break last_err = None for i in range(tries): @@ -430,12 +494,12 @@ async def open_autorecon_ws(url): async def backfill_bars( + sym: str, shm: ShmArray, # type: ignore # noqa - count: int = 10, # NOTE: any more and we'll overrun the underlying buffer - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: """Fill historical bars into shared mem / storage afap. """ @@ -475,6 +539,10 @@ async def stream_quotes( # keep client cached for real-time section for sym in symbols: + + # transform to upper since piker style is always lower + sym = sym.upper() + si = Pair(**await client.symbol_info(sym)) # validation syminfo = si.dict() syminfo['price_tick_size'] = 1 / 10**si.pair_decimals @@ -482,7 +550,7 @@ async def stream_quotes( sym_infos[sym] = syminfo ws_pairs[sym] = si.wsname - symbol = symbols[0] + symbol = symbols[0].lower() init_msgs = { # pass back token, and bool, signalling if we're the writer @@ -570,8 +638,34 @@ async def stream_quotes( elif typ == 'l1': quote = ohlc - topic = quote['symbol'] + topic = quote['symbol'].lower() # XXX: format required by ``tractor.msg.pub`` # requires a ``Dict[topic: str, quote: dict]`` await send_chan.send({topic: quote}) + + +@tractor.context +async def open_symbol_search( + ctx: tractor.Context, +) -> Client: + async with open_cached_client('kraken') as client: + + # load all symbols locally for fast search + cache = await client.cache_symbols() + await ctx.started(cache) + + async with ctx.open_stream() as stream: + + async for pattern in stream: + + matches = fuzzy.extractBests( + pattern, + cache, + score_cutoff=50, + ) + # repack in dict form + await stream.send( + {item[0]['altname']: item[0] + for item in matches} + ) diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 528df91e..3f09587c 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -628,7 +628,7 @@ class Client: f"Took {time.time() - start} seconds to retreive {len(bars)} bars") return bars - async def search_stocks( + async def search_symbols( self, pattern: str, # how many contracts to return diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index b643b952..e881a726 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -136,7 +136,6 @@ def get_orders( return _orders -# TODO: make this a ``tractor.msg.pub`` async def send_order_cmds(symbol_key: str): """ Order streaming task: deliver orders transmitted from UI diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 2ca5a1a6..0cc38874 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -57,21 +57,31 @@ def pikerd(loglevel, host, tl, pdb): @click.group(context_settings=_context_defaults) -@click.option('--broker', '-b', default=DEFAULT_BROKER, - help='Broker backend to use') +@click.option( + '--brokers', '-b', + default=[DEFAULT_BROKER], + multiple=True, + help='Broker backend to use' +) @click.option('--loglevel', '-l', default='warning', help='Logging level') @click.option('--tl', is_flag=True, help='Enable tractor logging') @click.option('--configdir', '-c', help='Configuration directory') @click.pass_context -def cli(ctx, broker, loglevel, tl, configdir): +def cli(ctx, brokers, loglevel, tl, configdir): if configdir is not None: assert os.path.isdir(configdir), f"`{configdir}` is not a valid path" config._override_config_dir(configdir) ctx.ensure_object(dict) + + if len(brokers) == 1: + brokermods = [get_brokermod(brokers[0])] + else: + brokermods = [get_brokermod(broker) for broker in brokers] + ctx.obj.update({ - 'broker': broker, - 'brokermod': get_brokermod(broker), + 'brokers': brokers, + 'brokermods': brokermods, 'loglevel': loglevel, 'tractorloglevel': None, 'log': get_console_log(loglevel), diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 2079eb71..566f2b07 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -227,7 +227,8 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[sym] + subs = bus._subscribers[sym.lower()] + for ctx in subs: # print(f'sub is {ctx.chan.uid}') try: diff --git a/piker/data/feed.py b/piker/data/feed.py index 76c2bc23..a0e2478b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -17,6 +17,8 @@ """ Data feed apis and infra. +This module is enabled for ``brokerd`` daemons. + """ from dataclasses import dataclass, field from contextlib import asynccontextmanager @@ -25,7 +27,7 @@ from types import ModuleType from typing import ( Dict, Any, Sequence, AsyncIterator, Optional, - List + List, Awaitable, Callable, ) import trio @@ -43,7 +45,9 @@ from ._sharedmem import ( attach_shm_array, ShmArray, ) +from .ingest import get_ingestormod from ._source import base_iohlc_dtype, Symbol +from ..ui import _search from ._sampling import ( _shms, _incrementers, @@ -51,7 +55,6 @@ from ._sampling import ( iter_ohlc_periods, sample_and_broadcast, ) -from .ingest import get_ingestormod log = get_logger(__name__) @@ -145,12 +148,14 @@ async def _setup_persistent_brokerd( async def allocate_persistent_feed( + ctx: tractor.Context, bus: _FeedsBus, brokername: str, symbol: str, loglevel: str, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: try: @@ -172,7 +177,7 @@ async def allocate_persistent_feed( ) # do history validation? - assert opened, f'Persistent shm for {symbol} was already open?!' + # assert opened, f'Persistent shm for {symbol} was already open?!' # if not opened: # raise RuntimeError("Persistent shm for sym was already open?!") @@ -198,10 +203,12 @@ async def allocate_persistent_feed( # TODO: make this into a composed type which also # contains the backfiller cs for individual super-based # resspawns when needed. - bus.feeds[symbol] = (cs, init_msg, first_quote) + + # XXX: the ``symbol`` here is put into our native piker format (i.e. + # lower case). + bus.feeds[symbol.lower()] = (cs, init_msg, first_quote) if opened: - # start history backfill task ``backfill_bars()`` is # a required backend func this must block until shm is # filled with first set of ohlc bars @@ -235,13 +242,14 @@ async def allocate_persistent_feed( @tractor.stream async def attach_feed_bus( + ctx: tractor.Context, brokername: str, symbol: str, loglevel: str, + ) -> None: - # try: if loglevel is None: loglevel = tractor.current_actor().loglevel @@ -252,26 +260,33 @@ async def attach_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) + sub_only: bool = False + entry = bus.feeds.get(symbol) + + # if no cached feed for this symbol has been created for this + # brokerd yet, start persistent stream and shm writer task in + # service nursery async with bus.task_lock: - task_cs = bus.feeds.get(symbol) - sub_only: bool = False - - # if no cached feed for this symbol has been created for this - # brokerd yet, start persistent stream and shm writer task in - # service nursery - if task_cs is None: + if entry is None: init_msg, first_quote = await bus.nursery.start( partial( allocate_persistent_feed, ctx=ctx, bus=bus, brokername=brokername, + + # here we pass through the selected symbol in native + # "format" (i.e. upper vs. lowercase depending on + # provider). symbol=symbol, + loglevel=loglevel, ) ) bus._subscribers.setdefault(symbol, []).append(ctx) + assert isinstance(bus.feeds[symbol], tuple) + else: sub_only = True @@ -313,6 +328,8 @@ class Feed: _trade_stream: Optional[AsyncIterator[Dict[str, Any]]] = None _max_sample_rate: int = 0 + search: Callable[..., Awaitable] = None + # cache of symbol info messages received as first message when # a stream startsc. symbols: Dict[str, Symbol] = field(default_factory=dict) @@ -335,6 +352,7 @@ class Feed: iter_ohlc_periods, delay_s=delay_s or self._max_sample_rate, ) as self._index_stream: + yield self._index_stream else: yield self._index_stream @@ -364,6 +382,7 @@ class Feed: # more then one? topics=['local_trades'], ) as self._trade_stream: + yield self._trade_stream else: yield self._trade_stream @@ -376,27 +395,78 @@ def sym_to_shm_key( return f'{broker}.{symbol}' +@asynccontextmanager +async def install_brokerd_search( + + portal: tractor._portal.Portal, + brokermod: ModuleType, + +) -> None: + + async with portal.open_context( + brokermod.open_symbol_search + ) as (ctx, cache): + + # shield here since we expect the search rpc to be + # cancellable by the user as they see fit. + async with ctx.open_stream() as stream: + + async def search(text: str) -> Dict[str, Any]: + await stream.send(text) + return await stream.receive() + + async with _search.register_symbol_search( + + provider_name=brokermod.name, + search_routine=search, + + # TODO: should be make this a required attr of + # a backend module? + pause_period=getattr( + brokermod, '_search_conf', {} + ).get('pause_period', 0.0616), + ): + yield + + @asynccontextmanager async def open_feed( + brokername: str, symbols: Sequence[str], loglevel: Optional[str] = None, + ) -> AsyncIterator[Dict[str, Any]]: - """Open a "data feed" which provides streamed real-time quotes. - """ + ''' + Open a "data feed" which provides streamed real-time quotes. + + ''' + sym = symbols[0].lower() + + # TODO: feed cache locking, right now this is causing + # issues when reconnecting to a long running emsd? + # global _searcher_cache + + # async with _cache_lock: + # feed = _searcher_cache.get((brokername, sym)) + + # # if feed is not None and sym in feed.symbols: + # if feed is not None: + # yield feed + # # short circuit + # return + try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) - if loglevel is None: - loglevel = tractor.current_actor().loglevel + # no feed for broker exists so maybe spawn a data brokerd - # TODO: do all! - sym = symbols[0] - - # TODO: compress these to one line with py3.9+ - async with maybe_spawn_brokerd(brokername, loglevel=loglevel) as portal: + async with maybe_spawn_brokerd( + brokername, + loglevel=loglevel + ) as portal: async with portal.open_stream_from( @@ -449,10 +519,4 @@ async def open_feed( feed._max_sample_rate = max(ohlc_sample_rates) - try: - yield feed - - finally: - # always cancel the far end producer task - with trio.CancelScope(shield=True): - await stream.aclose() + yield feed diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 9f6e118b..b92e4aa5 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -19,17 +19,22 @@ High level Qt chart widgets. """ import time +from contextlib import AsyncExitStack from typing import Tuple, Dict, Any, Optional, Callable from types import ModuleType from functools import partial from PyQt5 import QtCore, QtGui +from PyQt5.QtCore import Qt import numpy as np import pyqtgraph as pg import tractor import trio -from trio_typing import TaskStatus +from .._daemon import ( + maybe_spawn_brokerd, +) +from ..brokers import get_brokermod from ._axes import ( DynamicDateAxis, PriceAxis, @@ -55,16 +60,19 @@ from ._style import ( _bars_from_right_in_follow_mode, _bars_to_left_in_follow_mode, ) +from . import _search +from ._event import open_key_stream from ..data._source import Symbol from ..data._sharedmem import ShmArray +from ..data import maybe_open_shm_array from .. import brokers from .. import data -from ..data import maybe_open_shm_array from ..log import get_logger from ._exec import run_qtractor, current_screen from ._interaction import ChartView from .order_mode import start_order_mode from .. import fsp +from ..data import feed log = get_logger(__name__) @@ -78,36 +86,44 @@ class ChartSpace(QtGui.QWidget): def __init__(self, parent=None): super().__init__(parent) - self.v_layout = QtGui.QVBoxLayout(self) - self.v_layout.setContentsMargins(0, 0, 0, 0) - self.v_layout.setSpacing(0) + self.hbox = QtGui.QHBoxLayout(self) + self.hbox.setContentsMargins(0, 0, 0, 0) + self.hbox.setSpacing(2) + + self.vbox = QtGui.QVBoxLayout() + self.vbox.setContentsMargins(0, 0, 0, 0) + self.vbox.setSpacing(2) + + self.hbox.addLayout(self.vbox) self.toolbar_layout = QtGui.QHBoxLayout() self.toolbar_layout.setContentsMargins(0, 0, 0, 0) - self.h_layout = QtGui.QHBoxLayout() - self.h_layout.setContentsMargins(0, 0, 0, 0) - # self.init_timeframes_ui() # self.init_strategy_ui() - self.v_layout.addLayout(self.toolbar_layout) - self.v_layout.addLayout(self.h_layout) + self.vbox.addLayout(self.toolbar_layout) + # self.vbox.addLayout(self.hbox) + self._chart_cache = {} - self.symbol_label: Optional[QtGui.QLabel] = None + self.linkedcharts: 'LinkedSplitCharts' = None + self._root_n: Optional[trio.Nursery] = None - def init_search(self): - self.symbol_label = label = QtGui.QLabel() - label.setTextFormat(3) # markdown - label.setFont(_font.font) - label.setMargin(0) - # title = f'sym: {self.symbol}' - # label.setText(title) + def set_chart_symbol( + self, + symbol_key: str, # of form . + linked_charts: 'LinkedSplitCharts', # type: ignore - label.setAlignment( - QtCore.Qt.AlignVCenter - | QtCore.Qt.AlignLeft - ) - self.v_layout.addWidget(label) + ) -> None: + # re-sort org cache symbol list in LIFO order + cache = self._chart_cache + cache.pop(symbol_key, None) + cache[symbol_key] = linked_charts + + def get_chart_symbol( + self, + symbol_key: str, + ) -> 'LinkedSplitCharts': # type: ignore + return self._chart_cache.get(symbol_key) def init_timeframes_ui(self): self.tf_layout = QtGui.QHBoxLayout() @@ -130,42 +146,73 @@ class ChartSpace(QtGui.QWidget): # def init_strategy_ui(self): # self.strategy_box = StrategyBoxWidget(self) # self.toolbar_layout.addWidget(self.strategy_box) + def load_symbol( self, - brokername: str, + providername: str, symbol_key: str, - data: np.ndarray, + loglevel: str, ohlc: bool = True, + reset: bool = False, ) -> None: """Load a new contract into the charting app. Expects a ``numpy`` structured array containing all the ohlcv fields. - """ - # TODO: symbol search - # # of course this doesn't work :eyeroll: - # h = _font.boundingRect('Ag').height() - # print(f'HEIGHT {h}') - # self.symbol_label.setFixedHeight(h + 4) - # self.v_layout.update() - # self.symbol_label.setText(f'/`{symbol}`') - linkedcharts = self._chart_cache.setdefault( - symbol_key, - LinkedSplitCharts(self) - ) + """ + # our symbol key style is always lower case + symbol_key = symbol_key.lower() + + # fully qualified symbol name (SNS i guess is what we're making?) + fqsn = '.'.join([symbol_key, providername]) + + linkedcharts = self.get_chart_symbol(fqsn) + + if not self.vbox.isEmpty(): + # XXX: this is CRITICAL especially with pixel buffer caching + self.linkedcharts.hide() + + # XXX: pretty sure we don't need this + # remove any existing plots? + # XXX: ahh we might want to support cache unloading.. + self.vbox.removeWidget(self.linkedcharts) + + # switching to a new viewable chart + if linkedcharts is None or reset: + + # we must load a fresh linked charts set + linkedcharts = LinkedSplitCharts(self) + + # spawn new task to start up and update new sub-chart instances + self._root_n.start_soon( + chart_symbol, + self, + providername, + symbol_key, + loglevel, + ) + + + self.set_chart_symbol(fqsn, linkedcharts) + + self.vbox.addWidget(linkedcharts) + + # chart is already in memory so just focus it + if self.linkedcharts: + self.linkedcharts.unfocus() + + # self.vbox.addWidget(linkedcharts) + linkedcharts.show() + linkedcharts.focus() self.linkedcharts = linkedcharts - # remove any existing plots - if not self.v_layout.isEmpty(): - self.v_layout.removeWidget(linkedcharts) + symbol = linkedcharts.symbol - self.v_layout.addWidget(linkedcharts) - - return linkedcharts - - # TODO: add signalling painter system - # def add_signals(self): - # self.chart.add_signals() + if symbol is not None: + self.window.setWindowTitle( + f'{symbol.key}@{symbol.brokers} ' + f'tick:{symbol.tick_size}' + ) class LinkedSplitCharts(QtGui.QWidget): @@ -184,8 +231,10 @@ class LinkedSplitCharts(QtGui.QWidget): zoomIsDisabled = QtCore.pyqtSignal(bool) def __init__( + self, chart_space: ChartSpace, + ) -> None: super().__init__() self.signals_visible: bool = False @@ -194,6 +243,8 @@ class LinkedSplitCharts(QtGui.QWidget): self.subplots: Dict[Tuple[str, ...], ChartPlotWidget] = {} self.chart_space = chart_space + self.chart_space = chart_space + self.xaxis = DynamicDateAxis( orientation='bottom', linked_charts=self @@ -232,6 +283,14 @@ class LinkedSplitCharts(QtGui.QWidget): sizes.extend([min_h_ind] * len(self.subplots)) self.splitter.setSizes(sizes) # , int(self.height()*0.2) + def focus(self) -> None: + if self.chart is not None: + self.chart.focus() + + def unfocus(self) -> None: + if self.chart is not None: + self.chart.clearFocus() + def plot_ohlc_main( self, symbol: Symbol, @@ -250,7 +309,7 @@ class LinkedSplitCharts(QtGui.QWidget): self.chart = self.add_plot( name=symbol.key, array=array, - xaxis=self.xaxis, + # xaxis=self.xaxis, style=style, _is_main=True, ) @@ -447,6 +506,10 @@ class ChartPlotWidget(pg.PlotWidget): # for when the splitter(s) are resized self._vb.sigResized.connect(self._set_yrange) + def focus(self) -> None: + # self.setFocus() + self._vb.setFocus() + def last_bar_in_view(self) -> int: self._ohlc[-1]['index'] @@ -1127,6 +1190,9 @@ async def spawn_fsps( Pass target entrypoint and historical data. """ + + linked_charts.focus() + # spawns sub-processes which execute cpu bound FSP code async with tractor.open_nursery(loglevel=loglevel) as n: @@ -1159,7 +1225,7 @@ async def spawn_fsps( # XXX: fsp may have been opened by a duplicate chart. Error for # now until we figure out how to wrap fsps as "feeds". - assert opened, f"A chart for {key} likely already exists?" + # assert opened, f"A chart for {key} likely already exists?" conf['shm'] = shm @@ -1219,9 +1285,6 @@ async def run_fsp( # data-array as first msg _ = await stream.receive() - conf['stream'] = stream - conf['portal'] = portal - shm = conf['shm'] if conf.get('overlay'): @@ -1253,16 +1316,27 @@ async def run_fsp( # fsp_func_name ) - # read last value + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked_charts.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value array = shm.array value = array[fsp_func_name][-1] - - last_val_sticky = chart._ysticks[chart.name] last_val_sticky.update_from_data(-1, value) - chart.update_curve_from_array(fsp_func_name, array) + chart._lc.focus() - chart._shm = shm + # works also for overlays in which case data is looked up from + # internal chart array set.... + chart.update_curve_from_array(fsp_func_name, shm.array) # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. @@ -1284,8 +1358,6 @@ async def run_fsp( chart._set_yrange() - stream = conf['stream'] - last = time.time() # update chart graphics @@ -1306,7 +1378,6 @@ async def run_fsp( # re-compute steps. read_tries = 2 while read_tries > 0: - try: # read last array = shm.array @@ -1387,7 +1458,6 @@ async def chart_symbol( brokername: str, sym: str, loglevel: str, - task_status: TaskStatus[Symbol] = trio.TASK_STATUS_IGNORED, ) -> None: """Spawn a real-time chart widget for this symbol and app session. @@ -1408,19 +1478,15 @@ async def chart_symbol( bars = ohlcv.array symbol = feed.symbols[sym] - task_status.started(symbol) - # load in symbol's ohlc data chart_app.window.setWindowTitle( f'{symbol.key}@{symbol.brokers} ' f'tick:{symbol.tick_size}' ) - # await tractor.breakpoint() linked_charts = chart_app.linkedcharts linked_charts._symbol = symbol chart = linked_charts.plot_ohlc_main(symbol, bars) - chart.setFocus() # plot historical vwap if available @@ -1495,9 +1561,8 @@ async def chart_symbol( ) # wait for a first quote before we start any update tasks - quote = await feed.receive() - - log.info(f'Received first quote {quote}') + # quote = await feed.receive() + # log.info(f'Received first quote {quote}') n.start_soon( check_for_new_bars, @@ -1518,12 +1583,48 @@ async def chart_symbol( await start_order_mode(chart, symbol, brokername) +async def load_providers( + brokernames: list[str], + loglevel: str, +) -> None: + + # TODO: seems like our incentive for brokerd caching lelel + backends = {} + + async with AsyncExitStack() as stack: + # TODO: spawn these async in nursery. + # load all requested brokerd's at startup and load their + # search engines. + for broker in brokernames: + + log.info(f'Loading brokerd for {broker}') + # spin up broker daemons for each provider + portal = await stack.enter_async_context( + maybe_spawn_brokerd( + broker, + loglevel=loglevel + ) + ) + + backends[broker] = portal + + await stack.enter_async_context( + feed.install_brokerd_search( + portal, + get_brokermod(broker), + ) + ) + + # keep search engines up until cancelled + await trio.sleep_forever() + + async def _async_main( # implicit required argument provided by ``qtractor_run()`` widgets: Dict[str, Any], - symbol_key: str, - brokername: str, + sym: str, + brokernames: str, loglevel: str, ) -> None: @@ -1556,27 +1657,58 @@ async def _async_main( # that run cached in the bg chart_app._root_n = root_n - chart_app.load_symbol(brokername, symbol_key, loglevel) + # setup search widget + search = _search.SearchWidget(chart_space=chart_app) - symbol = await root_n.start( - chart_symbol, - chart_app, - brokername, - symbol_key, - loglevel, + # the main chart's view is given focus at startup + search.bar.unfocus() + + # add search singleton to global chart-space widget + chart_app.hbox.addWidget( + search, + + # alights to top and uses minmial space based on + # search bar size hint (i think?) + alignment=Qt.AlignTop ) + chart_app.search = search - chart_app.window.setWindowTitle( - f'{symbol.key}@{symbol.brokers} ' - f'tick:{symbol.tick_size}' - ) + symbol, _, provider = sym.rpartition('.') - await trio.sleep_forever() + # this internally starts a ``chart_symbol()`` task above + chart_app.load_symbol(provider, symbol, loglevel) + + root_n.start_soon(load_providers, brokernames, loglevel) + + # spin up a search engine for the local cached symbol set + async with _search.register_symbol_search( + + provider_name='cache', + search_routine=partial( + _search.search_simple_dict, + source=chart_app._chart_cache, + ), + + ): + # start handling search bar kb inputs + async with open_key_stream( + search.bar, + ) as key_stream: + + # start kb handling task for searcher + root_n.start_soon( + _search.handle_keyboard_input, + # chart_app, + search, + key_stream, + ) + + await trio.sleep_forever() def _main( sym: str, - brokername: str, + brokernames: [str], piker_loglevel: str, tractor_kwargs, ) -> None: @@ -1586,7 +1718,7 @@ def _main( # Qt entry point run_qtractor( func=_async_main, - args=(sym, brokername, piker_loglevel), + args=(sym, brokernames, piker_loglevel), main_widget=ChartSpace, tractor_kwargs=tractor_kwargs, ) diff --git a/piker/ui/_event.py b/piker/ui/_event.py new file mode 100644 index 00000000..c3d919dc --- /dev/null +++ b/piker/ui/_event.py @@ -0,0 +1,91 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Qt event proxying and processing using ``trio`` mem chans. + +""" +from contextlib import asynccontextmanager + +from PyQt5 import QtCore, QtGui +from PyQt5.QtCore import QEvent +import trio + + +class EventCloner(QtCore.QObject): + """Clone and forward keyboard events over a trio memory channel + for later async processing. + + """ + _event_types: set[QEvent] = set() + _send_chan: trio.abc.SendChannel = None + + def eventFilter( + self, + source: QtGui.QWidget, + ev: QEvent, + ) -> None: + + if ev.type() in self._event_types: + + # TODO: what's the right way to allow this? + # if ev.isAutoRepeat(): + # ev.ignore() + + # XXX: we unpack here because apparently doing it + # after pop from the mem chan isn't showing the same + # event object? no clue wtf is going on there, likely + # something to do with Qt internals and calling the + # parent handler? + + key = ev.key() + mods = ev.modifiers() + txt = ev.text() + + # run async processing + self._send_chan.send_nowait((ev, key, mods, txt)) + + # never intercept the event + return False + + +@asynccontextmanager +async def open_key_stream( + + source_widget: QtGui.QWidget, + event_types: set[QEvent] = {QEvent.KeyPress}, + + # TODO: should we offer some kinda option for toggling releases? + # would it require a channel per event type? + # QEvent.KeyRelease, + +) -> trio.abc.ReceiveChannel: + + # 1 to force eager sending + send, recv = trio.open_memory_channel(16) + + kc = EventCloner() + kc._send_chan = send + kc._event_types = event_types + + source_widget.installEventFilter(kc) + + try: + yield recv + + finally: + await send.aclose() + source_widget.removeEventFilter(kc) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 53de8554..85488df8 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -38,6 +38,7 @@ from PyQt5.QtCore import ( QCoreApplication, ) import qdarkstyle +# import qdarkgraystyle import trio import tractor from outcome import Error @@ -144,6 +145,9 @@ def run_qtractor( # currently seem tricky.. app.setQuitOnLastWindowClosed(False) + # XXX: lmfao, this is how you disable text edit cursor blinking..smh + app.setCursorFlashTime(0) + # set global app singleton global _qt_app _qt_app = app diff --git a/piker/ui/_graphics/_cursor.py b/piker/ui/_graphics/_cursor.py index 159c773e..0919f6f9 100644 --- a/piker/ui/_graphics/_cursor.py +++ b/piker/ui/_graphics/_cursor.py @@ -435,7 +435,12 @@ class Cursor(pg.GraphicsObject): self, y_label_level: float = None, ) -> None: - g = self.graphics[self.active_plot] + + plot = self.active_plot + if not plot: + return + + g = self.graphics[plot] # show horiz line and y-label g['hl'].show() g['vl'].show() diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 06258566..9c407f3c 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -188,7 +188,7 @@ class SelectRect(QtGui.QGraphicsRectItem): self._abs_top_right = label_anchor self._label_proxy.setPos(self.vb.mapFromView(label_anchor)) - self._label.show() + # self._label.show() def clear(self): """Clear the selection box from view. @@ -486,6 +486,8 @@ class ChartView(ViewBox): self._key_buffer = [] self._key_active: bool = False + self.setFocusPolicy(QtCore.Qt.StrongFocus) + @property def chart(self) -> 'ChartPlotWidget': # type: ignore # noqa return self._chart @@ -692,7 +694,7 @@ class ChartView(ViewBox): ev.accept() self.mode.submit_exec() - def keyReleaseEvent(self, ev): + def keyReleaseEvent(self, ev: QtCore.QEvent): """ Key release to normally to trigger release of input mode @@ -711,6 +713,10 @@ class ChartView(ViewBox): # if self.state['mouseMode'] == ViewBox.RectMode: self.setMouseMode(ViewBox.PanMode) + # ctlalt = False + # if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods: + # ctlalt = True + # if self.state['mouseMode'] == ViewBox.RectMode: # if key == QtCore.Qt.Key_Space: if mods == QtCore.Qt.ControlModifier or key == QtCore.Qt.Key_Control: @@ -722,7 +728,7 @@ class ChartView(ViewBox): self._key_active = False - def keyPressEvent(self, ev): + def keyPressEvent(self, ev: QtCore.QEvent) -> None: """ This routine should capture key presses in the current view box. @@ -747,15 +753,22 @@ class ChartView(ViewBox): ctrl = False if mods == QtCore.Qt.ControlModifier: ctrl = True - - if mods == QtCore.Qt.ControlModifier: self.mode._exec_mode = 'live' self._key_active = True - # alt - if mods == QtCore.Qt.AltModifier: - pass + # ctrl + alt + # ctlalt = False + # if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods: + # ctlalt = True + + # ctlr-/ for "lookup", "search" -> open search tree + if ctrl and key in { + QtCore.Qt.Key_L, + QtCore.Qt.Key_Space, + }: + search = self._chart._lc.chart_space.search + search.focus() # esc if key == QtCore.Qt.Key_Escape or (ctrl and key == QtCore.Qt.Key_C): @@ -802,5 +815,6 @@ class ChartView(ViewBox): # elif ev.key() == QtCore.Qt.Key_Backspace: # self.scaleHistory(len(self.axHistory)) else: + # maybe propagate to parent widget ev.ignore() self._key_active = False diff --git a/piker/ui/_search.py b/piker/ui/_search.py new file mode 100644 index 00000000..3adb16a1 --- /dev/null +++ b/piker/ui/_search.py @@ -0,0 +1,963 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +qompleterz: embeddable search and complete using trio, Qt and fuzzywuzzy. + +""" + +# link set for hackzin on this stuff: +# https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections +# https://doc.qt.io/qt-5/model-view-programming.html +# https://doc.qt.io/qt-5/modelview.html +# https://doc.qt.io/qt-5/qtreeview.html#selectedIndexes +# https://doc.qt.io/qt-5/qmodelindex.html#siblingAtColumn +# https://doc.qt.io/qt-5/qitemselectionmodel.html#currentIndex +# https://www.programcreek.com/python/example/108109/PyQt5.QtWidgets.QTreeView +# https://doc.qt.io/qt-5/qsyntaxhighlighter.html +# https://github.com/qutebrowser/qutebrowser/blob/master/qutebrowser/completion/completiondelegate.py#L243 +# https://forum.qt.io/topic/61343/highlight-matched-substrings-in-qstyleditemdelegate + +from collections import defaultdict +from contextlib import asynccontextmanager +from functools import partial +from typing import ( + List, Optional, Callable, + Awaitable, Sequence, Dict, + Any, AsyncIterator, Tuple, +) +import time +# from pprint import pformat + +from fuzzywuzzy import process as fuzzy +import trio +from trio_typing import TaskStatus +from PyQt5 import QtCore, QtGui +from PyQt5 import QtWidgets +from PyQt5.QtCore import ( + Qt, + # QSize, + QModelIndex, + QItemSelectionModel, +) +from PyQt5.QtGui import ( + # QLayout, + QStandardItem, + QStandardItemModel, +) +from PyQt5.QtWidgets import ( + QWidget, + QTreeView, + # QListWidgetItem, + # QAbstractScrollArea, + QStyledItemDelegate, +) + + +from ..log import get_logger +from ._style import ( + _font, + DpiAwareFont, + # hcolor, +) + + +log = get_logger(__name__) + + +class SimpleDelegate(QStyledItemDelegate): + """ + Super simple view delegate to render text in the same + font size as the search widget. + + """ + + def __init__( + self, + parent=None, + font: DpiAwareFont = _font, + ) -> None: + super().__init__(parent) + self.dpi_font = font + + +class CompleterView(QTreeView): + + # XXX: relevant docs links: + # - simple widget version of this: + # https://doc.qt.io/qt-5/qtreewidget.html#details + # - MVC high level instructional: + # https://doc.qt.io/qt-5/model-view-programming.html + # - MV tut: + # https://doc.qt.io/qt-5/modelview.html + # - custome header view (for doing stuff like we have in kivy?): + # https://doc.qt.io/qt-5/qheaderview.html#moving-header-sections + + # TODO: selection model stuff for eventual aggregate feeds + # charting and mgmt; + # https://doc.qt.io/qt-5/qabstractitemview.html#setSelectionModel + # https://doc.qt.io/qt-5/qitemselectionmodel.html + # https://doc.qt.io/qt-5/modelview.html#3-2-working-with-selections + # https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-of-items + + # TODO: mouse extended handling: + # https://doc.qt.io/qt-5/qabstractitemview.html#entered + + def __init__( + self, + parent=None, + labels: List[str] = [], + ) -> None: + + super().__init__(parent) + + model = QStandardItemModel(self) + self.labels = labels + + # a std "tabular" config + self.setItemDelegate(SimpleDelegate()) + self.setModel(model) + self.setAlternatingRowColors(True) + # TODO: size this based on DPI font + self.setIndentation(20) + + self.pressed.connect(self.on_pressed) + + # self.setUniformRowHeights(True) + # self.setColumnWidth(0, 3) + # self.setVerticalBarPolicy(Qt.ScrollBarAlwaysOff) + # self.setSizeAdjustPolicy(QAbstractScrollArea.AdjustIgnored) + + # ux settings + self.setItemsExpandable(True) + self.setExpandsOnDoubleClick(False) + self.setAnimated(False) + self.setHorizontalScrollBarPolicy(Qt.ScrollBarAlwaysOff) + + # column headers + model.setHorizontalHeaderLabels(labels) + + self._font_size: int = 0 # pixels + + def on_pressed(self, idx: QModelIndex) -> None: + + search = self.parent() + search.chart_current_item(clear_to_cache=False) + search.focus() + + def set_font_size(self, size: int = 18): + # dpi_px_size = _font.px_size + # print(size) + if size < 0: + size = 16 + + self._font_size = size + + self.setStyleSheet(f"font: {size}px") + + def resize(self): + model = self.model() + cols = model.columnCount() + + for i in range(cols): + self.resizeColumnToContents(i) + + # inclusive of search bar and header "rows" in pixel terms + rows = 100 + # max_rows = 8 # 6 + search and headers + row_px = self.rowHeight(self.currentIndex()) + # print(f'font_h: {font_h}\n px_height: {px_height}') + + # TODO: probably make this more general / less hacky + self.setMinimumSize(self.width(), rows * row_px) + self.setMaximumSize(self.width() + 10, rows * row_px) + self.setFixedWidth(333) + + def is_selecting_d1(self) -> bool: + cidx = self.selectionModel().currentIndex() + return cidx.parent() == QModelIndex() + + def previous_index(self) -> QModelIndex: + + cidx = self.selectionModel().currentIndex() + one_above = self.indexAbove(cidx) + + if one_above.parent() == QModelIndex(): + # if the next node up's parent is the root we don't want to select + # the next node up since it's a top level node and we only + # select entries depth >= 2. + + # see if one more up is not the root and we can select it. + two_above = self.indexAbove(one_above) + if two_above != QModelIndex(): + return two_above + else: + return cidx + + return one_above # just next up + + def next_index(self) -> QModelIndex: + cidx = self.selectionModel().currentIndex() + one_below = self.indexBelow(cidx) + + if one_below.parent() == QModelIndex(): + # if the next node up's parent is the root we don't want to select + # the next node up since it's a top level node and we only + # select entries depth >= 2. + + # see if one more up is not the root and we can select it. + two_below = self.indexBelow(one_below) + if two_below != QModelIndex(): + return two_below + else: + return cidx + + return one_below # just next down + + def select_from_idx( + + self, + idx: QModelIndex, + + ) -> QStandardItem: + '''Select and return the item at index ``idx``. + + ''' + sel = self.selectionModel() + model = self.model() + + sel.setCurrentIndex( + idx, + QItemSelectionModel.ClearAndSelect | + QItemSelectionModel.Rows + ) + + return model.itemFromIndex(idx) + + def select_first(self) -> QStandardItem: + '''Select the first depth >= 2 entry from the completer tree and + return it's item. + + ''' + # ensure we're **not** selecting the first level parent node and + # instead its child. + model = self.model() + for idx, item in self.iter_d1(): + if model.rowCount(idx) == 0: + continue + else: + return self.select_from_idx(self.indexBelow(idx)) + + def select_next(self) -> QStandardItem: + idx = self.next_index() + assert idx.isValid() + return self.select_from_idx(idx) + + def select_previous(self) -> QStandardItem: + idx = self.previous_index() + assert idx.isValid() + return self.select_from_idx(idx) + + def next_section(self, direction: str = 'down') -> QModelIndex: + cidx = start_idx = self.selectionModel().currentIndex() + + # step up levels to depth == 1 + while cidx.parent() != QModelIndex(): + cidx = cidx.parent() + + # move to next section in `direction` + op = {'up': -1, 'down': +1}[direction] + next_row = cidx.row() + op + nidx = self.model().index(next_row, cidx.column(), QModelIndex()) + + # do nothing, if there is no valid "next" section + if not nidx.isValid(): + return self.select_from_idx(start_idx) + + # go to next selectable child item + self.select_from_idx(nidx) + return self.select_next() + + def iter_d1( + self, + ) -> tuple[QModelIndex, QStandardItem]: + + model = self.model() + isections = model.rowCount() + + # much thanks to following code to figure out breadth-first + # traversing from the root node: + # https://stackoverflow.com/a/33126689 + for i in range(isections): + idx = model.index(i, 0, QModelIndex()) + item = model.itemFromIndex(idx) + yield idx, item + + def find_section( + self, + section: str, + + ) -> Optional[QModelIndex]: + '''Find the *first* depth = 1 section matching ``section`` in + the tree and return its index. + + ''' + for idx, item in self.iter_d1(): + if item.text() == section: + return idx + else: + # caller must expect his + return None + + def clear_section( + self, + section: str, + status_field: str = None, + + ) -> None: + '''Clear all result-rows from under the depth = 1 section. + + ''' + idx = self.find_section(section) + model = self.model() + + if idx is not None: + if model.hasChildren(idx): + rows = model.rowCount(idx) + # print(f'removing {rows} from {section}') + assert model.removeRows(0, rows, parent=idx) + + # remove section as well ? + # model.removeRow(i, QModelIndex()) + + if status_field is not None: + model.setItem(idx.row(), 1, QStandardItem(status_field)) + + else: + model.setItem(idx.row(), 1, QStandardItem()) + + self.resize() + + return idx + else: + return None + + def set_section_entries( + self, + section: str, + values: Sequence[str], + clear_all: bool = False, + + ) -> None: + '''Set result-rows for depth = 1 tree section ``section``. + + ''' + model = self.model() + if clear_all: + # XXX: rewrite the model from scratch if caller requests it + model.clear() + + model.setHorizontalHeaderLabels(self.labels) + + section_idx = self.clear_section(section) + + # if we can't find a section start adding to the root + if section_idx is None: + root = model.invisibleRootItem() + section_item = QStandardItem(section) + blank = QStandardItem('') + root.appendRow([section_item, blank]) + + else: + section_item = model.itemFromIndex(section_idx) + + # values just needs to be sequence-like + for i, s in enumerate(values): + + ix = QStandardItem(str(i)) + item = QStandardItem(s) + + # Add the item to the model + section_item.appendRow([ix, item]) + + self.expandAll() + + # TODO: figure out if we can avoid this line in a better way + # such that "re-selection" doesn't happen tree-wise for each new + # sub-search: + # https://doc.qt.io/qt-5/model-view-programming.html#handling-selections-in-item-views + + # XXX: THE BELOW LINE MUST BE CALLED. + # this stuff is super finicky and if not done right will cause + # Qt crashes out our buttz. it's required in order to get the + # view to show right after typing input. + + self.select_first() + + # TODO: the way we might be able to do this more sanely is, + # 1. for the currently selected item, when start rewriting + # a section figure out the row, column, parent "abstract" + # position in the tree view and store it + # 2. take that position and re-apply the selection to the new + # model/tree by looking up the new "equivalent element" and + # selecting + + self.show_matches() + + def show_matches(self) -> None: + self.show() + self.resize() + + +class SearchBar(QtWidgets.QLineEdit): + + def __init__( + + self, + parent: QWidget, + parent_chart: QWidget, # noqa + view: Optional[CompleterView] = None, + font: DpiAwareFont = _font, + + ) -> None: + + super().__init__(parent) + + # self.setContextMenuPolicy(Qt.CustomContextMenu) + # self.customContextMenuRequested.connect(self.show_menu) + # self.setStyleSheet(f"font: 18px") + + self.view: CompleterView = view + self.dpi_font = font + self.chart_app = parent_chart + + # size it as we specify + # https://doc.qt.io/qt-5/qsizepolicy.html#Policy-enum + self.setSizePolicy( + QtWidgets.QSizePolicy.Expanding, + QtWidgets.QSizePolicy.Fixed, + ) + self.setFont(font.font) + + # witty bit of margin + self.setTextMargins(2, 2, 2, 2) + + def focus(self) -> None: + self.selectAll() + self.show() + self.setFocus() + + def show(self) -> None: + super().show() + self.view.show_matches() + + def sizeHint(self) -> QtCore.QSize: + """ + Scale edit box to size of dpi aware font. + + """ + psh = super().sizeHint() + psh.setHeight(self.dpi_font.px_size + 2) + return psh + + def unfocus(self) -> None: + self.parent().hide() + self.clearFocus() + + if self.view: + self.view.hide() + + +class SearchWidget(QtGui.QWidget): + '''Composed widget of ``SearchBar`` + ``CompleterView``. + + Includes helper methods for item management in the sub-widgets. + + ''' + def __init__( + self, + chart_space: 'ChartSpace', # type: ignore # noqa + columns: List[str] = ['src', 'symbol'], + parent=None, + + ) -> None: + super().__init__(parent or chart_space) + + # size it as we specify + self.setSizePolicy( + QtWidgets.QSizePolicy.Fixed, + QtWidgets.QSizePolicy.Expanding, + ) + + self.chart_app = chart_space + + self.vbox = QtGui.QVBoxLayout(self) + self.vbox.setContentsMargins(0, 0, 0, 0) + self.vbox.setSpacing(4) + + # split layout for the (label:| search bar entry) + self.bar_hbox = QtGui.QHBoxLayout() + self.bar_hbox.setContentsMargins(0, 0, 0, 0) + self.bar_hbox.setSpacing(4) + + # add label to left of search bar + self.label = label = QtGui.QLabel(parent=self) + label.setTextFormat(3) # markdown + label.setFont(_font.font) + label.setMargin(4) + label.setText("`search`:") + label.show() + label.setAlignment( + QtCore.Qt.AlignVCenter + | QtCore.Qt.AlignLeft + ) + + self.bar_hbox.addWidget(label) + + self.view = CompleterView( + parent=self, + labels=columns, + ) + self.bar = SearchBar( + parent=self, + parent_chart=chart_space, + view=self.view, + ) + self.bar_hbox.addWidget(self.bar) + + self.vbox.addLayout(self.bar_hbox) + + self.vbox.setAlignment(self.bar, Qt.AlignTop | Qt.AlignRight) + self.vbox.addWidget(self.bar.view) + self.vbox.setAlignment(self.view, Qt.AlignTop | Qt.AlignLeft) + + def focus(self) -> None: + + if self.view.model().rowCount(QModelIndex()) == 0: + # fill cache list if nothing existing + self.view.set_section_entries( + 'cache', + list(reversed(self.chart_app._chart_cache)), + clear_all=True, + ) + + self.bar.focus() + self.show() + + def get_current_item(self) -> Optional[Tuple[str, str]]: + '''Return the current completer tree selection as + a tuple ``(parent: str, child: str)`` if valid, else ``None``. + + ''' + model = self.view.model() + sel = self.view.selectionModel() + cidx = sel.currentIndex() + + # TODO: get rid of this hard coded column -> 1 + # and use the ``CompleterView`` schema/settings + # to figure out the desired field(s) + # https://doc.qt.io/qt-5/qstandarditemmodel.html#itemFromIndex + node = model.itemFromIndex(cidx.siblingAtColumn(1)) + + if node: + symbol = node.text() + try: + provider = node.parent().text() + except AttributeError: + # no text set + return None + + # TODO: move this to somewhere non-search machinery specific? + if provider == 'cache': + symbol, _, provider = symbol.rpartition('.') + + return provider, symbol + + else: + return None + + def chart_current_item( + self, + clear_to_cache: bool = True, + ) -> Optional[str]: + '''Attempt to load and switch the current selected + completion result to the affiliated chart app. + + Return any loaded symbol + + ''' + value = self.get_current_item() + if value is None: + return None + + provider, symbol = value + chart = self.chart_app + + log.info(f'Requesting symbol: {symbol}.{provider}') + + chart.load_symbol( + provider, + symbol, + 'info', + ) + + # fully qualified symbol name (SNS i guess is what we're + # making?) + fqsn = '.'.join([symbol, provider]).lower() + + # Re-order the symbol cache on the chart to display in + # LIFO order. this is normally only done internally by + # the chart on new symbols being loaded into memory + chart.set_chart_symbol(fqsn, chart.linkedcharts) + + if clear_to_cache: + self.bar.clear() + self.view.set_section_entries( + 'cache', + values=list(reversed(chart._chart_cache)), + + # remove all other completion results except for cache + clear_all=True, + ) + + return fqsn + + +_search_active: trio.Event = trio.Event() +_search_enabled: bool = False + + +async def pack_matches( + view: CompleterView, + has_results: dict[str, set[str]], + matches: dict[(str, str), [str]], + provider: str, + pattern: str, + search: Callable[..., Awaitable[dict]], + task_status: TaskStatus[ + trio.CancelScope] = trio.TASK_STATUS_IGNORED, + +) -> None: + + log.info(f'Searching {provider} for "{pattern}"') + + if provider != 'cache': + # insert provider entries with search status + view.set_section_entries( + section=provider, + values=[], + ) + view.clear_section(provider, status_field='-> searchin..') + + else: # for the cache just clear it's entries and don't put a status + view.clear_section(provider) + + with trio.CancelScope() as cs: + task_status.started(cs) + # ensure ^ status is updated + results = await search(pattern) + + if provider != 'cache': # XXX: don't cache the cache results xD + matches[(provider, pattern)] = results + + # print(f'results from {provider}: {results}') + has_results[pattern].add(provider) + + if results: + # display completion results + view.set_section_entries( + section=provider, + values=results, + ) + else: + view.clear_section(provider) + + +async def fill_results( + + search: SearchBar, + recv_chan: trio.abc.ReceiveChannel, + + # kb debouncing pauses (bracket defaults) + min_pause_time: float = 0.1, + max_pause_time: float = 6/16, + +) -> None: + """Task to search through providers and fill in possible + completion results. + + """ + global _search_active, _search_enabled, _searcher_cache + + bar = search.bar + view = bar.view + view.select_from_idx(QModelIndex()) + + last_text = bar.text() + repeats = 0 + + # cache of prior patterns to search results + matches = defaultdict(list) + has_results: defaultdict[str, set[str]] = defaultdict(set) + + while True: + await _search_active.wait() + period = None + + while True: + + last_text = bar.text() + wait_start = time.time() + + with trio.move_on_after(max_pause_time): + pattern = await recv_chan.receive() + + period = time.time() - wait_start + print(f'{pattern} after {period}') + + # during fast multiple key inputs, wait until a pause + # (in typing) to initiate search + if period < min_pause_time: + log.debug(f'Ignoring fast input for {pattern}') + continue + + text = bar.text() + # print(f'search: {text}') + + if not text or text.isspace(): + # print('idling') + _search_active = trio.Event() + break + + if repeats > 2 and period >= max_pause_time: + _search_active = trio.Event() + repeats = 0 + break + + if text == last_text: + repeats += 1 + + if not _search_enabled: + # print('search currently disabled') + break + + log.debug(f'Search req for {text}') + + already_has_results = has_results[text] + + # issue multi-provider fan-out search request and place + # "searching.." statuses on outstanding results providers + async with trio.open_nursery() as n: + + for provider, (search, pause) in ( + _searcher_cache.copy().items() + ): + + if provider != 'cache': + view.clear_section( + provider, status_field='-> searchin..') + + # only conduct search on this backend if it's + # registered for the corresponding pause period. + if (period >= pause) and ( + provider not in already_has_results + ): + await n.start( + pack_matches, + view, + has_results, + matches, + provider, + text, + search + ) + else: # already has results for this input text + results = matches[(provider, text)] + if results and provider != 'cache': + view.set_section_entries( + section=provider, + values=results, + ) + else: + view.clear_section(provider) + + bar.show() + + +async def handle_keyboard_input( + + search: SearchWidget, + recv_chan: trio.abc.ReceiveChannel, + +) -> None: + + global _search_active, _search_enabled + + # startup + chart = search.chart_app + bar = search.bar + view = bar.view + view.set_font_size(bar.dpi_font.px_size) + + send, recv = trio.open_memory_channel(16) + + async with trio.open_nursery() as n: + + # start a background multi-searcher task which receives + # patterns relayed from this keyboard input handler and + # async updates the completer view's results. + n.start_soon( + partial( + fill_results, + search, + recv, + ) + ) + + async for event, key, mods, txt in recv_chan: + + log.debug(f'key: {key}, mods: {mods}, txt: {txt}') + + ctl = False + if mods == Qt.ControlModifier: + ctl = True + + # # ctl + alt as combo + # ctlalt = False + # if (QtCore.Qt.AltModifier | QtCore.Qt.ControlModifier) == mods: + # ctlalt = True + + if key in (Qt.Key_Enter, Qt.Key_Return): + + search.chart_current_item(clear_to_cache=True) + _search_enabled = False + continue + + elif not ctl and not bar.text(): + # if nothing in search text show the cache + view.set_section_entries( + 'cache', + list(reversed(chart._chart_cache)), + clear_all=True, + ) + continue + + # cancel and close + if ctl and key in { + Qt.Key_C, + Qt.Key_Space, # i feel like this is the "native" one + Qt.Key_Alt, + }: + search.bar.unfocus() + + # kill the search and focus back on main chart + if chart: + chart.linkedcharts.focus() + + continue + + if ctl and key in { + Qt.Key_L, + }: + # like url (link) highlight in a web browser + bar.focus() + + # selection navigation controls + elif ctl and key in { + Qt.Key_D, + }: + view.next_section(direction='down') + _search_enabled = False + + elif ctl and key in { + Qt.Key_U, + }: + view.next_section(direction='up') + _search_enabled = False + + # selection navigation controls + elif (ctl and key in { + + Qt.Key_K, + Qt.Key_J, + + }) or key in { + + Qt.Key_Up, + Qt.Key_Down, + }: + _search_enabled = False + if key in {Qt.Key_K, Qt.Key_Up}: + item = view.select_previous() + + elif key in {Qt.Key_J, Qt.Key_Down}: + item = view.select_next() + + if item: + parent_item = item.parent() + + if parent_item and parent_item.text() == 'cache': + + # if it's a cache item, switch and show it immediately + search.chart_current_item(clear_to_cache=False) + + elif not ctl: + # relay to completer task + _search_enabled = True + send.send_nowait(search.bar.text()) + _search_active.set() + + +async def search_simple_dict( + text: str, + source: dict, +) -> Dict[str, Any]: + + # search routine can be specified as a function such + # as in the case of the current app's local symbol cache + matches = fuzzy.extractBests( + text, + source.keys(), + score_cutoff=90, + ) + + return [item[0] for item in matches] + + +# cache of provider names to async search routines +_searcher_cache: Dict[str, Callable[..., Awaitable]] = {} + + +@asynccontextmanager +async def register_symbol_search( + + provider_name: str, + search_routine: Callable, + pause_period: Optional[float] = None, + +) -> AsyncIterator[dict]: + + global _searcher_cache + + pause_period = pause_period or 0.125 + + # deliver search func to consumer + try: + _searcher_cache[provider_name] = (search_routine, pause_period) + yield search_routine + + finally: + _searcher_cache.pop(provider_name) diff --git a/piker/ui/cli.py b/piker/ui/cli.py index b40a4c31..e65bc379 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -49,7 +49,7 @@ def monitor(config, rate, name, dhost, test, tl): """Start a real-time watchlist UI """ # global opts - brokermod = config['brokermod'] + brokermod = config['brokermods'][0] loglevel = config['loglevel'] log = config['log'] @@ -138,17 +138,24 @@ def chart(config, symbol, profile, pdb): from .. import _profile from ._chart import _main + if '.' not in symbol: + click.echo(click.style( + f'symbol: {symbol} must have a {symbol}. suffix', + fg='red', + )) + return + # toggle to enable profiling _profile._pg_profile = profile # global opts - brokername = config['broker'] + brokernames = config['brokers'] tractorloglevel = config['tractorloglevel'] pikerloglevel = config['loglevel'] _main( sym=symbol, - brokername=brokername, + brokernames=brokernames, piker_loglevel=pikerloglevel, tractor_kwargs={ 'debug_mode': pdb, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 07f2b281..22293efa 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -402,7 +402,9 @@ async def start_order_mode( # each clearing tick is responded individually elif resp in ('broker_filled',): + action = msg['action'] + # TODO: some kinda progress system order_mode.on_fill( oid, diff --git a/setup.py b/setup.py index fe68640e..4b8d8b1d 100755 --- a/setup.py +++ b/setup.py @@ -71,9 +71,11 @@ setup( 'PyQt5', 'pyqtgraph', 'qdarkstyle==2.8.1', + #'kivy', see requirement.txt; using a custom branch atm # tsdbs 'pymarketstore', + #'kivy', see requirement.txt; using a custom branch atm # fuzzy search