From 5952e7f53809bde2170b6dc22115a2a83e773a31 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 31 Jan 2022 07:33:40 -0500 Subject: [PATCH 01/47] Add dark vlm deduplication support via flag --- piker/data/_normalize.py | 46 ++++++++++++++++++++++++++++++++++------ 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 56d64b75..45eed1be 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -14,27 +14,61 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" +''' Stream format enforcement. -""" -from typing import AsyncIterator, Optional, Tuple - -import numpy as np +''' +from itertools import chain +from typing import AsyncIterator def iterticks( quote: dict, - types: Tuple[str] = ('trade', 'dark_trade'), + types: tuple[str] = ('trade', 'dark_trade'), + deduplicate_darks: bool = False, ) -> AsyncIterator: ''' Iterate through ticks delivered per quote cycle. ''' + if deduplicate_darks: + assert 'dark_trade' in types + # print(f"{quote}\n\n") ticks = quote.get('ticks', ()) + trades = {} + darks = {} + if ticks: + + # do a first pass and attempt to remove duplicate dark + # trades with the same tick signature. + if deduplicate_darks: + for tick in ticks: + ttype = tick.get('type') + sig = ( + tick['time'], + tick['price'], + tick['size'] + ) + + if ttype == 'dark_trade': + darks[sig] = tick + + elif ttype == 'trade': + trades[sig] = tick + + # filter duplicates + for sig, tick in trades.items(): + tick = darks.pop(sig, None) + if tick: + ticks.remove(tick) + # print(f'DUPLICATE {tick}') + + # re-insert ticks + ticks.extend(list(chain(trades.values(), darks.values()))) + for tick in ticks: # print(f"{quote['symbol']}: {tick}") ttype = tick.get('type') From 8118a57b9a6d1daf8f2ea4371e35094c5a41125e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 31 Jan 2022 23:50:56 -0500 Subject: [PATCH 02/47] Guard against no time field in some provider quotes --- piker/data/_normalize.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index 45eed1be..677468ad 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -24,7 +24,10 @@ from typing import AsyncIterator def iterticks( quote: dict, - types: tuple[str] = ('trade', 'dark_trade'), + types: tuple[str] = ( + 'trade', + 'dark_trade', + ), deduplicate_darks: bool = False, ) -> AsyncIterator: @@ -47,17 +50,20 @@ def iterticks( if deduplicate_darks: for tick in ticks: ttype = tick.get('type') - sig = ( - tick['time'], - tick['price'], - tick['size'] - ) - if ttype == 'dark_trade': - darks[sig] = tick + time = tick.get('time', None) + if time: + sig = ( + time, + tick['price'], + tick['size'] + ) - elif ttype == 'trade': - trades[sig] = tick + if ttype == 'dark_trade': + darks[sig] = tick + + elif ttype == 'trade': + trades[sig] = tick # filter duplicates for sig, tick in trades.items(): From 1aae40cdebcc4f1b9df3a4b28a2ff395558d994c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Feb 2022 13:14:38 -0500 Subject: [PATCH 03/47] Expect multi-output fsps to yield a `dict` of history arrays --- piker/fsp/_engine.py | 54 ++++++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 9 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index afd986e0..1b601c35 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -20,7 +20,10 @@ core task logic for processing chains ''' from dataclasses import dataclass from functools import partial -from typing import AsyncIterator, Callable, Optional +from typing import ( + AsyncIterator, Callable, Optional, + Union, +) import numpy as np import pyqtgraph as pg @@ -101,28 +104,61 @@ async def fsp_compute( # Conduct a single iteration of fsp with historical bars input # and get historical output + history_output: Union[ + dict[str, np.ndarray], # multi-output case + np.ndarray, # single output case + ] history_output = await out_stream.__anext__() func_name = func.__name__ profiler(f'{func_name} generated history') # build struct array with an 'index' field to push as history - history = np.zeros( - len(history_output), - dtype=dst.array.dtype - ) # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # if the output array is multi-field then push # each respective field. - fields = getattr(history.dtype, 'fields', None) - if fields: + # await tractor.breakpoint() + fields = getattr(dst.array.dtype, 'fields', None).copy() + fields.pop('index') + # TODO: nptyping here! + history: Optional[np.ndarray] = None + if fields and len(fields) > 1 and fields: + if not isinstance(history_output, dict): + raise ValueError( + f'`{func_name}` is a multi-output FSP and should yield a ' + '`dict[str, np.ndarray]` for history' + ) + for key in fields.keys(): - if key in history.dtype.fields: - history[func_name] = history_output + if key in history_output: + output = history_output[key] + + if history is None: + # using the first output, determine + # the length of the struct-array that + # will be pushed to shm. + history = np.zeros( + len(output), + dtype=dst.array.dtype + ) + + if output is None: + continue + + history[key] = output # single-key output stream else: + if not isinstance(history_output, np.ndarray): + raise ValueError( + f'`{func_name}` is a single output FSP and should yield an ' + '`np.ndarray` for history' + ) + history = np.zeros( + len(history_output), + dtype=dst.array.dtype + ) history[func_name] = history_output # TODO: XXX: From dfe4473c9a10458e2e698a595dcf6dff7fcb4b91 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Feb 2022 15:18:12 -0500 Subject: [PATCH 04/47] Yield history `dict`s, add a `flow_rates` fsp --- piker/fsp/_volume.py | 57 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 53 insertions(+), 4 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 7cf7d7b4..e4928dd1 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -108,7 +108,6 @@ async def tina_vwap( @fsp( outputs=('dolla_vlm', 'dark_vlm'), - ohlc=False, curve_style='step', ) async def dolla_vlm( @@ -132,14 +131,24 @@ async def dolla_vlm( v = a['volume'] # on first iteration yield history - yield chl3 * v + yield { + 'dolla_vlm': chl3 * v, + 'dark_vlm': None, + } i = ohlcv.index output = vlm = 0 dvlm = 0 async for quote in source: - for tick in iterticks(quote): + for tick in iterticks( + quote, + types=( + 'trade', + 'dark_trade', + ), + deduplicate_darks=True, + ): # this computes tick-by-tick weightings from here forward size = tick['size'] @@ -156,13 +165,15 @@ async def dolla_vlm( # is reported in the sym info. ttype = tick.get('type') + if ttype == 'dark_trade': - print(f'dark_trade: {tick}') + # print(f'dark_trade: {tick}') key = 'dark_vlm' dvlm += price * size output = dvlm else: + # print(f'vlm: {tick}') key = 'dolla_vlm' vlm += price * size output = vlm @@ -175,3 +186,41 @@ async def dolla_vlm( # print(f' tinal vlm: {tina_lvlm}') yield key, output + + +@fsp( + outputs=( + '1m_trade_rate', + '1m_vlm_rate', + ), + curve_style='step', +) +async def flow_rates( + source: AsyncReceiver[dict], + ohlcv: ShmArray, # OHLC sampled history + +) -> AsyncIterator[ + tuple[str, Union[np.ndarray, float]], +]: + # generally no history available prior to real-time calcs + yield { + '1m_trade_rate': None, + '1m_vlm_rate': None, + } + + ltr = 0 + lvr = 0 + async for quote in source: + if quote: + + tr = quote['tradeRate'], + if tr != ltr: + print(f'trade rate: {tr}') + yield '1m_trade_rate', tr + ltr = tr + + vr = quote['volumeRate'], + if vr != lvr: + print(f'vlm rate: {tr}') + yield '1m_vlm_rate', tr + ltr = tr From b81209e78e8499648d6c1e5e3178a4a54e37acaa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Feb 2022 15:19:00 -0500 Subject: [PATCH 05/47] Ensure `sym` arg is a `str` --- piker/fsp/_api.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index f2c7cdc8..30a00633 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -143,6 +143,7 @@ def maybe_mk_fsp_shm( exists, otherwise load the shm already existing for that token. ''' + assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' uid = tractor.current_actor().uid # TODO: load output types from `Fsp` From 4e96dd09e3ad204b13e4c17b5c11ebb7283ff7e6 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 08:05:10 -0500 Subject: [PATCH 06/47] Add a `.text_color` property to our axes types --- piker/ui/_axes.py | 26 ++++++++++++++++++++++---- 1 file changed, 22 insertions(+), 4 deletions(-) diff --git a/piker/ui/_axes.py b/piker/ui/_axes.py index 56096b7d..2363cc84 100644 --- a/piker/ui/_axes.py +++ b/piker/ui/_axes.py @@ -44,10 +44,14 @@ class Axis(pg.AxisItem): self, linkedsplits, typical_max_str: str = '100 000.000', + text_color: str = 'bracket', **kwargs ) -> None: - super().__init__(**kwargs) + super().__init__( + # textPen=textPen, + **kwargs + ) # XXX: pretty sure this makes things slower # self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) @@ -74,15 +78,28 @@ class Axis(pg.AxisItem): }) self.setTickFont(_font.font) + # NOTE: this is for surrounding "border" self.setPen(_axis_pen) + # this is the text color - self.setTextPen(_axis_pen) + # self.setTextPen(pg.mkPen(hcolor(text_color))) + self.text_color = text_color + self.typical_br = _font._qfm.boundingRect(typical_max_str) # size the pertinent axis dimension to a "typical value" self.size_to_values() + @property + def text_color(self) -> str: + return self._text_color + + @text_color.setter + def text_color(self, text_color: str) -> None: + self.setTextPen(pg.mkPen(hcolor(text_color))) + self._text_color = text_color + def size_to_values(self) -> None: pass @@ -109,7 +126,8 @@ class PriceAxis(Axis): def set_title( self, title: str, - view: Optional[ChartView] = None + view: Optional[ChartView] = None, + color: Optional[str] = None, ) -> Label: ''' @@ -123,7 +141,7 @@ class PriceAxis(Axis): label = self.title = Label( view=view or self.linkedView(), fmt_str=title, - color='bracket', + color=color or self.text_color, parent=self, # update_on_range_change=False, ) From f3289c197774287e69007d7f6756204098c14200 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 08:06:41 -0500 Subject: [PATCH 07/47] Create source length zeroed arrays from yielded `None` fsp history --- piker/fsp/_engine.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 1b601c35..f9ba0c39 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -135,11 +135,17 @@ async def fsp_compute( output = history_output[key] if history is None: + + if output is None: + length = len(src.array) + else: + length = len(output) + # using the first output, determine # the length of the struct-array that # will be pushed to shm. history = np.zeros( - len(output), + length, dtype=dst.array.dtype ) From 97c2f86092f28375a581fb8b39d5d7bf74ea7108 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 08:07:29 -0500 Subject: [PATCH 08/47] TOSQUASH, fix separate vlm vs trade rate --- piker/fsp/_volume.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index e4928dd1..c64b2a70 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -193,7 +193,7 @@ async def dolla_vlm( '1m_trade_rate', '1m_vlm_rate', ), - curve_style='step', + curve_style='line', ) async def flow_rates( source: AsyncReceiver[dict], @@ -213,14 +213,14 @@ async def flow_rates( async for quote in source: if quote: - tr = quote['tradeRate'], + tr = quote['tradeRate'] if tr != ltr: print(f'trade rate: {tr}') yield '1m_trade_rate', tr ltr = tr - vr = quote['volumeRate'], + vr = quote['volumeRate'] if vr != lvr: - print(f'vlm rate: {tr}') - yield '1m_vlm_rate', tr - ltr = tr + print(f'vlm rate: {vr}') + yield '1m_vlm_rate', vr + lvr = vr From 0b5250d5e3626b78caef72b9c15b8a8ae376b7f2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 08:08:04 -0500 Subject: [PATCH 09/47] Plot the vlm rate (per min) taken verbatim from ib --- piker/ui/_fsp.py | 121 +++++++++++++++++++++++++++++++++-------------- 1 file changed, 86 insertions(+), 35 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 937b7e1e..210b56ac 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -50,8 +50,13 @@ from ._forms import ( ) from ..fsp._api import maybe_mk_fsp_shm, Fsp from ..fsp import cascade -from ..fsp._volume import tina_vwap, dolla_vlm +from ..fsp._volume import ( + tina_vwap, + dolla_vlm, + flow_rates, +) from ..log import get_logger +from ._style import hcolor log = get_logger(__name__) @@ -440,7 +445,7 @@ class FspAdmin: # allocate an output shm array dst_shm, opened = maybe_mk_fsp_shm( - fqsn, + '.'.join(fqsn), target=target, readonly=True, ) @@ -648,8 +653,9 @@ async def open_vlm_displays( if dvlm: + tasks_ready = [] # spawn and overlay $ vlm on the same subchart - shm, started = await admin.start_engine_task( + dvlm_shm, started = await admin.start_engine_task( dolla_vlm, { # fsp engine conf @@ -663,11 +669,36 @@ async def open_vlm_displays( }, # loglevel, ) + tasks_ready.append(started) + + # FIXME: we should error on starting the same fsp right + # since it might collide with existing shm.. or wait we + # had this before?? + # dolla_vlm, + + # spawn and overlay $ vlm on the same subchart + fr_shm, started = await admin.start_engine_task( + flow_rates, + { # fsp engine conf + 'func_name': 'flow_rates', + # 'zero_on_step': True, + # 'params': { + # 'price_func': { + # 'default_value': 'chl3', + # }, + # }, + }, + # loglevel, + ) + tasks_ready.append(started) # profiler(f'created shm for fsp actor: {display_name}') - await started.wait() + # wait for all engine tasks to startup + async with trio.open_nursery() as n: + for event in tasks_ready: + n.start_soon(event.wait) - pi = chart.overlay_plotitem( + dvlm_pi = chart.overlay_plotitem( 'dolla_vlm', index=0, # place axis on inside (nearest to chart) axis_title=' $vlm', @@ -683,7 +714,7 @@ async def open_vlm_displays( ) # add custom auto range handler - pi.vb._maxmin = partial( + dvlm_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view names=['dolla_vlm', 'dark_vlm'], @@ -691,9 +722,9 @@ async def open_vlm_displays( curve, _ = chart.draw_curve( name='dolla_vlm', - data=shm.array, + data=dvlm_shm.array, array_key='dolla_vlm', - overlay=pi, + overlay=dvlm_pi, step_mode=True, # **conf.get('chart_kwargs', {}) ) @@ -708,44 +739,64 @@ async def open_vlm_displays( # specially store ref to shm for lookup in display loop # since only a placeholder of `None` is entered in # ``.draw_curve()``. - chart._overlays['dolla_vlm'] = shm + chart._overlays['dolla_vlm'] = dvlm_shm curve, _ = chart.draw_curve( name='dark_vlm', - data=shm.array, + data=dvlm_shm.array, array_key='dark_vlm', - overlay=pi, + overlay=dvlm_pi, color='charcoal', # darker theme hue step_mode=True, # **conf.get('chart_kwargs', {}) ) - chart._overlays['dark_vlm'] = shm - # XXX: old dict-style config before it was moved into the - # helper task - # 'dolla_vlm': { - # 'func_name': 'dolla_vlm', - # 'zero_on_step': True, - # 'overlay': 'volume', - # 'separate_axes': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # # tell target ``Edit`` widget to not allow - # # edits for now. - # 'widget_kwargs': {'readonly': True}, - # }, - # }, - # 'chart_kwargs': {'step_mode': True} - # }, + chart._overlays['dark_vlm'] = dvlm_shm - # } + # add flow rate curves + rate_color = 'default_light' + fr_pi = chart.overlay_plotitem( + 'flow_rates', + index=0, # place axis on inside (nearest to chart) + axis_title=' vlm/m', + axis_side='left', + axis_kwargs={ + 'typical_max_str': ' 100.0 M ', + 'formatter': partial( + humanize, + digits=2, + ), + # 'textPen': pg.mkPen(hcolor(vlmr_color)), + 'text_color': rate_color, + }, - for name, axis_info in pi.axes.items(): - # lol this sux XD - axis = axis_info['item'] - if isinstance(axis, PriceAxis): - axis.size_to_values() + ) + # add custom auto range handler + fr_pi.vb._maxmin = partial( + maxmin, + # keep both regular and dark vlm in view + names=[ + # '1m_trade_rate', + '1m_vlm_rate', + ], + ) + + curve, _ = chart.draw_curve( + name='1m_vlm_rate', + data=fr_shm.array, + array_key='1m_vlm_rate', + overlay=fr_pi, + color=rate_color, + # **conf.get('chart_kwargs', {}) + ) + chart._overlays['1m_vlm_rate'] = fr_shm + + for pi in (dvlm_pi, fr_pi): + for name, axis_info in pi.axes.items(): + # lol this sux XD + axis = axis_info['item'] + if isinstance(axis, PriceAxis): + axis.size_to_values() # built-in vlm fsps for target, conf in { From 4b7d1fb35b007e68d13edb5f875428eee60959fe Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 14:02:21 -0500 Subject: [PATCH 10/47] Add line style via `str` style name to our fast curve --- piker/ui/_curve.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 15b19f76..78fdbad1 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -24,6 +24,7 @@ import numpy as np import pyqtgraph as pg from PyQt5 import QtGui, QtWidgets from PyQt5.QtCore import ( + Qt, QLineF, QSizeF, QRectF, @@ -85,6 +86,14 @@ def step_path_arrays_from_1d( return x_out, y_out +_line_styles: dict[str, int] = { + 'solid': Qt.PenStyle.SolidLine, + 'dash': Qt.PenStyle.DashLine, + 'dot': Qt.PenStyle.DotLine, + 'dashdot': Qt.PenStyle.DashDotLine, +} + + # TODO: got a feeling that dropping this inheritance gets us even more speedups class FastAppendCurve(pg.PlotCurveItem): ''' @@ -106,6 +115,7 @@ class FastAppendCurve(pg.PlotCurveItem): step_mode: bool = False, color: str = 'default_lightest', fill_color: Optional[str] = None, + style: str = 'solid', **kwargs @@ -118,7 +128,11 @@ class FastAppendCurve(pg.PlotCurveItem): self._xrange: tuple[int, int] = self.dataBounds(ax=0) # all history of curve is drawn in single px thickness - self.setPen(hcolor(color)) + pen = pg.mkPen(hcolor(color)) + pen.setStyle(_line_styles[style]) + if 'dash' in style: + pen.setDashPattern([6, 6]) + self.setPen(pen) # last segment is drawn in 2px thickness for emphasis self.last_step_pen = pg.mkPen(hcolor(color), width=2) From d4f79a6245d93ce7670f76edc6a753ff612aacf2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 14:03:01 -0500 Subject: [PATCH 11/47] Comment flow rates fsp prints --- piker/fsp/_volume.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index c64b2a70..5e144f58 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -215,12 +215,12 @@ async def flow_rates( tr = quote['tradeRate'] if tr != ltr: - print(f'trade rate: {tr}') + # print(f'trade rate: {tr}') yield '1m_trade_rate', tr ltr = tr vr = quote['volumeRate'] if vr != lvr: - print(f'vlm rate: {vr}') + # print(f'vlm rate: {vr}') yield '1m_vlm_rate', vr lvr = vr From 615bf3a55a7097d82eea0bb16568528bd98de812 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 2 Feb 2022 14:03:32 -0500 Subject: [PATCH 12/47] Use solid line for vlm rate and dashed for trades rate --- piker/ui/_fsp.py | 72 ++++++++++++++++++++++++++++++++++++------------ 1 file changed, 54 insertions(+), 18 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 210b56ac..1b79d8f6 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -56,7 +56,6 @@ from ..fsp._volume import ( flow_rates, ) from ..log import get_logger -from ._style import hcolor log = get_logger(__name__) @@ -681,12 +680,6 @@ async def open_vlm_displays( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', - # 'zero_on_step': True, - # 'params': { - # 'price_func': { - # 'default_value': 'chl3', - # }, - # }, }, # loglevel, ) @@ -698,6 +691,9 @@ async def open_vlm_displays( for event in tasks_ready: n.start_soon(event.wait) + ################### + # dolla vlm overlay + ################### dvlm_pi = chart.overlay_plotitem( 'dolla_vlm', index=0, # place axis on inside (nearest to chart) @@ -710,7 +706,6 @@ async def open_vlm_displays( digits=2, ), }, - ) # add custom auto range handler @@ -726,7 +721,6 @@ async def open_vlm_displays( array_key='dolla_vlm', overlay=dvlm_pi, step_mode=True, - # **conf.get('chart_kwargs', {}) ) # TODO: is there a way to "sync" the dual axes such that only # one curve is needed? @@ -741,6 +735,9 @@ async def open_vlm_displays( # ``.draw_curve()``. chart._overlays['dolla_vlm'] = dvlm_shm + ################ + # dark vlm curve + ################ curve, _ = chart.draw_curve( name='dark_vlm', @@ -753,11 +750,16 @@ async def open_vlm_displays( ) chart._overlays['dark_vlm'] = dvlm_shm - # add flow rate curves - rate_color = 'default_light' + ################## + # Vlm rate overlay + ################## + trade_rate_color = vlm_rate_color = 'i3' fr_pi = chart.overlay_plotitem( - 'flow_rates', + 'vlm_rates', index=0, # place axis on inside (nearest to chart) + + # NOTE: we might want to suffix with a \w + # on lhs and prefix for the rhs axis labels? axis_title=' vlm/m', axis_side='left', axis_kwargs={ @@ -766,17 +768,14 @@ async def open_vlm_displays( humanize, digits=2, ), - # 'textPen': pg.mkPen(hcolor(vlmr_color)), - 'text_color': rate_color, + 'text_color': vlm_rate_color, }, - ) # add custom auto range handler fr_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view names=[ - # '1m_trade_rate', '1m_vlm_rate', ], ) @@ -786,11 +785,48 @@ async def open_vlm_displays( data=fr_shm.array, array_key='1m_vlm_rate', overlay=fr_pi, - color=rate_color, - # **conf.get('chart_kwargs', {}) + color=vlm_rate_color, + style='solid', ) chart._overlays['1m_vlm_rate'] = fr_shm + #################### + # Trade rate overlay + #################### + fr_pi = chart.overlay_plotitem( + 'trade_rates', + index=1, # place axis on inside (nearest to chart) + axis_title='trades/m ', + axis_side='left', + axis_kwargs={ + 'typical_max_str': ' 100.0 M ', + 'formatter': partial( + humanize, + digits=2, + ), + 'text_color': trade_rate_color, + }, + + ) + # add custom auto range handler + fr_pi.vb._maxmin = partial( + maxmin, + # keep both regular and dark vlm in view + names=[ + '1m_trade_rate', + ], + ) + + curve, _ = chart.draw_curve( + name='1m_trade_rate', + data=fr_shm.array, + array_key='1m_trade_rate', + overlay=fr_pi, + color=trade_rate_color, + style='dash', + ) + chart._overlays['1m_trade_rate'] = fr_shm + for pi in (dvlm_pi, fr_pi): for name, axis_info in pi.axes.items(): # lol this sux XD From efb743fd85d8cf2a53f4e900df5222b319917986 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 10:24:40 -0500 Subject: [PATCH 13/47] Flip to using `pydantic` for shm tokens --- piker/data/_sharedmem.py | 76 +++++++++++++++++++++++++--------------- 1 file changed, 47 insertions(+), 29 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index c741ba1c..5f7fdcd0 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -18,9 +18,10 @@ NumPy compatible shared memory buffers for real-time IPC streaming. """ +from __future__ import annotations from dataclasses import dataclass, asdict from sys import byteorder -from typing import List, Tuple, Optional +from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX from multiprocessing import resource_tracker as mantracker @@ -29,6 +30,7 @@ if _USE_POSIX: import tractor import numpy as np +from pydantic import BaseModel, validator from ..log import get_logger from ._source import base_iohlc_dtype @@ -85,26 +87,34 @@ class SharedInt: shm_unlink(self._shm.name) -@dataclass -class _Token: - """Internal represenation of a shared memory "token" +class _Token(BaseModel): + ''' + Internal represenation of a shared memory "token" which can be used to key a system wide post shm entry. - """ + + ''' + class Config: + frozen = True + shm_name: str # this servers as a "key" value shm_first_index_name: str shm_last_index_name: str - dtype_descr: List[Tuple[str]] + dtype_descr: tuple - def __post_init__(self): - # np.array requires a list for dtype - self.dtype_descr = np.dtype(list(map(tuple, self.dtype_descr))).descr + @property + def dtype(self) -> np.dtype: + return np.dtype(list(map(tuple, self.dtype_descr))).descr def as_msg(self): - return asdict(self) + return self.dict() @classmethod - def from_msg(self, msg: dict) -> '_Token': - return msg if isinstance(msg, _Token) else _Token(**msg) + def from_msg(cls, msg: dict) -> _Token: + if isinstance(msg, _Token): + return msg + + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) + return _Token(**msg) # TODO: this api? @@ -127,15 +137,17 @@ def _make_token( key: str, dtype: Optional[np.dtype] = None, ) -> _Token: - """Create a serializable token that can be used + ''' + Create a serializable token that can be used to access a shared array. - """ + + ''' dtype = base_iohlc_dtype if dtype is None else dtype return _Token( - key, - key + "_first", - key + "_last", - np.dtype(dtype).descr + shm_name=key, + shm_first_index_name=key + "_first", + shm_last_index_name=key + "_last", + dtype_descr=np.dtype(dtype).descr ) @@ -178,10 +190,10 @@ class ShmArray: @property def _token(self) -> _Token: return _Token( - self._shm.name, - self._first._shm.name, - self._last._shm.name, - self._array.dtype.descr, + shm_name=self._shm.name, + shm_first_index_name=self._first._shm.name, + shm_last_index_name=self._last._shm.name, + dtype_descr=tuple(self._array.dtype.descr), ) @property @@ -402,16 +414,19 @@ def open_shm_array( def attach_shm_array( - token: Tuple[str, str, Tuple[str, str]], + token: tuple[str, str, tuple[str, str]], size: int = _default_size, readonly: bool = True, + ) -> ShmArray: - """Attach to an existing shared memory array previously + ''' + Attach to an existing shared memory array previously created by another process using ``open_shared_array``. No new shared mem is allocated but wrapper types for read/write access are constructed. - """ + + ''' token = _Token.from_msg(token) key = token.shm_name @@ -422,7 +437,7 @@ def attach_shm_array( shm = SharedMemory(name=key) shmarr = np.ndarray( (size,), - dtype=token.dtype_descr, + dtype=token.dtype, buffer=shm.buf ) shmarr.setflags(write=int(not readonly)) @@ -470,8 +485,10 @@ def maybe_open_shm_array( key: str, dtype: Optional[np.dtype] = None, **kwargs, -) -> Tuple[ShmArray, bool]: - """Attempt to attach to a shared memory block using a "key" lookup + +) -> tuple[ShmArray, bool]: + ''' + Attempt to attach to a shared memory block using a "key" lookup to registered blocks in the users overall "system" registry (presumes you don't have the block's explicit token). @@ -485,7 +502,8 @@ def maybe_open_shm_array( If you know the explicit ``_Token`` for your memory segment instead use ``attach_shm_array``. - """ + + ''' try: # see if we already know this key token = _known_tokens[key] From d130f0449f15864260f5eeb381f8bc7cea6d0075 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 10:40:45 -0500 Subject: [PATCH 14/47] Expect registry of fsp "flows" to each engine task In order for fsp routines to be able to look up other "flows" in the cascade, we need a small registry-table which gives access to a map of a source stream + an fsp -> an output stream. Eventually we'll also likely want a dependency (injection) mechanism so that any fsp demanded can either be dynamically allocated or at the least waited upon before a consumer tries to access it. --- piker/fsp/_engine.py | 25 ++++++++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index f9ba0c39..1b853c60 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -37,8 +37,11 @@ from .. import data from ..data import attach_shm_array from ..data.feed import Feed from ..data._sharedmem import ShmArray -from ._api import Fsp -from ._api import _load_builtins +from ._api import ( + Fsp, + _load_builtins, + _Token, +) log = get_logger(__name__) @@ -99,6 +102,8 @@ async def fsp_compute( # to the async iterable? it's that or we do some kinda # async itertools style? filter_quotes_by_sym(symbol, quote_stream), + + # XXX: currently the ``ohlcv`` arg feed.shm, ) @@ -239,6 +244,8 @@ async def cascade( ns_path: NamespacePath, + shm_registry: dict[str, _Token], + zero_on_step: bool = False, loglevel: Optional[str] = None, @@ -261,9 +268,21 @@ async def cascade( log.info( f'Registered FSP set:\n{lines}' ) - func: Fsp = reg.get( + + # update actor local flows table which registers + # readonly "instances" of this fsp for symbol/source + # so that consumer fsps can look it up by source + fsp. + # TODO: ugh i hate this wind/unwind to list over the wire + # but not sure how else to do it. + for (token, fsp_name, dst_token) in shm_registry: + Fsp._flow_registry[ + (_Token.from_msg(token), fsp_name) + ] = _Token.from_msg(dst_token) + + fsp: Fsp = reg.get( NamespacePath(ns_path) ) + func = fsp.func if not func: # TODO: assume it's a func target path From df6afe24a49448ae24ed357156d4c8de8bb2428b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 11:08:42 -0500 Subject: [PATCH 15/47] Define a flow registry on `FspAdmin`, use it to update fsp engine clusters --- piker/ui/_fsp.py | 31 ++++++++++++++++++++++--------- 1 file changed, 22 insertions(+), 9 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 1b79d8f6..9a08fac0 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -37,6 +37,7 @@ from .._cacheables import maybe_open_context from ..calc import humanize from ..data._sharedmem import ( ShmArray, + _Token, try_read, ) from ._chart import ( @@ -367,6 +368,7 @@ class FspAdmin: tuple, tuple[tractor.MsgStream, ShmArray] ] = {} + self._flow_registry: dict[_Token, str] = {} self.src_shm = src_shm def rr_next_portal(self) -> tractor.Portal: @@ -411,6 +413,11 @@ class FspAdmin: loglevel=loglevel, zero_on_step=conf.get('zero_on_step', False), + shm_registry=[ + (token.as_msg(), fsp_name, dst_token.as_msg()) + for (token, fsp_name), dst_token + in self._flow_registry.items() + ], ) as (ctx, last_index), ctx.open_stream() as stream, @@ -443,11 +450,15 @@ class FspAdmin: fqsn = self.linked.symbol.front_feed() # allocate an output shm array - dst_shm, opened = maybe_mk_fsp_shm( + key, dst_shm, opened = maybe_mk_fsp_shm( '.'.join(fqsn), target=target, readonly=True, ) + self._flow_registry[ + (self.src_shm._token, target.name) + ] = dst_shm._token + # if not opened: # raise RuntimeError( # f'Already started FSP `{fqsn}:{func_name}`' @@ -675,14 +686,6 @@ async def open_vlm_displays( # had this before?? # dolla_vlm, - # spawn and overlay $ vlm on the same subchart - fr_shm, started = await admin.start_engine_task( - flow_rates, - { # fsp engine conf - 'func_name': 'flow_rates', - }, - # loglevel, - ) tasks_ready.append(started) # profiler(f'created shm for fsp actor: {display_name}') @@ -753,7 +756,17 @@ async def open_vlm_displays( ################## # Vlm rate overlay ################## + # spawn and overlay $ vlm on the same subchart + fr_shm, started = await admin.start_engine_task( + flow_rates, + { # fsp engine conf + 'func_name': 'flow_rates', + }, + # loglevel, + ) + await started.wait() trade_rate_color = vlm_rate_color = 'i3' + fr_pi = chart.overlay_plotitem( 'vlm_rates', index=0, # place axis on inside (nearest to chart) From ebf3e00438a350f63126b7101a39c64a6ac91c1b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 11:11:34 -0500 Subject: [PATCH 16/47] Add `Fsp._flow_registry` as actor-local table Define the flows table as a class var (thus making it a "global" and/or actor-local state) which can be accessed by any in process task. Add `Fsp.get_shm()` to allow accessing output streams by source-token + fsp routine reference and thus providing inter-fsp low level access to real-time flows. --- piker/fsp/_api.py | 47 +++++++++++++++++++++++++++++++++++++++++------ 1 file changed, 41 insertions(+), 6 deletions(-) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index 30a00633..a332ec5f 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -40,6 +40,8 @@ from tractor.msg import NamespacePath from ..data._sharedmem import ( ShmArray, maybe_open_shm_array, + attach_shm_array, + _Token, ) from ..log import get_logger @@ -72,6 +74,13 @@ class Fsp: # - custom function wrappers, # https://wrapt.readthedocs.io/en/latest/wrappers.html#custom-function-wrappers + # actor-local map of source flow shm tokens + # + the consuming fsp *to* the consumers output + # shm flow. + _flow_registry: dict[ + tuple[_Token, str], _Token, + ] = {} + def __init__( self, func: Callable[..., Awaitable], @@ -93,7 +102,7 @@ class Fsp: self.config: dict[str, Any] = config # register with declared set. - _fsp_registry[self.ns_path] = func + _fsp_registry[self.ns_path] = self @property def name(self) -> str: @@ -111,6 +120,24 @@ class Fsp: ): return self.func(*args, **kwargs) + # TODO: lru_cache this? prettty sure it'll work? + def get_shm( + self, + src_shm: ShmArray, + + ) -> ShmArray: + ''' + Provide access to allocated shared mem array + for this "instance" of a signal processor for + the given ``key``. + + ''' + dst_token = self._flow_registry[ + (src_shm._token, self.name) + ] + shm = attach_shm_array(dst_token) + return shm + def fsp( wrapped=None, @@ -132,19 +159,27 @@ def fsp( return Fsp(wrapped, outputs=(wrapped.__name__,)) +def mk_fsp_shm_key( + sym: str, + target: Fsp + +) -> str: + uid = tractor.current_actor().uid + return f'{sym}.fsp.{target.name}.{".".join(uid)}' + + def maybe_mk_fsp_shm( sym: str, - target: fsp, + target: Fsp, readonly: bool = True, -) -> (ShmArray, bool): +) -> (str, ShmArray, bool): ''' Allocate a single row shm array for an symbol-fsp pair if none exists, otherwise load the shm already existing for that token. ''' assert isinstance(sym, str), '`sym` should be file-name-friendly `str`' - uid = tractor.current_actor().uid # TODO: load output types from `Fsp` # - should `index` be a required internal field? @@ -153,7 +188,7 @@ def maybe_mk_fsp_shm( [(field_name, float) for field_name in target.outputs] ) - key = f'{sym}.fsp.{target.name}.{".".join(uid)}' + key = mk_fsp_shm_key(sym, target) shm, opened = maybe_open_shm_array( key, @@ -161,4 +196,4 @@ def maybe_mk_fsp_shm( dtype=fsp_dtype, readonly=True, ) - return shm, opened + return key, shm, opened From 1fc6429f75a274468ae90590c1ca885265eebe31 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 11:54:44 -0500 Subject: [PATCH 17/47] Prep for manual rate calcs, handle non-ib backends XD --- piker/fsp/_volume.py | 59 +++++++++++++++++++++++++++++++++++++++----- 1 file changed, 53 insertions(+), 6 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 5e144f58..aaded701 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -30,9 +30,10 @@ def wap( weights: np.ndarray, ) -> np.ndarray: - """Weighted average price from signal and weights. + ''' + Weighted average price from signal and weights. - """ + ''' cum_weights = np.cumsum(weights) cum_weighted_input = np.cumsum(signal * weights) @@ -130,6 +131,8 @@ async def dolla_vlm( chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] + from ._momo import wma + # on first iteration yield history yield { 'dolla_vlm': chl3 * v, @@ -189,9 +192,22 @@ async def dolla_vlm( @fsp( + # TODO: eventually I guess we should support some kinda declarative + # graphics config syntax per output yah? That seems like a clean way + # to let users configure things? Not sure how exactly to offer that + # api as well as how to expose such a thing *inside* the body? outputs=( + # pulled verbatim from `ib` for now '1m_trade_rate', '1m_vlm_rate', + + # our own instantaneous rate calcs which are all + # parameterized by a samples count (bars) period + # 'trade_rate', + # 'dark_trade_rate', + + # 'dvlm_rate', + # 'dark_dvlm_rate', ), curve_style='line', ) @@ -199,9 +215,30 @@ async def flow_rates( source: AsyncReceiver[dict], ohlcv: ShmArray, # OHLC sampled history + # TODO (idea): a dynamic generic / boxing type that can be updated by other + # FSPs, user input, and possibly any general event stream in + # real-time. Hint: ideally implemented with caching until mutated + # ;) + period: 'Param[int]' = 16, # noqa + + # TODO (idea): a generic for declaring boxed fsps much like ``pytest`` + # fixtures? This probably needs a lot of thought if we want to offer + # a higher level composition syntax eventually (oh right gotta make + # an issue for that). + # ideas for how to allow composition / intercalling: + # - offer a `Fsp.get_history()` to do the first yield output? + # * err wait can we just have shm access directly? + # - how would it work if some consumer fsp wanted to dynamically + # change params which are input to the callee fsp? i guess we could + # lazy copy in that case? + # dvlm: 'Fsp[dolla_vlm]' + ) -> AsyncIterator[ tuple[str, Union[np.ndarray, float]], ]: + + # dvlm_shm = dolla_vlm.get_shm(ohlcv) + # generally no history available prior to real-time calcs yield { '1m_trade_rate': None, @@ -210,17 +247,27 @@ async def flow_rates( ltr = 0 lvr = 0 + + # TODO: 3.10 do ``anext()`` + quote = await source.__anext__() + tr = quote.get('tradeRate') + yield '1m_trade_rate', tr or 0 + vr = quote.get('volumeRate') + yield '1m_vlm_rate', vr or 0 + async for quote in source: if quote: - tr = quote['tradeRate'] - if tr != ltr: + # XXX: ib specific schema we should + # probably pre-pack ourselves. + tr = quote.get('tradeRate') + if tr is not None and tr != ltr: # print(f'trade rate: {tr}') yield '1m_trade_rate', tr ltr = tr - vr = quote['volumeRate'] - if vr != lvr: + vr = quote.get('volumeRate') + if vr is not None and vr != lvr: # print(f'vlm rate: {vr}') yield '1m_vlm_rate', vr lvr = vr From 26b007147158ca3441825946dd83884673052613 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 11:55:11 -0500 Subject: [PATCH 18/47] Subscribe for rate calcs from IB in default tick set --- piker/brokers/ib.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index aedeffc3..3572683c 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1042,6 +1042,7 @@ tick_types = { # https://interactivebrokers.github.io/tws-api/tick_types.html#rt_volume 48: 'dark_trade', + # standard L1 ticks 0: 'bsize', 1: 'bid', 2: 'ask', @@ -1049,6 +1050,12 @@ tick_types = { 4: 'last', 5: 'size', 8: 'volume', + + # ``ib_insync`` already packs these into + # quotes under the following fields. + # 55: 'trades_per_min', # `'tradeRate'` + # 56: 'vlm_per_min', # `'volumeRate'` + # 89: 'shortable', # `'shortableShares'` } @@ -1263,7 +1270,13 @@ async def _setup_quote_stream( to_trio: trio.abc.SendChannel, symbol: str, - opts: tuple[int] = ('375', '233', '236'), + opts: tuple[int] = ( + '375', # RT trade volume (excludes utrades) + '233', # RT trade volume (includes utrades) + '236', # Shortable shares + '294', # Trade rate / minute + '295', # Vlm rate / minute + ), contract: Optional[Contract] = None, ) -> trio.abc.ReceiveChannel: From 4570b06c26d685a201de1235a6ac80403266cc92 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 11:55:53 -0500 Subject: [PATCH 19/47] Handle no y-range maxmin output (seems like bug?) --- piker/ui/_interaction.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 4168a3ff..2b8448ca 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -731,7 +731,11 @@ class ChartView(ViewBox): ) if set_range: - ylow, yhigh = self._maxmin() + yrange = self._maxmin() + if yrange is None: + return + + ylow, yhigh = yrange # view margins: stay within a % of the "true range" diff = yhigh - ylow From 2d3c685e19a02b37803fbb1edede942a45edec05 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 12:10:15 -0500 Subject: [PATCH 20/47] Typecast np dtype description to a tuple --- piker/data/feed.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index b3e1efd6..55f8b9b9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -567,7 +567,7 @@ async def open_feed( shm_token = data['shm_token'] # XXX: msgspec won't relay through the tuples XD - shm_token['dtype_descr'] = list( + shm_token['dtype_descr'] = tuple( map(tuple, shm_token['dtype_descr'])) assert shm_token == shm.token # sanity From b358b8e874a2645bf6cd14ee762b98f38c032629 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 12:10:44 -0500 Subject: [PATCH 21/47] Move `wma` fsp earlier in module --- piker/fsp/_momo.py | 52 +++++++++++++++++++++++----------------------- 1 file changed, 26 insertions(+), 26 deletions(-) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 29e94f98..01e41c04 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -170,6 +170,32 @@ def _wma( return np.convolve(signal, weights, 'valid') +@fsp +async def wma( + + source, #: AsyncStream[np.ndarray], + length: int, + ohlcv: np.ndarray, # price time-frame "aware" + +) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? + ''' + Streaming weighted moving average. + + ``weights`` is a sequence of already scaled values. As an example + for the WMA often found in "techincal analysis": + ``weights = np.arange(1, N) * N*(N-1)/2``. + + ''' + # deliver historical output as "first yield" + yield _wma(ohlcv.array['close'], length) + + # begin real-time section + + async for quote in source: + for tick in iterticks(quote, type='trade'): + yield _wma(ohlcv.last(length)) + + @fsp async def rsi( @@ -224,29 +250,3 @@ async def rsi( down_ema_last=last_down_ema_close, ) yield rsi_out[-1:] - - -@fsp -async def wma( - - source, #: AsyncStream[np.ndarray], - length: int, - ohlcv: np.ndarray, # price time-frame "aware" - -) -> AsyncIterator[np.ndarray]: # maybe something like like FspStream? - ''' - Streaming weighted moving average. - - ``weights`` is a sequence of already scaled values. As an example - for the WMA often found in "techincal analysis": - ``weights = np.arange(1, N) * N*(N-1)/2``. - - ''' - # deliver historical output as "first yield" - yield _wma(ohlcv.array['close'], length) - - # begin real-time section - - async for quote in source: - for tick in iterticks(quote, type='trade'): - yield _wma(ohlcv.last(length)) From 5274eb538ca7f1668424df449bb1b0aa1f53c2ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 14:44:54 -0500 Subject: [PATCH 22/47] Add 16 period dollar vlm rates, drop ib rates for now --- piker/fsp/_volume.py | 84 ++++++++++++++++++++++++++++++++------------ 1 file changed, 62 insertions(+), 22 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index aaded701..1170fcc9 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -22,8 +22,15 @@ from tractor.trionics._broadcast import AsyncReceiver from ._api import fsp from ..data._normalize import iterticks from ..data._sharedmem import ShmArray +from ._momo import _wma +from ..log import get_logger + +log = get_logger(__name__) +# NOTE: is the same as our `wma` fsp, and if so which one is faster? +# Ohhh, this is an IIR style i think? So it has an anchor point +# effectively instead of a moving window/FIR style? def wap( signal: np.ndarray, @@ -131,8 +138,6 @@ async def dolla_vlm( chl3 = (a['close'] + a['high'] + a['low']) / 3 v = a['volume'] - from ._momo import wma - # on first iteration yield history yield { 'dolla_vlm': chl3 * v, @@ -203,11 +208,11 @@ async def dolla_vlm( # our own instantaneous rate calcs which are all # parameterized by a samples count (bars) period - # 'trade_rate', - # 'dark_trade_rate', + 'trade_rate', + 'dark_trade_rate', - # 'dvlm_rate', - # 'dark_dvlm_rate', + 'dvlm_rate', + 'dark_dvlm_rate', ), curve_style='line', ) @@ -236,13 +241,17 @@ async def flow_rates( ) -> AsyncIterator[ tuple[str, Union[np.ndarray, float]], ]: - - # dvlm_shm = dolla_vlm.get_shm(ohlcv) - # generally no history available prior to real-time calcs yield { + # from ib '1m_trade_rate': None, '1m_vlm_rate': None, + + 'trade_rate': None, + 'dark_trade_rate': None, + + 'dvlm_rate': None, + 'dark_dvlm_rate': None, } ltr = 0 @@ -255,19 +264,50 @@ async def flow_rates( vr = quote.get('volumeRate') yield '1m_vlm_rate', vr or 0 + # NOTE: in theory we could dynamically allocate a cascade based on + # this call but not sure if that's too "dynamic" in terms of + # validating cascade flows from message typing perspective. + + # attach to ``dolla_vlm`` fsp running + # on this same source flow. + dvlm_shm = dolla_vlm.get_shm(ohlcv) + + # precompute arithmetic mean weights (all ones) + seq = np.full((period,), 1) + weights = seq / seq.sum() + async for quote in source: - if quote: + if not quote: + log.error("OH WTF NO QUOTE IN FSP") + continue - # XXX: ib specific schema we should - # probably pre-pack ourselves. - tr = quote.get('tradeRate') - if tr is not None and tr != ltr: - # print(f'trade rate: {tr}') - yield '1m_trade_rate', tr - ltr = tr + dvlm_wma = _wma( + dvlm_shm.array['dolla_vlm'], + period, + weights=weights, + ) + yield 'dvlm_rate', dvlm_wma[-1] - vr = quote.get('volumeRate') - if vr is not None and vr != lvr: - # print(f'vlm rate: {vr}') - yield '1m_vlm_rate', vr - lvr = vr + # TODO: skip this if no dark vlm is declared + # by symbol info (eg. in crypto$) + dark_dvlm_wma = _wma( + dvlm_shm.array['dark_vlm'], + period, + weights=weights, + ) + yield 'dark_dvlm_rate', dark_dvlm_wma[-1] + + # XXX: ib specific schema we should + # probably pre-pack ourselves. + # tr = quote.get('tradeRate') + # if tr is not None and tr != ltr: + # # print(f'trade rate: {tr}') + # yield '1m_trade_rate', tr + # ltr = tr + + # # TODO: we *could* do an ohlc3 + # vr = quote.get('volumeRate') + # if vr is not None and vr != lvr: + # # print(f'vlm rate: {vr}') + # yield '1m_vlm_rate', vr + # lvr = vr From e4244e96a982f5bad460f2b5c6e025fe78e108ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 14:52:45 -0500 Subject: [PATCH 23/47] Fix var name typo --- piker/data/_sampling.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index b29b0f7d..669f624e 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -252,7 +252,7 @@ async def sample_and_broadcast( try: stream.send_nowait((sym, quote)) except trio.WouldBlock: - ctx = getattr(sream, '_ctx', None) + ctx = getattr(stream, '_ctx', None) if ctx: log.warning( f'Feed overrun {bus.brokername} ->' @@ -371,7 +371,7 @@ async def uniform_rate_send( # we have a quote already so send it now. - measured_rate = 1 / (time.time() - last_send) + # measured_rate = 1 / (time.time() - last_send) # log.info( # f'`{sym}` throttled send hz: {round(measured_rate, ndigits=1)}' # ) From 73faafcfc13d98a6f1085ac39c03e4a20d5d4348 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 16:46:48 -0500 Subject: [PATCH 24/47] Add trade "rates" (i.e. trade counts) support B) Though it's not per-tick accurate, accumulate the number of "trades" (i.e. the "clearing rate" - maybe this is a better name?) per bar inside the `dolla_vlm` fsp and average and report wmas of this in the `flow_rates` fsp. --- piker/fsp/_volume.py | 38 +++++++++++++++++++++++++++++--------- 1 file changed, 29 insertions(+), 9 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 1170fcc9..6ad04403 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -115,7 +115,12 @@ async def tina_vwap( @fsp( - outputs=('dolla_vlm', 'dark_vlm'), + outputs=( + 'dolla_vlm', + 'dark_vlm', + 'trade_count', + 'dark_trade_count', + ), curve_style='step', ) async def dolla_vlm( @@ -145,8 +150,8 @@ async def dolla_vlm( } i = ohlcv.index - output = vlm = 0 - dvlm = 0 + output = dvlm = vlm = 0 + dark_trade_count = trade_count = 0 async for quote in source: for tick in iterticks( @@ -165,8 +170,8 @@ async def dolla_vlm( li = ohlcv.index if li > i: i = li - vlm = 0 - dvlm = 0 + trade_count = dark_trade_count = dvlm = vlm = 0 + # TODO: for marginned instruments (futes, etfs?) we need to # show the margin $vlm by multiplying by whatever multiplier @@ -178,13 +183,17 @@ async def dolla_vlm( # print(f'dark_trade: {tick}') key = 'dark_vlm' dvlm += price * size - output = dvlm + yield 'dark_vlm', dvlm + dark_trade_count += 1 + yield 'dark_trade_count', dark_trade_count else: # print(f'vlm: {tick}') key = 'dolla_vlm' vlm += price * size - output = vlm + yield 'dolla_vlm', vlm + trade_count += 1 + yield 'trade_count', vlm # TODO: plot both to compare? # c, h, l, v = ohlcv.last()[ @@ -193,8 +202,6 @@ async def dolla_vlm( # tina_lvlm = c+h+l/3 * v # print(f' tinal vlm: {tina_lvlm}') - yield key, output - @fsp( # TODO: eventually I guess we should support some kinda declarative @@ -287,6 +294,12 @@ async def flow_rates( weights=weights, ) yield 'dvlm_rate', dvlm_wma[-1] + trade_rate_wma = _wma( + dvlm_shm.array['trade_count'], + period, + weights=weights, + ) + yield 'trade_rate', trade_rate_wma[-1] # TODO: skip this if no dark vlm is declared # by symbol info (eg. in crypto$) @@ -297,6 +310,13 @@ async def flow_rates( ) yield 'dark_dvlm_rate', dark_dvlm_wma[-1] + dark_trade_rate_wma = _wma( + dvlm_shm.array['dark_trade_count'], + period, + weights=weights, + ) + yield 'dark_trade_rate', dark_trade_rate_wma[-1] + # XXX: ib specific schema we should # probably pre-pack ourselves. # tr = quote.get('tradeRate') From 87653ddca2a51335eb4e1bfd8798f9b1a74b888b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 16:49:06 -0500 Subject: [PATCH 25/47] Simplify to only needed one LHS axis for clearing rates --- piker/ui/_fsp.py | 154 ++++++++++++++++++++++++++++++----------------- 1 file changed, 99 insertions(+), 55 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 9a08fac0..2dbb41b3 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -715,7 +715,13 @@ async def open_vlm_displays( dvlm_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view - names=['dolla_vlm', 'dark_vlm'], + names=[ + 'dolla_vlm', + 'dark_vlm', + + 'dvlm_rate', + 'dark_dvlm_rate', + ], ) curve, _ = chart.draw_curve( @@ -741,22 +747,21 @@ async def open_vlm_displays( ################ # dark vlm curve ################ + # darker theme hue (obvsly) + dark_vlm_color = 'charcoal' curve, _ = chart.draw_curve( - name='dark_vlm', data=dvlm_shm.array, array_key='dark_vlm', overlay=dvlm_pi, - color='charcoal', # darker theme hue + color=dark_vlm_color, step_mode=True, # **conf.get('chart_kwargs', {}) ) chart._overlays['dark_vlm'] = dvlm_shm - ################## - # Vlm rate overlay - ################## - # spawn and overlay $ vlm on the same subchart + # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is + # up since this one depends on it. fr_shm, started = await admin.start_engine_task( flow_rates, { # fsp engine conf @@ -765,82 +770,121 @@ async def open_vlm_displays( # loglevel, ) await started.wait() - trade_rate_color = vlm_rate_color = 'i3' - fr_pi = chart.overlay_plotitem( - 'vlm_rates', - index=0, # place axis on inside (nearest to chart) + # curve, _ = chart.draw_curve( + # name='1m_vlm_rate', + # data=fr_shm.array, + # array_key='1m_vlm_rate', + # overlay=fr_pi, + # color='jet', + # style='solid', + # ) + # chart._overlays['1m_vlm_rate'] = fr_shm - # NOTE: we might want to suffix with a \w - # on lhs and prefix for the rhs axis labels? - axis_title=' vlm/m', - axis_side='left', - axis_kwargs={ - 'typical_max_str': ' 100.0 M ', - 'formatter': partial( - humanize, - digits=2, - ), - 'text_color': vlm_rate_color, - }, - ) - # add custom auto range handler - fr_pi.vb._maxmin = partial( - maxmin, - # keep both regular and dark vlm in view - names=[ - '1m_vlm_rate', - ], - ) + # use slightly less light (then bracket) gray + # for volume from "main exchange". + vlm_color = 'i3' curve, _ = chart.draw_curve( - name='1m_vlm_rate', + name='dvlm_rate', data=fr_shm.array, - array_key='1m_vlm_rate', - overlay=fr_pi, - color=vlm_rate_color, + array_key='dvlm_rate', + overlay=dvlm_pi, + color=vlm_color, style='solid', ) - chart._overlays['1m_vlm_rate'] = fr_shm + chart._overlays['dvlm_rate'] = fr_shm + + curve, _ = chart.draw_curve( + name='dark_dvlm_rate', + data=fr_shm.array, + array_key='dark_dvlm_rate', + overlay=dvlm_pi, + color=dark_vlm_color, + style='solid', + ) + chart._overlays['dark_dvlm_rate'] = fr_shm + + # vlm rate overlay + #################### + # (needs separate axis since trade counts are likely + # different scale then vlm) + + # vlmrate_pi = chart.overlay_plotitem( + # 'vlm_rates', + # index=0, # place axis on inside (nearest to chart) + + # # NOTE: we might want to suffix with a \w + # # on lhs and prefix for the rhs axis labels? + # axis_title=' vlm/m', + # axis_side='left', + # axis_kwargs={ + # 'typical_max_str': ' 100.0 M ', + # 'formatter': partial( + # humanize, + # digits=2, + # ), + # 'text_color': vlm_color, + # }, + # ) + # # add custom auto range handler + # vlmrate.vb._maxmin = partial( + # maxmin, + # # keep both regular and dark vlm in view + # names=[ + # # '1m_vlm_rate', + # ], + # ) #################### # Trade rate overlay #################### - fr_pi = chart.overlay_plotitem( + tr_pi = chart.overlay_plotitem( 'trade_rates', index=1, # place axis on inside (nearest to chart) - axis_title='trades/m ', + axis_title='tr(16) ', axis_side='left', axis_kwargs={ - 'typical_max_str': ' 100.0 M ', + 'typical_max_str': ' 10.0 M ', 'formatter': partial( humanize, digits=2, ), - 'text_color': trade_rate_color, + 'text_color': vlm_color, }, ) + fields = [ + 'trade_rate', + 'dark_trade_rate', + # '1m_trade_rate', + ] # add custom auto range handler - fr_pi.vb._maxmin = partial( + tr_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view - names=[ - '1m_trade_rate', - ], + names=fields, ) - curve, _ = chart.draw_curve( - name='1m_trade_rate', - data=fr_shm.array, - array_key='1m_trade_rate', - overlay=fr_pi, - color=trade_rate_color, - style='dash', - ) - chart._overlays['1m_trade_rate'] = fr_shm + for field in fields: + if 'dark' in field: + color = dark_vlm_color + else: + color = vlm_color - for pi in (dvlm_pi, fr_pi): + curve, _ = chart.draw_curve( + name=field, + data=fr_shm.array, + array_key=field, + overlay=tr_pi, + color=color, + # dashed line to represent "individual trades" being + # more "granular" B) + style='dash', + ) + chart._overlays[field] = fr_shm + + for pi in (dvlm_pi, tr_pi): for name, axis_info in pi.axes.items(): # lol this sux XD axis = axis_info['item'] From 8d432e19884f1a25ca7958c3f6b03bcbf46c0be5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 17:00:12 -0500 Subject: [PATCH 26/47] Shorter clear rate axis title --- piker/ui/_fsp.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 2dbb41b3..0ae4b9ea 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -842,7 +842,11 @@ async def open_vlm_displays( tr_pi = chart.overlay_plotitem( 'trade_rates', index=1, # place axis on inside (nearest to chart) - axis_title='tr(16) ', + + # TODO: dynamically update period (and thus this axis?) + # title from user input. + axis_title='clears/P', + axis_side='left', axis_kwargs={ 'typical_max_str': ' 10.0 M ', From e3c46a5d4d5a0a971ec51095b01b4c84b9709f59 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 17:02:02 -0500 Subject: [PATCH 27/47] Add draft, commented tickbytick for ib --- piker/brokers/ib.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 3572683c..4139d60d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1076,6 +1076,10 @@ def normalize( new_ticks.append(td) + tbt = ticker.tickByTicks + if tbt: + print(f'tickbyticks:\n {ticker.tickByTicks}') + ticker.ticks = new_ticks # some contracts don't have volume so we may want to calculate @@ -1088,6 +1092,11 @@ def normalize( # serialize for transport data = asdict(ticker) + # convert named tuples to dicts for transport + tbts = data.get('tickByTicks') + if tbts: + data['tickByTicks'] = [tbt._asdict() for tbt in tbts] + # add time stamps for downstream latency measurements data['brokerd_ts'] = time.time() @@ -1274,6 +1283,11 @@ async def _setup_quote_stream( '375', # RT trade volume (excludes utrades) '233', # RT trade volume (includes utrades) '236', # Shortable shares + + # these all appear to only be updated every 25s thus + # making them mostly useless and explains why the scanner + # is always slow XD + # '293', # Trade count for day '294', # Trade rate / minute '295', # Vlm rate / minute ), @@ -1294,6 +1308,12 @@ async def _setup_quote_stream( contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + # NOTE: it's batch-wise and slow af but I guess could + # be good for backchecking? Seems to be every 5s maybe? + # ticker: Ticker = client.ib.reqTickByTickData( + # contract, 'Last', + # ) + # # define a simple queue push routine that streams quote packets # # to trio over the ``to_trio`` memory channel. # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore From e8d7709358c48b6e6ac653c4d90ea748eb55e664 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Feb 2022 17:02:57 -0500 Subject: [PATCH 28/47] Drop notification display time to piker seconds worth --- piker/ui/order_mode.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 6795f384..755e72f3 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -433,9 +433,12 @@ class OrderMode: [ 'notify-send', '-u', 'normal', - '-t', '10000', + '-t', '1616', 'piker', - f'alert: {msg}', + + # TODO: add in standard fill/exec info that maybe we + # pack in a broker independent way? + f'{msg["resp"]}: {msg["trigger_price"]}', ], ) log.runtime(result) @@ -666,7 +669,7 @@ async def open_order_mode( ) # vbox.setAlignment(feed_label, Qt.AlignBottom) # vbox.setAlignment(Qt.AlignBottom) - blank_h = chart.height() - ( + _ = chart.height() - ( form.height() + form.fill_bar.height() # feed_label.height() From 0271841412163124316609297e8d78d035e6d4fa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Feb 2022 12:15:05 -0500 Subject: [PATCH 29/47] Add `PlotItemOverlay.get_axes()` Enables retrieving all "named axes" on a particular "side" of the overlayed plot items. This is useful for calculating how much space needs to be allocated for the axes before the view box area starts. --- piker/ui/_overlay.py | 49 +++++++++++++++++++++++++++++--------------- 1 file changed, 33 insertions(+), 16 deletions(-) diff --git a/piker/ui/_overlay.py b/piker/ui/_overlay.py index 256909bd..e683afd5 100644 --- a/piker/ui/_overlay.py +++ b/piker/ui/_overlay.py @@ -103,11 +103,6 @@ class ComposedGridLayout: dict[str, AxisItem], ] = {} - self._axes2pi: dict[ - AxisItem, - dict[str, PlotItem], - ] = {} - # TODO: better name? # construct surrounding layouts for placing outer axes and # their legends and title labels. @@ -158,8 +153,8 @@ class ComposedGridLayout: for name, axis_info in plotitem.axes.items(): axis = axis_info['item'] # register this plot's (maybe re-placed) axes for lookup. - self._pi2axes.setdefault(index, {})[name] = axis - self._axes2pi.setdefault(index, {})[name] = plotitem + # print(f'inserting {name}:{axis} to index {index}') + self._pi2axes.setdefault(name, {})[index] = axis # enter plot into list for index tracking self.items.insert(index, plotitem) @@ -213,11 +208,12 @@ class ComposedGridLayout: # invert insert index for layouts which are # not-left-to-right, top-to-bottom insert oriented + insert_index = index if name in ('top', 'left'): - index = min(len(axes) - index, 0) - assert index >= 0 + insert_index = min(len(axes) - index, 0) + assert insert_index >= 0 - linlayout.insertItem(index, axis) + linlayout.insertItem(insert_index, axis) axes.insert(index, axis) self._register_item(index, plotitem) @@ -243,13 +239,15 @@ class ComposedGridLayout: plot: PlotItem, name: str, - ) -> AxisItem: + ) -> Optional[AxisItem]: ''' - Retrieve the named axis for overlayed ``plot``. + Retrieve the named axis for overlayed ``plot`` or ``None`` + if axis for that name is not shown. ''' index = self.items.index(plot) - return self._pi2axes[index][name] + named = self._pi2axes[name] + return named.get(index) def pop( self, @@ -341,7 +339,7 @@ def mk_relay_method( # halt/short circuit the graphicscene loop). Further the # surrounding handler for this signal must be allowed to execute # and get processed by **this consumer**. - print(f'{vb.name} rx relayed from {relayed_from.name}') + # print(f'{vb.name} rx relayed from {relayed_from.name}') ev.ignore() return slot( @@ -351,7 +349,7 @@ def mk_relay_method( ) if axis is not None: - print(f'{vb.name} handling axis event:\n{str(ev)}') + # print(f'{vb.name} handling axis event:\n{str(ev)}') ev.accept() return slot( vb, @@ -490,7 +488,6 @@ class PlotItemOverlay: vb.setZValue(1000) # XXX: critical for scene layering/relaying self.overlays: list[PlotItem] = [] - from piker.ui._overlay import ComposedGridLayout self.layout = ComposedGridLayout( root_plotitem, root_plotitem.layout, @@ -613,6 +610,26 @@ class PlotItemOverlay: ''' return self.layout.get_axis(plot, name) + def get_axes( + self, + name: str, + + ) -> list[AxisItem]: + ''' + Retrieve all axes for all plots with ``name: str``. + + If a particular overlay doesn't have a displayed named axis + then it is not delivered in the returned ``list``. + + ''' + axes = [] + for plot in self.overlays: + axis = self.layout.get_axis(plot, name) + if axis: + axes.append(axis) + + return axes + # TODO: i guess we need this if you want to detach existing plots # dynamically? XXX: untested as of now. def _disconnect_all( From 2f2aef28dd3977150cb03560d329156ec905d846 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Feb 2022 12:19:05 -0500 Subject: [PATCH 30/47] Adjust x-axis label from summed left axes widths --- piker/ui/_cursor.py | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/piker/ui/_cursor.py b/piker/ui/_cursor.py index f6f8edde..e006858e 100644 --- a/piker/ui/_cursor.py +++ b/piker/ui/_cursor.py @@ -253,7 +253,7 @@ class ContentsLabels: and index < array[-1]['index'] ): # out of range - print('out of range?') + print('WTF out of range?') continue # array = chart._arrays[name] @@ -550,17 +550,20 @@ class Cursor(pg.GraphicsObject): for cursor in opts.get('cursors', ()): cursor.setIndex(ix) - # update the label on the bottom of the crosshair - axes = plot.plotItem.axes - + # Update the label on the bottom of the crosshair. # TODO: make this an up-front calc that we update - # on axis-widget resize events. + # on axis-widget resize events instead of on every mouse + # update cylce. + # left axis offset width for calcuating # absolute x-axis label placement. left_axis_width = 0 - left = axes.get('left') - if left: - left_axis_width = left['item'].width() + if len(plot.pi_overlay.overlays): + # breakpoint() + lefts = plot.pi_overlay.get_axes('left') + if lefts: + for left in lefts: + left_axis_width += left.width() # map back to abs (label-local) coordinates self.xaxis_label.update_label( From 9490129a7454ff08f4e5e43505908f891ff2876e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Feb 2022 12:30:50 -0500 Subject: [PATCH 31/47] Add overlays to end of layout grid (aka append) by default --- piker/ui/_fsp.py | 1 - piker/ui/_overlay.py | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 0ae4b9ea..097b639e 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -841,7 +841,6 @@ async def open_vlm_displays( #################### tr_pi = chart.overlay_plotitem( 'trade_rates', - index=1, # place axis on inside (nearest to chart) # TODO: dynamically update period (and thus this axis?) # title from user input. diff --git a/piker/ui/_overlay.py b/piker/ui/_overlay.py index e683afd5..65ec2364 100644 --- a/piker/ui/_overlay.py +++ b/piker/ui/_overlay.py @@ -508,7 +508,7 @@ class PlotItemOverlay: ) -> None: - index = index or 0 + index = index or len(self.overlays) root = self.root_plotitem # layout: QGraphicsGridLayout = root.layout self.overlays.insert(index, plotitem) From a006b87546b819502ebb43c6d616715dfa7fff53 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Feb 2022 13:23:54 -0500 Subject: [PATCH 32/47] Exit `.maxmin()` early on non-yet-registered array lookup --- piker/ui/_chart.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index f4a3c19e..74ef2891 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -1164,7 +1164,10 @@ class ChartPlotWidget(pg.PlotWidget): # f"begin: {begin}, end: {end}, extra: {extra}" # ) - a = self._arrays[name or self.name] + a = self._arrays.get(name or self.name) + if a is None: + return None + ifirst = a[0]['index'] bars = a[lbar - ifirst:rbar - ifirst + 1] From e7516447dfb2f9c254124314a89d099197aac2cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 6 Feb 2022 14:46:16 -0500 Subject: [PATCH 33/47] Better rate axis title? --- piker/ui/_fsp.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 097b639e..d5459d1f 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -844,7 +844,7 @@ async def open_vlm_displays( # TODO: dynamically update period (and thus this axis?) # title from user input. - axis_title='clears/P', + axis_title='clrs/Ts', axis_side='left', axis_kwargs={ From ee4ad32d3b1733e3d73ccea529d8bce6fdd3e6ca Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Feb 2022 08:17:20 -0500 Subject: [PATCH 34/47] Fix `dvlm` to actually yield trade count, add instantaneous support --- piker/fsp/_volume.py | 48 +++++++++++++++++++++++++++++--------------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 6ad04403..eaeca7cf 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -180,20 +180,21 @@ async def dolla_vlm( ttype = tick.get('type') if ttype == 'dark_trade': - # print(f'dark_trade: {tick}') - key = 'dark_vlm' dvlm += price * size yield 'dark_vlm', dvlm + dark_trade_count += 1 yield 'dark_trade_count', dark_trade_count + # print(f'{dark_trade_count}th dark_trade: {tick}') + else: # print(f'vlm: {tick}') - key = 'dolla_vlm' vlm += price * size yield 'dolla_vlm', vlm + trade_count += 1 - yield 'trade_count', vlm + yield 'trade_count', trade_count # TODO: plot both to compare? # c, h, l, v = ohlcv.last()[ @@ -279,6 +280,10 @@ async def flow_rates( # on this same source flow. dvlm_shm = dolla_vlm.get_shm(ohlcv) + # breakpoint() + # import tractor + # await tractor.breakpoint() + # precompute arithmetic mean weights (all ones) seq = np.full((period,), 1) weights = seq / seq.sum() @@ -294,12 +299,18 @@ async def flow_rates( weights=weights, ) yield 'dvlm_rate', dvlm_wma[-1] - trade_rate_wma = _wma( - dvlm_shm.array['trade_count'], - period, - weights=weights, - ) - yield 'trade_rate', trade_rate_wma[-1] + + if period > 1: + trade_rate_wma = _wma( + dvlm_shm.array['trade_count'], + period, + weights=weights, + ) + yield 'trade_rate', trade_rate_wma[-1] + else: + # instantaneous rate per sample step + count = dvlm_shm.array['trade_count'][-1] + yield 'trade_rate', count # TODO: skip this if no dark vlm is declared # by symbol info (eg. in crypto$) @@ -310,12 +321,17 @@ async def flow_rates( ) yield 'dark_dvlm_rate', dark_dvlm_wma[-1] - dark_trade_rate_wma = _wma( - dvlm_shm.array['dark_trade_count'], - period, - weights=weights, - ) - yield 'dark_trade_rate', dark_trade_rate_wma[-1] + if period > 1: + dark_trade_rate_wma = _wma( + dvlm_shm.array['dark_trade_count'], + period, + weights=weights, + ) + yield 'dark_trade_rate', dark_trade_rate_wma[-1] + else: + # instantaneous rate per sample step + dark_count = dvlm_shm.array['dark_trade_count'][-1] + yield 'dark_trade_rate', dark_count # XXX: ib specific schema we should # probably pre-pack ourselves. From 30cf54480def0af7ad77ca3bc39d26400650021a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Feb 2022 13:59:26 -0500 Subject: [PATCH 35/47] Add more appropriate default params --- piker/ui/_fsp.py | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index d5459d1f..3cf086ce 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -158,7 +158,11 @@ async def open_fsp_sidepane( sidepane.model = FspConfig() # just a logger for now until we get fsp configs up and running. - async def settings_change(key: str, value: str) -> bool: + async def settings_change( + key: str, + value: str + + ) -> bool: print(f'{key}: {value}') return True @@ -573,12 +577,14 @@ async def open_vlm_displays( async with ( open_fsp_sidepane( linked, { - 'vlm': { + 'flows': { + + # TODO: add support for dynamically changing these 'params': { - 'price_func': { - 'default_value': 'chl3', - # tell target ``Edit`` widget to not allow - # edits for now. + u'\u03BC' + '_type': {'default_value': 'arithmetic'}, + 'period': { + 'default_value': '16', + # make widget un-editable for now. 'widget_kwargs': {'readonly': True}, }, }, From 4a7b2d835b92699381a5d9ab9889afa08d9be94d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 07:46:36 -0500 Subject: [PATCH 36/47] Yield 0 initial values from `flow_rates` fsp --- piker/fsp/_volume.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index eaeca7cf..bef5ea5f 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -272,6 +272,11 @@ async def flow_rates( vr = quote.get('volumeRate') yield '1m_vlm_rate', vr or 0 + yield 'trade_rate', 0 + yield 'dark_trade_rate', 0 + yield 'dvlm_rate', 0 + yield 'dark_dvlm_rate', 0 + # NOTE: in theory we could dynamically allocate a cascade based on # this call but not sure if that's too "dynamic" in terms of # validating cascade flows from message typing perspective. @@ -280,10 +285,6 @@ async def flow_rates( # on this same source flow. dvlm_shm = dolla_vlm.get_shm(ohlcv) - # breakpoint() - # import tractor - # await tractor.breakpoint() - # precompute arithmetic mean weights (all ones) seq = np.full((period,), 1) weights = seq / seq.sum() @@ -306,7 +307,9 @@ async def flow_rates( period, weights=weights, ) - yield 'trade_rate', trade_rate_wma[-1] + trade_rate = trade_rate_wma[-1] + # print(trade_rate) + yield 'trade_rate', trade_rate else: # instantaneous rate per sample step count = dvlm_shm.array['trade_count'][-1] From 8f467bf4f0ce4abf0d1e2c163eeb2d1cd2bd7c28 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 08:21:08 -0500 Subject: [PATCH 37/47] Factor batch curve plotting into helper func --- piker/ui/_fsp.py | 203 ++++++++++++++++++----------------------------- 1 file changed, 77 insertions(+), 126 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 3cf086ce..7718f9e7 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -617,15 +617,13 @@ async def open_vlm_displays( names: list[str], ) -> tuple[float, float]: + mx = 0 for name in names: mxmn = chart.maxmin(name=name) if mxmn: mx = max(mxmn[1], mx) - # if mx: - # return 0, mxmn[1] - return 0, mx chart.view._maxmin = partial(maxmin, names=['volume']) @@ -700,9 +698,10 @@ async def open_vlm_displays( for event in tasks_ready: n.start_soon(event.wait) - ################### # dolla vlm overlay - ################### + # XXX: the main chart already contains a vlm "units" axis + # so here we add an overlay wth a y-range in + # $ liquidity-value units (normally a fiat like USD). dvlm_pi = chart.overlay_plotitem( 'dolla_vlm', index=0, # place axis on inside (nearest to chart) @@ -717,26 +716,27 @@ async def open_vlm_displays( }, ) + # all to be overlayed curve names + fields = [ + 'dolla_vlm', + 'dark_vlm', + ] + dvlm_rate_fields = [ + 'dvlm_rate', + 'dark_dvlm_rate', + ] + trade_rate_fields = [ + 'trade_rate', + 'dark_trade_rate', + ] + # add custom auto range handler dvlm_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view - names=[ - 'dolla_vlm', - 'dark_vlm', - - 'dvlm_rate', - 'dark_dvlm_rate', - ], + names=fields + dvlm_rate_fields, ) - curve, _ = chart.draw_curve( - name='dolla_vlm', - data=dvlm_shm.array, - array_key='dolla_vlm', - overlay=dvlm_pi, - step_mode=True, - ) # TODO: is there a way to "sync" the dual axes such that only # one curve is needed? # hide the original vlm curve since the $vlm one is now @@ -744,27 +744,50 @@ async def open_vlm_displays( # liquidity events (well at least on low OHLC periods - 1s). vlm_curve.hide() - # TODO: we need a better API to do this.. - # specially store ref to shm for lookup in display loop - # since only a placeholder of `None` is entered in - # ``.draw_curve()``. - chart._overlays['dolla_vlm'] = dvlm_shm - - ################ - # dark vlm curve - ################ - # darker theme hue (obvsly) + # use slightly less light (then bracket) gray + # for volume from "main exchange" and a more "bluey" + # gray for "dark" vlm. + vlm_color = 'i3' dark_vlm_color = 'charcoal' - curve, _ = chart.draw_curve( - name='dark_vlm', - data=dvlm_shm.array, - array_key='dark_vlm', - overlay=dvlm_pi, - color=dark_vlm_color, + + # add dvlm (step) curves to common view + def chart_curves( + names: list[str], + pi: pg.PlotItem, + shm: ShmArray, + step_mode: bool = False, + style: str = 'solid', + + ) -> None: + for name in names: + if 'dark' in name: + color = dark_vlm_color + else: + color = vlm_color + + curve, _ = chart.draw_curve( + # name='dolla_vlm', + name=name, + data=shm.array, + array_key=name, + overlay=pi, + color=color, + step_mode=step_mode, + style=style, + ) + + # TODO: we need a better API to do this.. + # specially store ref to shm for lookup in display loop + # since only a placeholder of `None` is entered in + # ``.draw_curve()``. + chart._overlays[name] = shm + + chart_curves( + fields, + dvlm_pi, + dvlm_shm, step_mode=True, - # **conf.get('chart_kwargs', {}) ) - chart._overlays['dark_vlm'] = dvlm_shm # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # up since this one depends on it. @@ -777,80 +800,21 @@ async def open_vlm_displays( ) await started.wait() - # curve, _ = chart.draw_curve( - # name='1m_vlm_rate', - # data=fr_shm.array, - # array_key='1m_vlm_rate', - # overlay=fr_pi, - # color='jet', - # style='solid', - # ) - # chart._overlays['1m_vlm_rate'] = fr_shm - - # use slightly less light (then bracket) gray - # for volume from "main exchange". - vlm_color = 'i3' - - curve, _ = chart.draw_curve( - name='dvlm_rate', - data=fr_shm.array, - array_key='dvlm_rate', - overlay=dvlm_pi, - color=vlm_color, - style='solid', + chart_curves( + dvlm_rate_fields, + dvlm_pi, + fr_shm, ) - chart._overlays['dvlm_rate'] = fr_shm - curve, _ = chart.draw_curve( - name='dark_dvlm_rate', - data=fr_shm.array, - array_key='dark_dvlm_rate', - overlay=dvlm_pi, - color=dark_vlm_color, - style='solid', - ) - chart._overlays['dark_dvlm_rate'] = fr_shm - - # vlm rate overlay - #################### - # (needs separate axis since trade counts are likely - # different scale then vlm) - - # vlmrate_pi = chart.overlay_plotitem( - # 'vlm_rates', - # index=0, # place axis on inside (nearest to chart) - - # # NOTE: we might want to suffix with a \w - # # on lhs and prefix for the rhs axis labels? - # axis_title=' vlm/m', - # axis_side='left', - # axis_kwargs={ - # 'typical_max_str': ' 100.0 M ', - # 'formatter': partial( - # humanize, - # digits=2, - # ), - # 'text_color': vlm_color, - # }, - # ) - # # add custom auto range handler - # vlmrate.vb._maxmin = partial( - # maxmin, - # # keep both regular and dark vlm in view - # names=[ - # # '1m_vlm_rate', - # ], - # ) - - #################### # Trade rate overlay - #################### + # XXX: requires an additional overlay for + # a trades-per-period (time) y-range. tr_pi = chart.overlay_plotitem( 'trade_rates', # TODO: dynamically update period (and thus this axis?) # title from user input. - axis_title='clrs/Ts', + axis_title='clrs/ts', axis_side='left', axis_kwargs={ @@ -863,35 +827,22 @@ async def open_vlm_displays( }, ) - fields = [ - 'trade_rate', - 'dark_trade_rate', - # '1m_trade_rate', - ] + # add custom auto range handler tr_pi.vb._maxmin = partial( maxmin, # keep both regular and dark vlm in view - names=fields, + names=trade_rate_fields, ) + chart_curves( + trade_rate_fields, + tr_pi, + fr_shm, - for field in fields: - if 'dark' in field: - color = dark_vlm_color - else: - color = vlm_color - - curve, _ = chart.draw_curve( - name=field, - data=fr_shm.array, - array_key=field, - overlay=tr_pi, - color=color, - # dashed line to represent "individual trades" being - # more "granular" B) - style='dash', - ) - chart._overlays[field] = fr_shm + # dashed line to represent "individual trades" being + # more "granular" B) + style='dash', + ) for pi in (dvlm_pi, tr_pi): for name, axis_info in pi.axes.items(): From 326b2c089a4b0742bdc4b5b344bf5378530ad366 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 12:04:01 -0500 Subject: [PATCH 38/47] Drop dvlm 'rates' (they're just means), add default params, period -> 6 --- piker/fsp/_volume.py | 46 +++++++++++++++++++++++++------------------- 1 file changed, 26 insertions(+), 20 deletions(-) diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index bef5ea5f..47211234 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -97,7 +97,10 @@ async def tina_vwap( # vwap_tot = h_vwap[-1] async for quote in source: - for tick in iterticks(quote, types=['trade']): + for tick in iterticks( + quote, + types=['trade'], + ): # c, h, l, v = ohlcv.array[-1][ # ['closes', 'high', 'low', 'volume'] @@ -150,7 +153,7 @@ async def dolla_vlm( } i = ohlcv.index - output = dvlm = vlm = 0 + dvlm = vlm = 0 dark_trade_count = trade_count = 0 async for quote in source: @@ -172,7 +175,6 @@ async def dolla_vlm( i = li trade_count = dark_trade_count = dvlm = vlm = 0 - # TODO: for marginned instruments (futes, etfs?) we need to # show the margin $vlm by multiplying by whatever multiplier # is reported in the sym info. @@ -232,7 +234,11 @@ async def flow_rates( # FSPs, user input, and possibly any general event stream in # real-time. Hint: ideally implemented with caching until mutated # ;) - period: 'Param[int]' = 16, # noqa + period: 'Param[int]' = 6, # noqa + + # TODO: support other means by providing a map + # to weights `partial()`-ed with `wma()`? + mean_type: str = 'arithmetic', # TODO (idea): a generic for declaring boxed fsps much like ``pytest`` # fixtures? This probably needs a lot of thought if we want to offer @@ -262,11 +268,11 @@ async def flow_rates( 'dark_dvlm_rate': None, } - ltr = 0 - lvr = 0 - # TODO: 3.10 do ``anext()`` quote = await source.__anext__() + + # ltr = 0 + # lvr = 0 tr = quote.get('tradeRate') yield '1m_trade_rate', tr or 0 vr = quote.get('volumeRate') @@ -294,12 +300,12 @@ async def flow_rates( log.error("OH WTF NO QUOTE IN FSP") continue - dvlm_wma = _wma( - dvlm_shm.array['dolla_vlm'], - period, - weights=weights, - ) - yield 'dvlm_rate', dvlm_wma[-1] + # dvlm_wma = _wma( + # dvlm_shm.array['dolla_vlm'], + # period, + # weights=weights, + # ) + # yield 'dvlm_rate', dvlm_wma[-1] if period > 1: trade_rate_wma = _wma( @@ -317,12 +323,12 @@ async def flow_rates( # TODO: skip this if no dark vlm is declared # by symbol info (eg. in crypto$) - dark_dvlm_wma = _wma( - dvlm_shm.array['dark_vlm'], - period, - weights=weights, - ) - yield 'dark_dvlm_rate', dark_dvlm_wma[-1] + # dark_dvlm_wma = _wma( + # dvlm_shm.array['dark_vlm'], + # period, + # weights=weights, + # ) + # yield 'dark_dvlm_rate', dark_dvlm_wma[-1] if period > 1: dark_trade_rate_wma = _wma( @@ -338,13 +344,13 @@ async def flow_rates( # XXX: ib specific schema we should # probably pre-pack ourselves. + # tr = quote.get('tradeRate') # if tr is not None and tr != ltr: # # print(f'trade rate: {tr}') # yield '1m_trade_rate', tr # ltr = tr - # # TODO: we *could* do an ohlc3 # vr = quote.get('volumeRate') # if vr is not None and vr != lvr: # # print(f'vlm rate: {vr}') From 860ed99757d8c8fb3a27e13b2a5a9ba9a5ccbda8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 12:05:56 -0500 Subject: [PATCH 39/47] Drop dvlm "rates" curves from flows chart --- piker/ui/_fsp.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 7718f9e7..bd037d9e 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -22,6 +22,7 @@ Financial signal processing cluster and real-time graphics management. ''' from contextlib import asynccontextmanager as acm from functools import partial +import inspect from itertools import cycle from typing import Optional, AsyncGenerator, Any @@ -574,6 +575,9 @@ async def open_vlm_displays( be spawned here. ''' + sig = inspect.signature(flow_rates.func) + params = sig.parameters + async with ( open_fsp_sidepane( linked, { @@ -581,9 +585,11 @@ async def open_vlm_displays( # TODO: add support for dynamically changing these 'params': { - u'\u03BC' + '_type': {'default_value': 'arithmetic'}, + u'\u03BC' + '_type': { + 'default_value': str(params['mean_type'].default), + }, 'period': { - 'default_value': '16', + 'default_value': str(params['period'].default), # make widget un-editable for now. 'widget_kwargs': {'readonly': True}, }, @@ -593,6 +599,12 @@ async def open_vlm_displays( ) as sidepane, open_fsp_admin(linked, ohlcv) as admin, ): + # TODO: support updates + # period_field = sidepane.fields['period'] + # period_field.setText( + # str(period_param.default) + # ) + # built-in vlm which we plot ASAP since it's # usually data provided directly with OHLC history. shm = ohlcv @@ -762,8 +774,10 @@ async def open_vlm_displays( for name in names: if 'dark' in name: color = dark_vlm_color - else: + elif 'rate' in name: color = vlm_color + else: + color = 'bracket' curve, _ = chart.draw_curve( # name='dolla_vlm', @@ -791,6 +805,7 @@ async def open_vlm_displays( # spawn flow rates fsp **ONLY AFTER** the 'dolla_vlm' fsp is # up since this one depends on it. + fr_shm, started = await admin.start_engine_task( flow_rates, { # fsp engine conf @@ -814,7 +829,7 @@ async def open_vlm_displays( # TODO: dynamically update period (and thus this axis?) # title from user input. - axis_title='clrs/ts', + axis_title='clears', axis_side='left', axis_kwargs={ @@ -838,10 +853,11 @@ async def open_vlm_displays( trade_rate_fields, tr_pi, fr_shm, + # step_mode=True, # dashed line to represent "individual trades" being # more "granular" B) - style='dash', + # style='dash', ) for pi in (dvlm_pi, tr_pi): From e3a3fd2d391de1f2eaace50026f5cc154577f986 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 15:52:50 -0500 Subject: [PATCH 40/47] Add a `Flow` compound type for coupling graphics with backing data-streams --- piker/ui/_chart.py | 38 ++++++++++++++++++++++++++++---------- 1 file changed, 28 insertions(+), 10 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 74ef2891..e2fcc9ec 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -33,6 +33,7 @@ from PyQt5.QtWidgets import ( import numpy as np import pyqtgraph as pg import trio +from pydantic import BaseModel from ._axes import ( DynamicDateAxis, @@ -614,17 +615,30 @@ class LinkedSplits(QWidget): cpw.sidepane.setMinimumWidth(sp_w) cpw.sidepane.setMaximumWidth(sp_w) -# import pydantic -# class Graphics(pydantic.BaseModel): +# class FlowsTable(pydantic.BaseModel): # ''' # Data-AGGRegate: high level API onto multiple (categorized) -# ``ShmArray``s with high level processing routines for -# graphics computations and display. +# ``Flow``s with high level processing routines for +# multi-graphics computations and display. # ''' -# arrays: dict[str, np.ndarray] = {} -# graphics: dict[str, pg.GraphicsObject] = {} +# flows: dict[str, np.ndarray] = {} + + +class Flow(BaseModel): + ''' + (FinancialSignal-)Flow compound type which wraps a real-time + graphics (curve) and its backing data stream together for high level + access and control. + + ''' + class Config: + arbitrary_types_allowed = True + + name: str + plot: pg.PlotItem + shm: Optional[ShmArray] = None # may be filled in "later" class ChartPlotWidget(pg.PlotWidget): @@ -721,8 +735,9 @@ class ChartPlotWidget(pg.PlotWidget): self.data_key: array, } self._graphics = {} # registry of underlying graphics + # registry of overlay curve names - self._overlays: dict[str, ShmArray] = {} + self._flows: dict[str, Flow] = {} self._feeds: dict[Symbol, Feed] = {} @@ -981,7 +996,7 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: this probably needs its own method? if overlay: # anchor_at = ('bottom', 'left') - self._overlays[name] = None + self._flows[name] = Flow(name=name, plot=pi) if isinstance(overlay, pg.PlotItem): if overlay not in self.pi_overlay.overlays: @@ -1062,7 +1077,7 @@ class ChartPlotWidget(pg.PlotWidget): assert len(array) data_key = array_key or graphics_name - if graphics_name not in self._overlays: + if graphics_name not in self._flows: self._arrays[self.name] = array else: self._arrays[data_key] = array @@ -1164,12 +1179,15 @@ class ChartPlotWidget(pg.PlotWidget): # f"begin: {begin}, end: {end}, extra: {extra}" # ) + # TODO: here we should instead look up the ``Flow.shm.array`` + # and read directly from shm to avoid copying to memory first + # and then reading it again here. a = self._arrays.get(name or self.name) if a is None: return None ifirst = a[0]['index'] - bars = a[lbar - ifirst:rbar - ifirst + 1] + bars = a[lbar - ifirst:(rbar - ifirst) + 1] if not len(bars): # likely no data loaded yet or extreme scrolling? From ef04781a2ba9638b2bb4b1573f3e9d758414a998 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 15:55:57 -0500 Subject: [PATCH 41/47] Expect new flow type through display and fsp UI code --- piker/ui/_display.py | 24 +++++++++++++----------- piker/ui/_fsp.py | 14 ++++++++------ 2 files changed, 21 insertions(+), 17 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index c2333350..df51d444 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -106,7 +106,7 @@ def chart_maxmin( return last_bars_range, mx, max(mn, 0), mx_vlm_in_view -async def update_linked_charts_graphics( +async def graphics_update_loop( linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -258,13 +258,15 @@ async def update_linked_charts_graphics( ) last_mx_vlm = mx_vlm_in_view - for curve_name, shm in vlm_chart._overlays.items(): + for curve_name, flow in vlm_chart._flows.items(): update_fsp_chart( vlm_chart, - shm, + flow.shm, curve_name, array_key=curve_name, ) + # is this even doing anything? + flow.plot.vb._set_yrange() ticks_frame = quote.get('ticks', ()) @@ -411,14 +413,14 @@ async def update_linked_charts_graphics( # TODO: all overlays on all subplots.. # run synchronous update on all derived overlays - for curve_name, shm in chart._overlays.items(): + for curve_name, flow in chart._flows.items(): update_fsp_chart( chart, - shm, + flow.shm, curve_name, array_key=curve_name, ) - # chart._set_yrange() + # chart.view._set_yrange() async def check_for_new_bars( @@ -473,11 +475,11 @@ async def check_for_new_bars( ) # main chart overlays - for name in price_chart._overlays: - + # for name in price_chart._flows: + for curve_name in price_chart._flows: price_chart.update_curve_from_array( - name, - price_chart._arrays[name] + curve_name, + price_chart._arrays[curve_name] ) # each subplot @@ -614,7 +616,7 @@ async def display_symbol_data( # start graphics update loop after receiving first live quote ln.start_soon( - update_linked_charts_graphics, + graphics_update_loop, linkedsplits, feed.stream, ohlcv, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index bd037d9e..03d483a0 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -250,7 +250,7 @@ async def run_fsp_ui( **conf.get('chart_kwargs', {}) ) # specially store ref to shm for lookup in display loop - chart._overlays[name] = shm + chart._flows[name].shm = shm else: # create a new sub-chart widget for this fsp @@ -634,7 +634,9 @@ async def open_vlm_displays( for name in names: mxmn = chart.maxmin(name=name) if mxmn: - mx = max(mxmn[1], mx) + ymax = mxmn[1] + if ymax > mx: + mx = ymax return 0, mx @@ -730,8 +732,8 @@ async def open_vlm_displays( # all to be overlayed curve names fields = [ - 'dolla_vlm', - 'dark_vlm', + 'dolla_vlm', + 'dark_vlm', ] dvlm_rate_fields = [ 'dvlm_rate', @@ -794,7 +796,7 @@ async def open_vlm_displays( # specially store ref to shm for lookup in display loop # since only a placeholder of `None` is entered in # ``.draw_curve()``. - chart._overlays[name] = shm + chart._flows[name].shm = shm chart_curves( fields, @@ -857,7 +859,7 @@ async def open_vlm_displays( # dashed line to represent "individual trades" being # more "granular" B) - # style='dash', + style='dash', ) for pi in (dvlm_pi, tr_pi): From 1c49f7f47fb4f3d9883e3f1a3ce116ed0aa2431b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 15:57:02 -0500 Subject: [PATCH 42/47] Tweak dash pattern to be less sparse --- piker/ui/_curve.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 78fdbad1..01b57c30 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -130,8 +130,10 @@ class FastAppendCurve(pg.PlotCurveItem): # all history of curve is drawn in single px thickness pen = pg.mkPen(hcolor(color)) pen.setStyle(_line_styles[style]) + if 'dash' in style: - pen.setDashPattern([6, 6]) + pen.setDashPattern([8, 3]) + self.setPen(pen) # last segment is drawn in 2px thickness for emphasis @@ -287,6 +289,7 @@ class FastAppendCurve(pg.PlotCurveItem): x_last + 0.5, y_last ) else: + # print((x[-1], y_last)) self._last_line = QLineF( x[-2], y[-2], x[-1], y_last @@ -337,6 +340,7 @@ class FastAppendCurve(pg.PlotCurveItem): p: QtGui.QPainter, opt: QtWidgets.QStyleOptionGraphicsItem, w: QtWidgets.QWidget + ) -> None: profiler = pg.debug.Profiler(disabled=not pg_profile_enabled()) @@ -354,11 +358,11 @@ class FastAppendCurve(pg.PlotCurveItem): # p.drawPath(self.path) # profiler('.drawPath()') - # else: p.setPen(self.last_step_pen) p.drawLine(self._last_line) profiler('.drawLine()') + # else: p.setPen(self.opts['pen']) p.drawPath(self.path) profiler('.drawPath()') From e0462f0a8c8cff2341923e50e14f13d85b441501 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Feb 2022 15:57:32 -0500 Subject: [PATCH 43/47] Type and formatting fixes --- piker/ui/_interaction.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 2b8448ca..ce56b7b6 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -342,7 +342,7 @@ class ChartView(ViewBox): wheelEventRelay = QtCore.Signal(object, object, object) event_relay_source: 'Optional[ViewBox]' = None - relays: dict[str, Signal] = {} + relays: dict[str, QtCore.Signal] = {} def __init__( self, @@ -474,7 +474,11 @@ class ChartView(ViewBox): # lastPos = ev.lastPos() # dif = pos - lastPos # dif = dif * -1 - center = Point(fn.invertQTransform(self.childGroup.transform()).map(ev.pos())) + center = Point( + fn.invertQTransform( + self.childGroup.transform() + ).map(ev.pos()) + ) # scale_y = 1.3 ** (center.y() * -1 / 20) self.scaleBy(s, center) @@ -674,7 +678,7 @@ class ChartView(ViewBox): # flag to prevent triggering sibling charts from the same linked # set from recursion errors. autoscale_linked_plots: bool = True, - autoscale_overlays: bool = False, + # autoscale_overlays: bool = False, ) -> None: ''' @@ -731,7 +735,7 @@ class ChartView(ViewBox): ) if set_range: - yrange = self._maxmin() + yrange = self._maxmin() if yrange is None: return From 723eef3fd621cbf9b0cefee5513d4a0b1bdda94c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Feb 2022 16:00:10 -0500 Subject: [PATCH 44/47] :facepalm: assign `Flow` *after* type check... --- piker/ui/_chart.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index e2fcc9ec..a9350d97 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -995,9 +995,6 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: this probably needs its own method? if overlay: - # anchor_at = ('bottom', 'left') - self._flows[name] = Flow(name=name, plot=pi) - if isinstance(overlay, pg.PlotItem): if overlay not in self.pi_overlay.overlays: raise RuntimeError( @@ -1005,6 +1002,9 @@ class ChartPlotWidget(pg.PlotWidget): ) pi = overlay + # anchor_at = ('bottom', 'left') + self._flows[name] = Flow(name=name, plot=pi) + else: # anchor_at = ('top', 'left') From 45464a54652d133557008cfa11b22312d2be030e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Feb 2022 22:15:57 -0500 Subject: [PATCH 45/47] Drop graphics throttle to 22Hz, add a `.maxmin` to our view box --- piker/ui/_display.py | 7 +++++-- piker/ui/_interaction.py | 10 ++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/piker/ui/_display.py b/piker/ui/_display.py index df51d444..3bfd327a 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -54,7 +54,7 @@ from ..log import get_logger log = get_logger(__name__) # TODO: load this from a config.toml! -_quote_throttle_rate: int = 58 # Hz +_quote_throttle_rate: int = 6 + 16 # Hz # a working tick-type-classes template @@ -266,7 +266,10 @@ async def graphics_update_loop( array_key=curve_name, ) # is this even doing anything? - flow.plot.vb._set_yrange() + flow.plot.vb._set_yrange( + autoscale_linked_plots=False, + name=curve_name, + ) ticks_frame = quote.get('ticks', ()) diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index ce56b7b6..dca41855 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -421,6 +421,14 @@ class ChartView(ViewBox): if self._maxmin is None: self._maxmin = chart.maxmin + @property + def maxmin(self) -> Callable: + return self._maxmin + + @maxmin.setter + def maxmin(self, callback: Callable) -> None: + self._maxmin = callback + def wheelEvent( self, ev, @@ -678,6 +686,7 @@ class ChartView(ViewBox): # flag to prevent triggering sibling charts from the same linked # set from recursion errors. autoscale_linked_plots: bool = True, + name: Optional[str] = None, # autoscale_overlays: bool = False, ) -> None: @@ -735,6 +744,7 @@ class ChartView(ViewBox): ) if set_range: + yrange = self._maxmin() if yrange is None: return From 228f21d7b051521ca5f7957d0dc6265b0925c48f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Feb 2022 22:16:33 -0500 Subject: [PATCH 46/47] Zero trade rates each step --- piker/ui/_fsp.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 03d483a0..eac8f27d 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -632,6 +632,7 @@ async def open_vlm_displays( mx = 0 for name in names: + mxmn = chart.maxmin(name=name) if mxmn: ymax = mxmn[1] @@ -640,7 +641,7 @@ async def open_vlm_displays( return 0, mx - chart.view._maxmin = partial(maxmin, names=['volume']) + chart.view.maxmin = partial(maxmin, names=['volume']) # TODO: fix the x-axis label issue where if you put # the axis on the left it's totally not lined up... @@ -812,6 +813,7 @@ async def open_vlm_displays( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', + 'zero_on_step': True, }, # loglevel, ) @@ -844,13 +846,13 @@ async def open_vlm_displays( }, ) - # add custom auto range handler - tr_pi.vb._maxmin = partial( + tr_pi.vb.maxmin = partial( maxmin, # keep both regular and dark vlm in view names=trade_rate_fields, ) + chart_curves( trade_rate_fields, tr_pi, From 71f9b5c000608b2322e2e6ec89129c0d5e510bb4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Feb 2022 08:08:42 -0500 Subject: [PATCH 47/47] Don't enable curve coord cache unless in step mode You can get a weird "last line segment" artifact if *only* that segment is drawn and the cache is enabled, so just disable unless in step mode at startup and re-flash as normal when new path data is appended. Add a `.disable_cache()` method for the multi-use in the update method. Use line style on the `._last_line: QLineF` segment as well. --- piker/ui/_curve.py | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 01b57c30..7fc43e4e 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -116,6 +116,7 @@ class FastAppendCurve(pg.PlotCurveItem): color: str = 'default_lightest', fill_color: Optional[str] = None, style: str = 'solid', + name: Optional[str] = None, **kwargs @@ -124,7 +125,7 @@ class FastAppendCurve(pg.PlotCurveItem): # TODO: we can probably just dispense with the parent since # we're basically only using the pen setting now... super().__init__(*args, **kwargs) - + self._name = name self._xrange: tuple[int, int] = self.dataBounds(ax=0) # all history of curve is drawn in single px thickness @@ -137,7 +138,9 @@ class FastAppendCurve(pg.PlotCurveItem): self.setPen(pen) # last segment is drawn in 2px thickness for emphasis - self.last_step_pen = pg.mkPen(hcolor(color), width=2) + # self.last_step_pen = pg.mkPen(hcolor(color), width=2) + self.last_step_pen = pg.mkPen(pen, width=2) + self._last_line: QLineF = None self._last_step_rect: QRectF = None @@ -151,7 +154,12 @@ class FastAppendCurve(pg.PlotCurveItem): # interactions slower (such as zooming) and if so maybe if/when # we implement a "history" mode for the view we disable this in # that mode? - self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + if step_mode: + # don't enable caching by default for the case where the + # only thing drawn is the "last" line segment which can + # have a weird artifact where it won't be fully drawn to its + # endpoint (something we saw on trade rate curves) + self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) def update_from_array( self, @@ -261,10 +269,13 @@ class FastAppendCurve(pg.PlotCurveItem): # self.path.connectPath(append_path) path.connectPath(append_path) - # XXX: pretty annoying but, without this there's little - # artefacts on the append updates to the curve... - self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) - self.prepareGeometryChange() + self.disable_cache() + flip_cache = True + + if ( + self._step_mode + ): + self.disable_cache() flip_cache = True # print(f"update br: {self.path.boundingRect()}") @@ -304,6 +315,12 @@ class FastAppendCurve(pg.PlotCurveItem): # XXX: seems to be needed to avoid artifacts (see above). self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) + def disable_cache(self) -> None: + # XXX: pretty annoying but, without this there's little + # artefacts on the append updates to the curve... + self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) + self.prepareGeometryChange() + def boundingRect(self): if self.path is None: return QtGui.QPainterPath().boundingRect()