From d27214621d70afaf8c9fb051ad2ca3583e91e302 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 12:59:00 -0400 Subject: [PATCH 01/14] Update some typing and add latency checks for binance --- piker/brokers/binance.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 8a3f42e9..4d82474b 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -19,7 +19,7 @@ Binance backend """ from contextlib import asynccontextmanager -from typing import List, Dict, Any, Tuple, Union, Optional +from typing import List, Dict, Any, Tuple, Union, Optional, AsyncGenerator import time import trio @@ -37,7 +37,7 @@ from .._cacheables import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws +from ..data._web_bs import open_autorecon_ws, NoBsWs log = get_logger(__name__) @@ -213,7 +213,7 @@ class Client: ) # repack in dict form return {item[0]['symbol']: item[0] - for item in matches} + for item in matches} async def bars( self, @@ -295,7 +295,7 @@ class AggTrade(BaseModel): M: bool # Ignore -async def stream_messages(ws): +async def stream_messages(ws: NoBsWs) -> AsyncGenerator[NoBsWs, dict]: timeouts = 0 while True: @@ -487,11 +487,20 @@ async def stream_quotes( # signal to caller feed is ready for consumption feed_is_live.set() + # import time + # last = time.time() + # start streaming async for typ, msg in msg_gen: + # period = time.time() - last + # hz = 1/period if period else float('inf') + # if hz > 60: + # log.info(f'Binance quotez : {hz}') + topic = msg['symbol'].lower() await send_chan.send({topic: msg}) + # last = time.time() @tractor.context From 49bdbf29be6a4e6b0e5f9c390e8f2e7f9ccb5eb4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 13:02:45 -0400 Subject: [PATCH 02/14] Add some typing around web bs --- piker/data/_web_bs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 485f69c2..d2a15e06 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -20,7 +20,7 @@ ToOlS fOr CoPInG wITh "tHE wEB" protocols. """ from contextlib import asynccontextmanager, AsyncExitStack from types import ModuleType -from typing import Any, Callable +from typing import Any, Callable, AsyncGenerator import json import trio @@ -127,7 +127,7 @@ async def open_autorecon_ws( # TODO: proper type annot smh fixture: Callable, -): +) -> AsyncGenerator[tuple[...], NoBsWs]: """Apparently we can QoS for all sorts of reasons..so catch em. """ From c52d63c7623bcbc5acf36c5a620e96802a320ad9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 13:03:12 -0400 Subject: [PATCH 03/14] De-densify some funcs --- piker/data/_sharedmem.py | 1 + piker/data/_source.py | 2 ++ 2 files changed, 3 insertions(+) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 5d38cbbd..9c515ce3 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -394,6 +394,7 @@ def open_shm_array( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack + tractor._actor._lifetime_stack.callback(shmarr.close) tractor._actor._lifetime_stack.callback(shmarr.destroy) diff --git a/piker/data/_source.py b/piker/data/_source.py index 46302508..8ec92dfd 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -133,9 +133,11 @@ def mk_symbol( def from_df( + df: pd.DataFrame, source=None, default_tf=None + ) -> np.recarray: """Convert OHLC formatted ``pandas.DataFrame`` to ``numpy.recarray``. From 74fe27eb2c7e5431132f13096eb7d6a2fbb5eb95 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Oct 2021 13:03:25 -0400 Subject: [PATCH 04/14] Turn on profiling for the moment --- piker/_profile.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/_profile.py b/piker/_profile.py index fa38d065..06abc09a 100644 --- a/piker/_profile.py +++ b/piker/_profile.py @@ -21,7 +21,7 @@ Profiling wrappers for internal libs. import time from functools import wraps -_pg_profile: bool = False +_pg_profile: bool = True def pg_profile_enabled() -> bool: From b63ce088f2ae0173fac8f3df3564d7404dcb6a67 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 27 Oct 2021 12:58:41 -0400 Subject: [PATCH 05/14] Error out clearing task on first quote being nan --- piker/clearing/_ems.py | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index f802b8a3..5096597f 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -20,6 +20,7 @@ In da suit parlances: "Execution management systems" """ from contextlib import asynccontextmanager from dataclasses import dataclass, field +from math import isnan from pprint import pformat import time from typing import AsyncIterator, Callable @@ -47,9 +48,11 @@ log = get_logger(__name__) # TODO: numba all of this def mk_check( + trigger_price: float, known_last: float, action: str, + ) -> Callable[[float, float], bool]: """Create a predicate for given ``exec_price`` based on last known price, ``known_last``. @@ -77,8 +80,7 @@ def mk_check( return check_lt - else: - return None + raise ValueError('trigger: {trigger_price}, last: {known_last}') @dataclass @@ -177,7 +179,15 @@ async def clear_dark_triggers( tuple(execs.items()) ): - if not pred or (ttype not in tf) or (not pred(price)): + if ( + not pred or + ttype not in tf or + not pred(price) + ): + log.debug( + f'skipping quote for {sym} ' + f'{pred}, {ttype} not in {tf}?, {pred(price)}' + ) # majority of iterations will be non-matches continue @@ -1005,7 +1015,8 @@ async def _emsd_main( first_quote = feed.first_quotes[symbol] book = _router.get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote['last'] + last = book.lasts[(broker, symbol)] = first_quote['last'] + assert not isnan(last) # ib is a cucker but we've fixed it in the backend # open a stream with the brokerd backend for order # flow dialogue From 78e52566c69dd15b1c7dc2263da2a913dd77a073 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 4 Nov 2021 08:31:48 -0400 Subject: [PATCH 06/14] Use `round()` for magnitude check --- piker/calc.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/piker/calc.py b/piker/calc.py index d5d8d4e1..eb3ef4c6 100644 --- a/piker/calc.py +++ b/piker/calc.py @@ -43,11 +43,15 @@ def humanize( if not number or number <= 0: return round(number, ndigits=digits) - mag = math.floor(math.log(number, 10)) + mag = round(math.log(number, 10)) if mag < 3: return round(number, ndigits=digits) - maxmag = max(itertools.takewhile(lambda key: mag >= key, _mag2suffix)) + maxmag = max( + itertools.takewhile( + lambda key: mag >= key, _mag2suffix + ) + ) return "{value}{suffix}".format( value=round(number/10**maxmag, ndigits=digits), From 2877d7e4ce693a1276a1f5d3dd7a2989a30e60fa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 5 Nov 2021 15:56:43 -0400 Subject: [PATCH 07/14] Start nts --- notes_to_self.rst | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) create mode 100644 notes_to_self.rst diff --git a/notes_to_self.rst b/notes_to_self.rst new file mode 100644 index 00000000..42abd522 --- /dev/null +++ b/notes_to_self.rst @@ -0,0 +1,28 @@ +Notes to self +============= +chicken scratch we shan't forget, consider this staging +for actual feature issues on wtv git wrapper-provider we're +using (no we shan't stick with GH long term likely). + + +cool chart features +------------------- +- allow right-click to spawn shell with current in view + data passed to the new process via ``msgpack-numpy``. +- expand OHLC datum to lower time frame. +- auto-highlight current time range on tick feed + + +features from IB charting +------------------------- +- vlm diffing from ticks and compare when bar arrives from historical + - should help isolate dark vlm / trades + + +chart ux ideas +-------------- +- hotkey to zoom to order intersection (horizontal line) with previous + price levels (+ some margin obvs). +- L1 "lines" (queue size repr) should normalize to some fixed x width + such that when levels with more vlm appear other smaller levels are + scaled down giving an immediate indication of the liquidity diff. From 7e9cbd7d9e43026eda7e4866065bc60637566d70 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Dec 2021 11:51:56 -0500 Subject: [PATCH 08/14] Fix deprecated `LocalPortal` call --- piker/cli/__init__.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 22022e84..97b6ebc3 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -107,7 +107,7 @@ def services(config, tl, names): async with tractor.get_arbiter( *_tractor_kwargs['arbiter_addr'] ) as portal: - registry = await portal.run('self', 'get_registry') + registry = await portal.run_from_ns('self', 'get_registry') json_d = {} for uid, socket in registry.items(): name, uuid = uid From 54712827eeacab404f9e8a779c482ad9b9e850d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 20 Dec 2021 14:16:13 -0500 Subject: [PATCH 09/14] Fix context attr lookup.. --- piker/data/_sampling.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index d16bf529..db3f53b2 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -135,6 +135,7 @@ async def increment_ohlc_buffer( async def iter_ohlc_periods( ctx: tractor.Context, delay_s: int, + ) -> None: """ Subscribe to OHLC sampling "step" events: when the time @@ -270,18 +271,20 @@ async def sample_and_broadcast( trio.ClosedResourceError, trio.EndOfChannel, ): + ctx = getattr(stream, '_ctx', None) + if ctx: + log.warning( + f'{ctx.chan.uid} dropped ' + '`brokerd`-quotes-feed connection' + ) + if tick_throttle: + assert stream.closed() + # XXX: do we need to deregister here # if it's done in the fee bus code? # so far seems like no since this should all - # be single-threaded. - log.warning( - f'{stream._ctx.chan.uid} dropped ' - '`brokerd`-quotes-feed connection' - ) - if tick_throttle: - assert stream.closed() - # await stream.aclose() - + # be single-threaded. Doing it anyway though + # since there seems to be some kinda race.. subs.remove((stream, tick_throttle)) From 50713030f81a429b3ae845821ff59efd50a3d41a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Jan 2022 11:46:25 -0500 Subject: [PATCH 10/14] annoying doc strings --- piker/data/feed.py | 3 ++- piker/ui/_event.py | 9 +++++---- piker/ui/_forms.py | 11 +++++++---- piker/ui/_search.py | 6 ++++-- 4 files changed, 18 insertions(+), 11 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index 1e0c55b2..b3e1efd6 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -592,7 +592,8 @@ async def maybe_open_feed( **kwargs, ) -> (Feed, ReceiveChannel[dict[str, Any]]): - '''Maybe open a data to a ``brokerd`` daemon only if there is no + ''' + Maybe open a data to a ``brokerd`` daemon only if there is no local one for the broker-symbol pair, if one is cached use it wrapped in a tractor broadcast receiver. diff --git a/piker/ui/_event.py b/piker/ui/_event.py index 9e087dd4..f9982843 100644 --- a/piker/ui/_event.py +++ b/piker/ui/_event.py @@ -26,7 +26,9 @@ import trio from PyQt5 import QtCore from PyQt5.QtCore import QEvent, pyqtBoundSignal from PyQt5.QtWidgets import QWidget -from PyQt5.QtWidgets import QGraphicsSceneMouseEvent as gs_mouse +from PyQt5.QtWidgets import ( + QGraphicsSceneMouseEvent as gs_mouse, +) MOUSE_EVENTS = { @@ -129,6 +131,8 @@ class EventRelay(QtCore.QObject): # TODO: is there a global setting for this? if ev.isAutoRepeat() and self._filter_auto_repeats: ev.ignore() + # filter out this event and stop it's processing + # https://doc.qt.io/qt-5/qobject.html#installEventFilter return True # NOTE: the event object instance coming out @@ -152,9 +156,6 @@ class EventRelay(QtCore.QObject): # **do not** filter out this event # and instead forward to the source widget - return False - - # filter out this event # https://doc.qt.io/qt-5/qobject.html#installEventFilter return False diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index 72053716..6406a199 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -174,7 +174,6 @@ class Selection(QComboBox): def __init__( self, parent=None, - ) -> None: self._items: dict[str, int] = {} @@ -200,7 +199,6 @@ class Selection(QComboBox): def set_style( self, - color: str, font_size: int, @@ -217,6 +215,7 @@ class Selection(QComboBox): def resize( self, char: str = 'W', + ) -> None: br = _font.boundingRect(str(char)) _, h = br.width(), br.height() @@ -238,9 +237,11 @@ class Selection(QComboBox): keys: list[str], ) -> None: - '''Write keys to the selection verbatim. + ''' + Write keys to the selection verbatim. All other items are cleared beforehand. + ''' self.clear() self._items.clear() @@ -536,7 +537,8 @@ async def open_form_input_handling( class FillStatusBar(QProgressBar): - '''A status bar for fills up to a position limit. + ''' + A status bar for fills up to a position limit. ''' border_px: int = 2 @@ -663,6 +665,7 @@ def mk_fill_status_bar( ) bar_labels_lhs.addSpacing(5/8 * bar_h) + bar_labels_lhs.addWidget( left_label, # XXX: doesn't seem to actually push up against diff --git a/piker/ui/_search.py b/piker/ui/_search.py index 94a2fd56..f1fc1f4e 100644 --- a/piker/ui/_search.py +++ b/piker/ui/_search.py @@ -347,7 +347,8 @@ class CompleterView(QTreeView): clear_all: bool = False, ) -> None: - '''Set result-rows for depth = 1 tree section ``section``. + ''' + Set result-rows for depth = 1 tree section ``section``. ''' model = self.model() @@ -438,7 +439,8 @@ class SearchBar(Edit): class SearchWidget(QtWidgets.QWidget): - '''Composed widget of ``SearchBar`` + ``CompleterView``. + ''' + Composed widget of ``SearchBar`` + ``CompleterView``. Includes helper methods for item management in the sub-widgets. From 16dfc75ad08e8af4c620945d63ce31861bff21d4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Jan 2022 16:05:05 -0500 Subject: [PATCH 11/14] Add guard for "last step" rect --- piker/ui/_curve.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 161332d5..15b19f76 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -328,8 +328,10 @@ class FastAppendCurve(pg.PlotCurveItem): profiler = pg.debug.Profiler(disabled=not pg_profile_enabled()) # p.setRenderHint(p.Antialiasing, True) - if self._step_mode: - + if ( + self._step_mode + and self._last_step_rect + ): brush = self.opts['brush'] # p.drawLines(*tuple(filter(bool, self._last_step_lines))) # p.drawRect(self._last_step_rect) From 0744eb78a6c1e286b9c6eb3c01857df4a2037a97 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 14 Jan 2022 09:11:01 -0500 Subject: [PATCH 12/14] Lul, adhere to returning `str`s in `humanize()` --- piker/calc.py | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/piker/calc.py b/piker/calc.py index eb3ef4c6..d629b28c 100644 --- a/piker/calc.py +++ b/piker/calc.py @@ -27,25 +27,26 @@ _mag2suffix = bidict({3: 'k', 6: 'M', 9: 'B'}) def humanize( - number: float, digits: int = 1 ) -> str: - '''Convert large numbers to something with at most ``digits`` and + ''' + Convert large numbers to something with at most ``digits`` and a letter suffix (eg. k: thousand, M: million, B: billion). ''' try: float(number) except ValueError: - return 0 + return '0' + if not number or number <= 0: - return round(number, ndigits=digits) + return str(round(number, ndigits=digits)) mag = round(math.log(number, 10)) if mag < 3: - return round(number, ndigits=digits) + return str(round(number, ndigits=digits)) maxmag = max( itertools.takewhile( From 47d0c81a2d446e03151b11e7da574fd20ed5b2de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 21 Jan 2022 08:46:31 -0500 Subject: [PATCH 13/14] Only warn if trigger predicate was already popped --- piker/clearing/_ems.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 5096597f..e1017f64 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -256,7 +256,11 @@ async def clear_dark_triggers( # remove exec-condition from set log.info(f'removing pred for {oid}') - execs.pop(oid) + pred = execs.pop(oid, None) + if not pred: + log.warning( + f'pred for {oid} was already removed!?' + ) await ems_client_order_stream.send(msg) From 8e390278f53dd7a8b1cbd5f1cd68abcdc4f232df Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 23 Jan 2022 19:40:00 -0500 Subject: [PATCH 14/14] Handle logging against IPC stream vs. throttled channel on overruns --- piker/data/_sampling.py | 16 +++++++++++----- 1 file changed, 11 insertions(+), 5 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index db3f53b2..5e702e08 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -253,11 +253,17 @@ async def sample_and_broadcast( try: stream.send_nowait((sym, quote)) except trio.WouldBlock: - log.warning( - f'Feed overrun {bus.brokername} ->' - f'{stream._ctx.channel.uid} !!!' - ) - + ctx = getattr(sream, '_ctx', None) + if ctx: + log.warning( + f'Feed overrun {bus.brokername} ->' + f'{ctx.channel.uid} !!!' + ) + else: + log.warning( + f'Feed overrun {bus.brokername} -> ' + f'feed @ {tick_throttle} Hz' + ) else: await stream.send({sym: quote})