diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 548eaac5..b9a143a2 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -28,10 +28,7 @@ from PyQt5.QtWidgets import QGraphicsItem from PyQt5.QtCore import ( Qt, QLineF, - QSizeF, QRectF, - # QRect, - QPointF, ) from PyQt5.QtGui import ( QPainter, @@ -89,9 +86,9 @@ class Curve(pg.GraphicsObject): ''' # sub-type customization methods - sub_br: Optional[Callable] = None - sub_paint: Optional[Callable] = None declare_paintables: Optional[Callable] = None + sub_paint: Optional[Callable] = None + # sub_br: Optional[Callable] = None def __init__( self, @@ -140,9 +137,7 @@ class Curve(pg.GraphicsObject): # self.last_step_pen = pg.mkPen(hcolor(color), width=2) self.last_step_pen = pg.mkPen(pen, width=2) - # self._last_line: Optional[QLineF] = None self._last_line = QLineF() - self._last_w: float = 1 # flat-top style histogram-like discrete curve # self._step_mode: bool = step_mode @@ -231,8 +226,8 @@ class Curve(pg.GraphicsObject): self.path.clear() if self.fast_path: - # self.fast_path.clear() - self.fast_path = None + self.fast_path.clear() + # self.fast_path = None @cm def reset_cache(self) -> None: @@ -252,77 +247,65 @@ class Curve(pg.GraphicsObject): self.boundingRect = self._path_br return self._path_br() + # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect def _path_br(self): ''' Post init ``.boundingRect()```. ''' - # hb = self.path.boundingRect() - hb = self.path.controlPointRect() - hb_size = hb.size() - - fp = self.fast_path - if fp: - fhb = fp.controlPointRect() - hb_size = fhb.size() + hb_size - - # print(f'hb_size: {hb_size}') - - # if self._last_step_rect: - # hb_size += self._last_step_rect.size() - - # if self._line: - # br = self._last_step_rect.bottomRight() - - # tl = QPointF( - # # self._vr[0], - # # hb.topLeft().y(), - # # 0, - # # hb_size.height() + 1 + # profiler = Profiler( + # msg=f'Curve.boundingRect(): `{self._name}`', + # disabled=not pg_profile_enabled(), + # ms_threshold=ms_slower_then, # ) - - # br = self._last_step_rect.bottomRight() - - w = hb_size.width() - h = hb_size.height() - - sbr = self.sub_br - if sbr: - w, h = self.sub_br(w, h) - else: - # assume plain line graphic and use - # default unit step in each direction. - - # only on a plane line do we include - # and extra index step's worth of width - # since in the step case the end of the curve - # actually terminates earlier so we don't need - # this for the last step. - w += self._last_w - # ll = self._last_line - h += 1 # ll.y2() - ll.y1() - - # br = QPointF( - # self._vr[-1], - # # tl.x() + w, - # tl.y() + h, - # ) - - br = QRectF( - - # top left - # hb.topLeft() - # tl, - QPointF(hb.topLeft()), - - # br, - # total size - # QSizeF(hb_size) - # hb_size, - QSizeF(w, h) + pr = self.path.controlPointRect() + hb_tl, hb_br = ( + pr.topLeft(), + pr.bottomRight(), + ) + mn_y = hb_tl.y() + mx_y = hb_br.y() + most_left = hb_tl.x() + most_right = hb_br.x() + # profiler('calc path vertices') + + # TODO: if/when we get fast path appends working in the + # `Renderer`, then we might need to actually use this.. + # fp = self.fast_path + # if fp: + # fhb = fp.controlPointRect() + # # hb_size = fhb.size() + hb_size + # br = pr.united(fhb) + + # XXX: *was* a way to allow sub-types to extend the + # boundingrect calc, but in the one use case for a step curve + # doesn't seem like we need it as long as the last line segment + # is drawn as it is? + + # sbr = self.sub_br + # if sbr: + # # w, h = self.sub_br(w, h) + # sub_br = sbr() + # br = br.united(sub_br) + + # assume plain line graphic and use + # default unit step in each direction. + ll = self._last_line + y1, y2 = ll.y1(), ll.y2() + x1, x2 = ll.x1(), ll.x2() + + ymn = min(y1, y2, mn_y) + ymx = max(y1, y2, mx_y) + most_left = min(x1, x2, most_left) + most_right = max(x1, x2, most_right) + # profiler('calc last line vertices') + + return QRectF( + most_left, + ymn, + most_right - most_left + 1, + ymx, ) - # print(f'bounding rect: {br}') - return br def paint( self, @@ -340,7 +323,7 @@ class Curve(pg.GraphicsObject): sub_paint = self.sub_paint if sub_paint: - sub_paint(p, profiler) + sub_paint(p) p.setPen(self.last_step_pen) p.drawLine(self._last_line) @@ -450,36 +433,34 @@ class StepCurve(Curve): y = src_data[array_key] x_last = x[-1] + x_2last = x[-2] y_last = y[-1] + step_size = x_last - x_2last + half_step = step_size / 2 # lol, commenting this makes step curves # all "black" for me :eyeroll:.. self._last_line = QLineF( - x_last - w, 0, - x_last + w, 0, + x_2last, 0, + x_last, 0, ) self._last_step_rect = QRectF( - x_last - w, 0, - x_last + w, y_last, + x_last - half_step, 0, + step_size, y_last, ) return x, y def sub_paint( self, p: QPainter, - profiler: Profiler, ) -> None: # p.drawLines(*tuple(filter(bool, self._last_step_lines))) # p.drawRect(self._last_step_rect) p.fillRect(self._last_step_rect, self._brush) - profiler('.fillRect()') - def sub_br( - self, - path_w: float, - path_h: float, - - ) -> (float, float): - # passthrough - return path_w, path_h + # def sub_br( + # self, + # parent_br: QRectF | None = None, + # ) -> QRectF: + # return self._last_step_rect diff --git a/piker/ui/_flows.py b/piker/ui/_flows.py index f8a4a4c0..a2908905 100644 --- a/piker/ui/_flows.py +++ b/piker/ui/_flows.py @@ -25,13 +25,10 @@ incremental update. from __future__ import annotations from typing import ( Optional, - Callable, - Union, ) import msgspec import numpy as np -from numpy.lib import recfunctions as rfn import pyqtgraph as pg from PyQt5.QtGui import QPainterPath from PyQt5.QtCore import QLineF @@ -44,9 +41,10 @@ from .._profile import ( # ms_slower_then, ) from ._pathops import ( - gen_ohlc_qpath, - ohlc_to_line, - to_step_format, + IncrementalFormatter, + OHLCBarsFmtr, # Plain OHLC renderer + OHLCBarsAsCurveFmtr, # OHLC converted to line + StepCurveFmtr, # "step" curve (like for vlm) xy_downsample, ) from ._ohlc import ( @@ -65,65 +63,6 @@ from .._profile import Profiler log = get_logger(__name__) -# class FlowsTable(msgspec.Struct): -# ''' -# Data-AGGRegate: high level API onto multiple (categorized) -# ``Flow``s with high level processing routines for -# multi-graphics computations and display. - -# ''' -# flows: dict[str, np.ndarray] = {} - - -def update_ohlc_to_line( - src_shm: ShmArray, - array_key: str, - src_update: np.ndarray, - slc: slice, - ln: int, - first: int, - last: int, - is_append: bool, - -) -> np.ndarray: - - fields = ['open', 'high', 'low', 'close'] - return ( - rfn.structured_to_unstructured(src_update[fields]), - slc, - ) - - -def ohlc_flat_to_xy( - r: Renderer, - array: np.ndarray, - array_key: str, - vr: tuple[int, int], - -) -> tuple[ - np.ndarray, - np.nd.array, - str, -]: - # TODO: in the case of an existing ``.update_xy()`` - # should we be passing in array as an xy arrays tuple? - - # 2 more datum-indexes to capture zero at end - x_flat = r.x_data[r._xy_first:r._xy_last] - y_flat = r.y_data[r._xy_first:r._xy_last] - - # slice to view - ivl, ivr = vr - x_iv_flat = x_flat[ivl:ivr] - y_iv_flat = y_flat[ivl:ivr] - - # reshape to 1d for graphics rendering - y_iv = y_iv_flat.reshape(-1) - x_iv = x_iv_flat.reshape(-1) - - return x_iv, y_iv, 'all' - - def render_baritems( flow: Flow, graphics: BarItems, @@ -155,21 +94,24 @@ def render_baritems( r = self._src_r if not r: show_bars = True + # OHLC bars path renderer r = self._src_r = Renderer( flow=self, - format_xy=gen_ohlc_qpath, - last_read=read, + fmtr=OHLCBarsFmtr( + shm=flow.shm, + flow=flow, + _last_read=read, + ), ) ds_curve_r = Renderer( flow=self, - last_read=read, - - # incr update routines - allocate_xy=ohlc_to_line, - update_xy=update_ohlc_to_line, - format_xy=ohlc_flat_to_xy, + fmtr=OHLCBarsAsCurveFmtr( + shm=flow.shm, + flow=flow, + _last_read=read, + ), ) curve = FlattenedOHLC( @@ -253,77 +195,6 @@ def render_baritems( ) -def update_step_xy( - src_shm: ShmArray, - array_key: str, - y_update: np.ndarray, - slc: slice, - ln: int, - first: int, - last: int, - is_append: bool, - -) -> np.ndarray: - - # for a step curve we slice from one datum prior - # to the current "update slice" to get the previous - # "level". - if is_append: - start = max(last - 1, 0) - end = src_shm._last.value - new_y = src_shm._array[start:end][array_key] - slc = slice(start, end) - - else: - new_y = y_update - - return ( - np.broadcast_to( - new_y[:, None], (new_y.size, 2), - ), - slc, - ) - - -def step_to_xy( - r: Renderer, - array: np.ndarray, - array_key: str, - vr: tuple[int, int], - -) -> tuple[ - np.ndarray, - np.nd.array, - str, -]: - - # 2 more datum-indexes to capture zero at end - x_step = r.x_data[r._xy_first:r._xy_last+2] - y_step = r.y_data[r._xy_first:r._xy_last+2] - - lasts = array[['index', array_key]] - last = lasts[array_key][-1] - y_step[-1] = last - - # slice out in-view data - ivl, ivr = vr - ys_iv = y_step[ivl:ivr+1] - xs_iv = x_step[ivl:ivr+1] - - # flatten to 1d - y_iv = ys_iv.reshape(ys_iv.size) - x_iv = xs_iv.reshape(xs_iv.size) - - # print( - # f'ys_iv : {ys_iv[-s:]}\n' - # f'y_iv: {y_iv[-s:]}\n' - # f'xs_iv: {xs_iv[-s:]}\n' - # f'x_iv: {x_iv[-s:]}\n' - # ) - - return x_iv, y_iv, 'all' - - class Flow(msgspec.Struct): # , frozen=True): ''' (Financial Signal-)Flow compound type which wraps a real-time @@ -337,7 +208,7 @@ class Flow(msgspec.Struct): # , frozen=True): ''' name: str plot: pg.PlotItem - graphics: Union[Curve, BarItems] + graphics: Curve | BarItems _shm: ShmArray yrange: tuple[float, float] = None @@ -346,7 +217,6 @@ class Flow(msgspec.Struct): # , frozen=True): # normally this is just a plain line. ds_graphics: Optional[Curve] = None - is_ohlc: bool = False render: bool = True # toggle for display loop @@ -554,9 +424,14 @@ class Flow(msgspec.Struct): # , frozen=True): slice_to_head: int = -1 should_redraw: bool = False + should_line: bool = False rkwargs = {} - should_line = False + # TODO: probably specialize ``Renderer`` types instead of + # these logic checks? + # - put these blocks into a `.load_renderer()` meth? + # - consider a OHLCRenderer, StepCurveRenderer, Renderer? + r = self._src_r if isinstance(graphics, BarItems): # XXX: special case where we change out graphics # to a line after a certain uppx threshold. @@ -576,16 +451,36 @@ class Flow(msgspec.Struct): # , frozen=True): should_redraw = changed_to_line or not should_line self._in_ds = should_line - else: - r = self._src_r - if not r: - # just using for ``.diff()`` atm.. + elif not r: + if isinstance(graphics, StepCurve): + r = self._src_r = Renderer( flow=self, - # TODO: rename this to something with ohlc - last_read=read, + fmtr=StepCurveFmtr( + shm=self.shm, + flow=self, + _last_read=read, + ), ) + # TODO: append logic inside ``.render()`` isn't + # correct yet for step curves.. remove this to see it. + should_redraw = True + slice_to_head = -2 + + else: + r = self._src_r + if not r: + # just using for ``.diff()`` atm.. + r = self._src_r = Renderer( + flow=self, + fmtr=IncrementalFormatter( + shm=self.shm, + flow=self, + _last_read=read, + ), + ) + # ``Curve`` derivative case(s): array_key = array_key or self.name # print(array_key) @@ -595,19 +490,6 @@ class Flow(msgspec.Struct): # , frozen=True): should_ds: bool = r._in_ds showing_src_data: bool = not r._in_ds - # step_mode = getattr(graphics, '_step_mode', False) - step_mode = isinstance(graphics, StepCurve) - if step_mode: - - r.allocate_xy = to_step_format - r.update_xy = update_step_xy - r.format_xy = step_to_xy - - # TODO: append logic inside ``.render()`` isn't - # correct yet for step curves.. remove this to see it. - should_redraw = True - slice_to_head = -2 - # downsampling incremental state checking # check for and set std m4 downsample conditions uppx = graphics.x_uppx() @@ -683,26 +565,27 @@ class Flow(msgspec.Struct): # , frozen=True): # XXX: SUPER UGGGHHH... without this we get stale cache # graphics that don't update until you downsampler again.. - if reset: - with graphics.reset_cache(): - # assign output paths to graphicis obj - graphics.path = r.path - graphics.fast_path = r.fast_path + # reset = False + # if reset: + # with graphics.reset_cache(): + # # assign output paths to graphicis obj + # graphics.path = r.path + # graphics.fast_path = r.fast_path - # XXX: we don't need this right? - # graphics.draw_last_datum( - # path, - # src_array, - # data, - # reset, - # array_key, - # ) - # graphics.update() - # profiler('.update()') - else: - # assign output paths to graphicis obj - graphics.path = r.path - graphics.fast_path = r.fast_path + # # XXX: we don't need this right? + # # graphics.draw_last_datum( + # # path, + # # src_array, + # # data, + # # reset, + # # array_key, + # # ) + # # graphics.update() + # # profiler('.update()') + # else: + # assign output paths to graphicis obj + graphics.path = r.path + graphics.fast_path = r.fast_path graphics.draw_last_datum( path, @@ -786,51 +669,10 @@ class Flow(msgspec.Struct): # , frozen=True): g.update() -def by_index_and_key( - renderer: Renderer, - array: np.ndarray, - array_key: str, - vr: tuple[int, int], - -) -> tuple[ - np.ndarray, - np.ndarray, - np.ndarray, -]: - return array['index'], array[array_key], 'all' - - class Renderer(msgspec.Struct): flow: Flow - # last array view read - last_read: Optional[tuple] = None - - # default just returns index, and named array from data - format_xy: Callable[ - [np.ndarray, str], - tuple[np.ndarray] - ] = by_index_and_key - - # optional pre-graphics xy formatted data which - # is incrementally updated in sync with the source data. - allocate_xy: Optional[Callable[ - [int, slice], - tuple[np.ndarray, np.nd.array] - ]] = None - - update_xy: Optional[Callable[ - [int, slice], None] - ] = None - - x_data: Optional[np.ndarray] = None - y_data: Optional[np.ndarray] = None - - # indexes which slice into the above arrays (which are allocated - # based on source data shm input size) and allow retrieving - # incrementally updated data. - _xy_first: int = 0 - _xy_last: int = 0 + fmtr: IncrementalFormatter # output graphics rendering, the main object # processed in ``QGraphicsObject.paint()`` @@ -852,58 +694,11 @@ class Renderer(msgspec.Struct): _last_uppx: float = 0 _in_ds: bool = False - # incremental update state(s) - _last_vr: Optional[tuple[float, float]] = None - _last_ivr: Optional[tuple[float, float]] = None - - def diff( - self, - new_read: tuple[np.ndarray], - - ) -> tuple[ - np.ndarray, - np.ndarray, - ]: - ( - last_xfirst, - last_xlast, - last_array, - last_ivl, - last_ivr, - last_in_view, - ) = self.last_read - - # TODO: can the renderer just call ``Flow.read()`` directly? - # unpack latest source data read - ( - xfirst, - xlast, - array, - ivl, - ivr, - in_view, - ) = new_read - - # compute the length diffs between the first/last index entry in - # the input data and the last indexes we have on record from the - # last time we updated the curve index. - prepend_length = int(last_xfirst - xfirst) - append_length = int(xlast - last_xlast) - - # blah blah blah - # do diffing for prepend, append and last entry - return ( - slice(xfirst, last_xfirst), - prepend_length, - append_length, - slice(last_xlast, xlast), - ) - def draw_path( self, x: np.ndarray, y: np.ndarray, - connect: Union[str, np.ndarray] = 'all', + connect: str | np.ndarray = 'all', path: Optional[QPainterPath] = None, redraw: bool = False, @@ -981,166 +776,54 @@ class Renderer(msgspec.Struct): ''' # TODO: can the renderer just call ``Flow.read()`` directly? # unpack latest source data read + fmtr = self.fmtr + ( - xfirst, - xlast, + _, + _, array, ivl, ivr, in_view, ) = new_read - ( - pre_slice, - prepend_length, - append_length, - post_slice, - ) = self.diff(new_read) - - if self.update_xy: - - shm = self.flow.shm - - if self.y_data is None: - # we first need to allocate xy data arrays - # from the source data. - assert self.allocate_xy - self.x_data, self.y_data = self.allocate_xy( - shm, - array_key, - ) - self._xy_first = shm._first.value - self._xy_last = shm._last.value - profiler('allocated xy history') - - if prepend_length: - y_prepend = shm._array[pre_slice] - - if read_from_key: - y_prepend = y_prepend[array_key] - - xy_data, xy_slice = self.update_xy( - shm, - array_key, - - # this is the pre-sliced, "normally expected" - # new data that an updater would normally be - # expected to process, however in some cases (like - # step curves) the updater routine may want to do - # the source history-data reading itself, so we pass - # both here. - y_prepend, - - pre_slice, - prepend_length, - self._xy_first, - self._xy_last, - is_append=False, - ) - self.y_data[xy_slice] = xy_data - self._xy_first = shm._first.value - profiler('prepended xy history: {prepend_length}') - - if append_length: - y_append = shm._array[post_slice] - - if read_from_key: - y_append = y_append[array_key] - - xy_data, xy_slice = self.update_xy( - shm, - array_key, - - y_append, - post_slice, - append_length, - - self._xy_first, - self._xy_last, - is_append=True, - ) - # self.y_data[post_slice] = xy_data - # self.y_data[xy_slice or post_slice] = xy_data - self.y_data[xy_slice] = xy_data - self._xy_last = shm._last.value - profiler('appened xy history: {append_length}') - - if use_vr: - array = in_view - # else: - # ivl, ivr = xfirst, xlast - - hist = array[:slice_to_head] - # xy-path data transform: convert source data to a format # able to be passed to a `QPainterPath` rendering routine. - if not len(hist): + fmt_out = fmtr.format_to_1d( + new_read, + array_key, + profiler, + + slice_to_head=slice_to_head, + read_src_from_key=read_from_key, + slice_to_inview=use_vr, + ) + + # no history in view case + if not fmt_out: # XXX: this might be why the profiler only has exits? return - x_out, y_out, connect = self.format_xy( - self, - # TODO: hist here should be the pre-sliced - # x/y_data in the case where allocate_xy is - # defined? - hist, - array_key, - (ivl, ivr), - ) + ( + x_1d, + y_1d, + connect, + prepend_length, + append_length, + view_changed, + # append_tres, - profiler('sliced input arrays') - - if ( - use_vr - ): - # if a view range is passed, plan to draw the - # source ouput that's "in view" of the chart. - view_range = (ivl, ivr) - # print(f'{self._name} vr: {view_range}') - - profiler(f'view range slice {view_range}') - - vl, vr = view_range - - zoom_or_append = False - last_vr = self._last_vr - last_ivr = self._last_ivr or vl, vr - - # incremental in-view data update. - if last_vr: - # relative slice indices - lvl, lvr = last_vr - # abs slice indices - al, ar = last_ivr - - # left_change = abs(x_iv[0] - al) >= 1 - # right_change = abs(x_iv[-1] - ar) >= 1 - - if ( - # likely a zoom view change - (vr - lvr) > 2 or vl < lvl - # append / prepend update - # we had an append update where the view range - # didn't change but the data-viewed (shifted) - # underneath, so we need to redraw. - # or left_change and right_change and last_vr == view_range - - # not (left_change and right_change) and ivr - # ( - # or abs(x_iv[ivr] - livr) > 1 - ): - zoom_or_append = True - - self._last_vr = view_range - if len(x_out): - self._last_ivr = x_out[0], x_out[slice_to_head] + ) = fmt_out # redraw conditions if ( prepend_length > 0 or new_sample_rate + or view_changed + + # NOTE: comment this to try and make "append paths" + # work below.. or append_length > 0 - or zoom_or_append ): should_redraw = True @@ -1162,9 +845,9 @@ class Renderer(msgspec.Struct): elif should_ds and uppx > 1: - x_out, y_out, ymn, ymx = xy_downsample( - x_out, - y_out, + x_1d, y_1d, ymn, ymx = xy_downsample( + x_1d, + y_1d, uppx, ) self.flow.yrange = ymn, ymx @@ -1175,8 +858,8 @@ class Renderer(msgspec.Struct): self._in_ds = True path = self.draw_path( - x=x_out, - y=y_out, + x=x_1d, + y=y_1d, connect=connect, path=path, redraw=True, @@ -1191,7 +874,6 @@ class Renderer(msgspec.Struct): # TODO: get this piecewise prepend working - right now it's # giving heck on vwap... # elif prepend_length: - # breakpoint() # prepend_path = pg.functions.arrayToQPath( # x[0:prepend_length], @@ -1208,18 +890,22 @@ class Renderer(msgspec.Struct): elif ( append_length > 0 and do_append - and not should_redraw ): - # print(f'{array_key} append len: {append_length}') - new_x = x_out[-append_length - 2:] # slice_to_head] - new_y = y_out[-append_length - 2:] # slice_to_head] + print(f'{array_key} append len: {append_length}') + # new_x = x_1d[-append_length - 2:] # slice_to_head] + # new_y = y_1d[-append_length - 2:] # slice_to_head] profiler('sliced append path') + # ( + # x_1d, + # y_1d, + # connect, + # ) = append_tres profiler( f'diffed array input, append_length={append_length}' ) - # if should_ds: + # if should_ds and uppx > 1: # new_x, new_y = xy_downsample( # new_x, # new_y, @@ -1228,14 +914,15 @@ class Renderer(msgspec.Struct): # profiler(f'fast path downsample redraw={should_ds}') append_path = self.draw_path( - x=new_x, - y=new_y, + x=x_1d, + y=y_1d, connect=connect, path=fast_path, ) profiler('generated append qpath') if use_fpath: + # print(f'{self.flow.name}: FAST PATH') # an attempt at trying to make append-updates faster.. if fast_path is None: fast_path = append_path @@ -1245,7 +932,12 @@ class Renderer(msgspec.Struct): size = fast_path.capacity() profiler(f'connected fast path w size: {size}') - # print(f"append_path br: {append_path.boundingRect()}") + print( + f"append_path br: {append_path.boundingRect()}\n" + f"path size: {size}\n" + f"append_path len: {append_path.length()}\n" + f"fast_path len: {fast_path.length()}\n" + ) # graphics.path.moveTo(new_x[0], new_y[0]) # path.connectPath(append_path) @@ -1259,10 +951,4 @@ class Renderer(msgspec.Struct): self.path = path self.fast_path = fast_path - # TODO: eventually maybe we can implement some kind of - # transform on the ``QPainterPath`` that will more or less - # detect the diff in "elements" terms? - # update diff state since we've now rendered paths. - self.last_read = new_read - return self.path, array, reset diff --git a/piker/ui/_ohlc.py b/piker/ui/_ohlc.py index 048861d0..b2ff6e10 100644 --- a/piker/ui/_ohlc.py +++ b/piker/ui/_ohlc.py @@ -25,8 +25,15 @@ from typing import ( import numpy as np import pyqtgraph as pg -from PyQt5 import QtCore, QtGui, QtWidgets -from PyQt5.QtCore import QLineF, QPointF +from PyQt5 import ( + QtGui, + QtWidgets, +) +from PyQt5.QtCore import ( + QLineF, + QRectF, +) + from PyQt5.QtGui import QPainterPath from .._profile import pg_profile_enabled, ms_slower_then @@ -114,8 +121,13 @@ class BarItems(pg.GraphicsObject): # we expect the downsample curve report this. return 0 + # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect def boundingRect(self): - # Qt docs: https://doc.qt.io/qt-5/qgraphicsitem.html#boundingRect + # profiler = Profiler( + # msg=f'BarItems.boundingRect(): `{self._name}`', + # disabled=not pg_profile_enabled(), + # ms_threshold=ms_slower_then, + # ) # TODO: Can we do rect caching to make this faster # like `pg.PlotCurveItem` does? In theory it's just @@ -135,32 +147,37 @@ class BarItems(pg.GraphicsObject): hb.topLeft(), hb.bottomRight(), ) + mn_y = hb_tl.y() + mx_y = hb_br.y() + most_left = hb_tl.x() + most_right = hb_br.x() + # profiler('calc path vertices') # need to include last bar height or BR will be off - mx_y = hb_br.y() - mn_y = hb_tl.y() - - last_lines = self._last_bar_lines + # OHLC line segments: [hl, o, c] + last_lines: tuple[QLineF] | None = self._last_bar_lines if last_lines: - body_line = self._last_bar_lines[0] - if body_line: - mx_y = max(mx_y, max(body_line.y1(), body_line.y2())) - mn_y = min(mn_y, min(body_line.y1(), body_line.y2())) + ( + hl, + o, + c, + ) = last_lines + most_right = c.x2() + 1 + ymx = ymn = c.y2() - return QtCore.QRectF( - - # top left - QPointF( - hb_tl.x(), - mn_y, - ), - - # bottom right - QPointF( - hb_br.x() + 1, - mx_y, - ) + if hl: + y1, y2 = hl.y1(), hl.y2() + ymn = min(y1, y2) + ymx = max(y1, y2) + mx_y = max(ymx, mx_y) + mn_y = min(ymn, mn_y) + # profiler('calc last bar vertices') + return QRectF( + most_left, + mn_y, + most_right - most_left + 1, + mx_y - mn_y, ) def paint( @@ -213,11 +230,15 @@ class BarItems(pg.GraphicsObject): # relevant fields ohlc = src_data[fields] - last_row = ohlc[-1:] + # last_row = ohlc[-1:] # individual values last_row = i, o, h, l, last = ohlc[-1] + # times = src_data['time'] + # if times[-1] - times[-2]: + # breakpoint() + # generate new lines objects for updatable "current bar" self._last_bar_lines = bar_from_ohlc_row(last_row) @@ -248,4 +269,5 @@ class BarItems(pg.GraphicsObject): # date / from some previous sample. It's weird though # because i've seen it do this to bars i - 3 back? + # return ohlc['time'], ohlc['close'] return ohlc['index'], ohlc['close'] diff --git a/piker/ui/_pathops.py b/piker/ui/_pathops.py index bbdde19a..807cde65 100644 --- a/piker/ui/_pathops.py +++ b/piker/ui/_pathops.py @@ -19,15 +19,16 @@ Super fast ``QPainterPath`` generation related operator routines. """ from __future__ import annotations from typing import ( - # Optional, + Optional, TYPE_CHECKING, ) +import msgspec import numpy as np from numpy.lib import recfunctions as rfn from numba import njit, float64, int64 # , optional # import pyqtgraph as pg -from PyQt5 import QtGui +# from PyQt5 import QtGui # from PyQt5.QtCore import QLineF, QPointF from ..data._sharedmem import ( @@ -39,7 +40,778 @@ from ._compression import ( ) if TYPE_CHECKING: - from ._flows import Renderer + from ._flows import ( + Renderer, + Flow, + ) + from .._profile import Profiler + + +def by_index_and_key( + renderer: Renderer, + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + +) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, +]: + return array['index'], array[array_key], 'all' + + +class IncrementalFormatter(msgspec.Struct): + ''' + Incrementally updating, pre-path-graphics tracking, formatter. + + Allows tracking source data state in an updateable pre-graphics + ``np.ndarray`` format (in local process memory) as well as + incrementally rendering from that format **to** 1d x/y for path + generation using ``pg.functions.arrayToQPath()``. + + ''' + shm: ShmArray + flow: Flow + + # last read from shm (usually due to an update call) + _last_read: tuple[ + int, + int, + np.ndarray + + ] + + @property + def last_read(self) -> tuple | None: + return self._last_read + + def __repr__(self) -> str: + msg = ( + f'{type(self)}: ->\n\n' + f'fqsn={self.flow.name}\n' + f'shm_name={self.shm.token["shm_name"]}\n\n' + + f'last_vr={self._last_vr}\n' + f'last_ivdr={self._last_ivdr}\n\n' + + f'xy_nd_start={self.xy_nd_start}\n' + f'xy_nd_stop={self.xy_nd_stop}\n\n' + ) + + x_nd_len = 0 + y_nd_len = 0 + if self.x_nd is not None: + x_nd_len = len(self.x_nd) + y_nd_len = len(self.y_nd) + + msg += ( + f'x_nd_len={x_nd_len}\n' + f'y_nd_len={y_nd_len}\n' + ) + + return msg + + def diff( + self, + new_read: tuple[np.ndarray], + + ) -> tuple[ + np.ndarray, + np.ndarray, + ]: + ( + last_xfirst, + last_xlast, + last_array, + last_ivl, + last_ivr, + last_in_view, + ) = self.last_read + + # TODO: can the renderer just call ``Flow.read()`` directly? + # unpack latest source data read + ( + xfirst, + xlast, + array, + ivl, + ivr, + in_view, + ) = new_read + + # compute the length diffs between the first/last index entry in + # the input data and the last indexes we have on record from the + # last time we updated the curve index. + prepend_length = int(last_xfirst - xfirst) + append_length = int(xlast - last_xlast) + + # blah blah blah + # do diffing for prepend, append and last entry + return ( + slice(xfirst, last_xfirst), + prepend_length, + append_length, + slice(last_xlast, xlast), + ) + + # Incrementally updated xy ndarray formatted data, a pre-1d + # format which is updated and cached independently of the final + # pre-graphics-path 1d format. + x_nd: Optional[np.ndarray] = None + y_nd: Optional[np.ndarray] = None + + # indexes which slice into the above arrays (which are allocated + # based on source data shm input size) and allow retrieving + # incrementally updated data. + xy_nd_start: int = 0 + xy_nd_stop: int = 0 + + # TODO: eventually incrementally update 1d-pre-graphics path data? + # x_1d: Optional[np.ndarray] = None + # y_1d: Optional[np.ndarray] = None + + # incremental view-change state(s) tracking + _last_vr: tuple[float, float] | None = None + _last_ivdr: tuple[float, float] | None = None + + def _track_inview_range( + self, + view_range: tuple[int, int], + + ) -> bool: + # if a view range is passed, plan to draw the + # source ouput that's "in view" of the chart. + vl, vr = view_range + zoom_or_append = False + last_vr = self._last_vr + + # incremental in-view data update. + if last_vr: + lvl, lvr = last_vr # relative slice indices + + # TODO: detecting more specifically the interaction changes + # last_ivr = self._last_ivdr or (vl, vr) + # al, ar = last_ivr # abs slice indices + # left_change = abs(x_iv[0] - al) >= 1 + # right_change = abs(x_iv[-1] - ar) >= 1 + + # likely a zoom/pan view change or data append update + if ( + (vr - lvr) > 2 + or vl < lvl + + # append / prepend update + # we had an append update where the view range + # didn't change but the data-viewed (shifted) + # underneath, so we need to redraw. + # or left_change and right_change and last_vr == view_range + + # not (left_change and right_change) and ivr + # ( + # or abs(x_iv[ivr] - livr) > 1 + ): + zoom_or_append = True + + self._last_vr = view_range + + return zoom_or_append + + def format_to_1d( + self, + new_read: tuple, + array_key: str, + profiler: Profiler, + + slice_to_head: int = -1, + read_src_from_key: bool = True, + slice_to_inview: bool = True, + + ) -> tuple[ + np.ndarray, + np.ndarray, + ]: + shm = self.shm + + ( + _, + _, + array, + ivl, + ivr, + in_view, + + ) = new_read + + ( + pre_slice, + prepend_len, + append_len, + post_slice, + ) = self.diff(new_read) + + if self.y_nd is None: + # we first need to allocate xy data arrays + # from the source data. + self.x_nd, self.y_nd = self.allocate_xy_nd( + shm, + array_key, + ) + self.xy_nd_start = shm._first.value + self.xy_nd_stop = shm._last.value + profiler('allocated xy history') + + if prepend_len: + y_prepend = shm._array[pre_slice] + if read_src_from_key: + y_prepend = y_prepend[array_key] + + ( + new_y_nd, + y_nd_slc, + + ) = self.incr_update_xy_nd( + shm, + array_key, + + # this is the pre-sliced, "normally expected" + # new data that an updater would normally be + # expected to process, however in some cases (like + # step curves) the updater routine may want to do + # the source history-data reading itself, so we pass + # both here. + y_prepend, + pre_slice, + prepend_len, + + self.xy_nd_start, + self.xy_nd_stop, + is_append=False, + ) + + # y_nd_view = self.y_nd[y_nd_slc] + self.y_nd[y_nd_slc] = new_y_nd + # if read_src_from_key: + # y_nd_view[:][array_key] = new_y_nd + # else: + # y_nd_view[:] = new_y_nd + + self.xy_nd_start = shm._first.value + profiler('prepended xy history: {prepend_length}') + + if append_len: + y_append = shm._array[post_slice] + if read_src_from_key: + y_append = y_append[array_key] + + ( + new_y_nd, + y_nd_slc, + + ) = self.incr_update_xy_nd( + shm, + array_key, + + y_append, + post_slice, + append_len, + + self.xy_nd_start, + self.xy_nd_stop, + is_append=True, + ) + # self.y_nd[post_slice] = new_y_nd + # self.y_nd[xy_slice or post_slice] = xy_data + self.y_nd[y_nd_slc] = new_y_nd + # if read_src_from_key: + # y_nd_view[:][array_key] = new_y_nd + # else: + # y_nd_view[:] = new_y_nd + + self.xy_nd_stop = shm._last.value + profiler('appened xy history: {append_length}') + + view_changed: bool = False + view_range: tuple[int, int] = (ivl, ivr) + if slice_to_inview: + view_changed = self._track_inview_range(view_range) + array = in_view + profiler(f'{self.flow.name} view range slice {view_range}') + + hist = array[:slice_to_head] + + # xy-path data transform: convert source data to a format + # able to be passed to a `QPainterPath` rendering routine. + if not len(hist): + # XXX: this might be why the profiler only has exits? + return + + # TODO: hist here should be the pre-sliced + # x/y_data in the case where allocate_xy is + # defined? + x_1d, y_1d, connect = self.format_xy_nd_to_1d( + hist, + array_key, + view_range, + ) + + # app_tres = None + # if append_len: + # appended = array[-append_len-1:slice_to_head] + # app_tres = self.format_xy_nd_to_1d( + # appended, + # array_key, + # ( + # view_range[1] - append_len + slice_to_head, + # view_range[1] + # ), + # ) + # # assert (len(appended) - 1) == append_len + # # assert len(appended) == append_len + # print( + # f'{self.flow.name} APPEND LEN: {append_len}\n' + # f'{self.flow.name} APPENDED: {appended}\n' + # f'{self.flow.name} app_tres: {app_tres}\n' + # ) + + # update the last "in view data range" + if len(x_1d): + self._last_ivdr = x_1d[0], x_1d[slice_to_head] + + # TODO: eventually maybe we can implement some kind of + # transform on the ``QPainterPath`` that will more or less + # detect the diff in "elements" terms? + # update diff state since we've now rendered paths. + self._last_read = new_read + + profiler('.format_to_1d()') + return ( + x_1d, + y_1d, + connect, + prepend_len, + append_len, + view_changed, + # app_tres, + ) + + ############################### + # Sub-type override interface # + ############################### + + # optional pre-graphics xy formatted data which + # is incrementally updated in sync with the source data. + # XXX: was ``.allocate_xy()`` + def allocate_xy_nd( + self, + src_shm: ShmArray, + data_field: str, + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, # x + np.nd.array # y + ]: + ''' + Convert the structured-array ``src_shm`` format to + a equivalently shaped (and field-less) ``np.ndarray``. + + Eg. a 4 field x N struct-array => (N, 4) + + ''' + y_nd = src_shm._array[data_field].copy() + x_nd = src_shm._array[index_field].copy() + return x_nd, y_nd + + # XXX: was ``.update_xy()`` + def incr_update_xy_nd( + self, + + src_shm: ShmArray, + data_field: str, + + new_from_src: np.ndarray, # portion of source that was updated + + read_slc: slice, + ln: int, # len of updated + + nd_start: int, + nd_stop: int, + + is_append: bool, + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, + slice, + ]: + # write pushed data to flattened copy + new_y_nd = new_from_src + + # XXX + # TODO: this should be returned and written by caller! + # XXX + # generate same-valued-per-row x support based on y shape + if index_field != 'index': + self.x_nd[read_slc, :] = new_from_src[index_field] + + return new_y_nd, read_slc + + # XXX: was ``.format_xy()`` + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + ) -> tuple[ + np.ndarray, # 1d x + np.ndarray, # 1d y + np.ndarray | str, # connection array/style + ]: + ''' + Default xy-nd array to 1d pre-graphics-path render routine. + + Return single field column data verbatim + + ''' + return ( + array['index'], + array[array_key], + + # 1d connection array or style-key to + # ``pg.functions.arrayToQPath()`` + 'all', + ) + + +class OHLCBarsFmtr(IncrementalFormatter): + + fields: list[str] = ['open', 'high', 'low', 'close'] + + def allocate_xy_nd( + self, + + ohlc_shm: ShmArray, + data_field: str, + + ) -> tuple[ + np.ndarray, # x + np.nd.array # y + ]: + ''' + Convert an input struct-array holding OHLC samples into a pair of + flattened x, y arrays with the same size (datums wise) as the source + data. + + ''' + y_nd = ohlc_shm.ustruct(self.fields) + + # generate an flat-interpolated x-domain + x_nd = ( + np.broadcast_to( + ohlc_shm._array['index'][:, None], + ( + ohlc_shm._array.size, + # 4, # only ohlc + y_nd.shape[1], + ), + ) + np.array([-0.5, 0, 0, 0.5]) + ) + assert y_nd.any() + + # write pushed data to flattened copy + return ( + x_nd, + y_nd, + ) + + @staticmethod + @njit( + # TODO: for now need to construct this manually for readonly + # arrays, see https://github.com/numba/numba/issues/4511 + # ntypes.tuple((float64[:], float64[:], float64[:]))( + # numba_ohlc_dtype[::1], # contiguous + # int64, + # optional(float64), + # ), + nogil=True + ) + def path_arrays_from_ohlc( + data: np.ndarray, + start: int64, + bar_gap: float64 = 0.43, + + ) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, + ]: + ''' + Generate an array of lines objects from input ohlc data. + + ''' + size = int(data.shape[0] * 6) + + x = np.zeros( + # data, + shape=size, + dtype=float64, + ) + y, c = x.copy(), x.copy() + + # TODO: report bug for assert @ + # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 + for i, q in enumerate(data[start:], start): + + # TODO: ask numba why this doesn't work.. + # open, high, low, close, index = q[ + # ['open', 'high', 'low', 'close', 'index']] + + open = q['open'] + high = q['high'] + low = q['low'] + close = q['close'] + index = float64(q['index']) + + istart = i * 6 + istop = istart + 6 + + # x,y detail the 6 points which connect all vertexes of a ohlc bar + x[istart:istop] = ( + index - bar_gap, + index, + index, + index, + index, + index + bar_gap, + ) + y[istart:istop] = ( + open, + open, + low, + high, + close, + close, + ) + + # specifies that the first edge is never connected to the + # prior bars last edge thus providing a small "gap"/"space" + # between bars determined by ``bar_gap``. + c[istart:istop] = (1, 1, 1, 1, 1, 0) + + return x, y, c + + # TODO: can we drop this frame and just use the above? + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + start: int = 0, # XXX: do we need this? + # 0.5 is no overlap between arms, 1.0 is full overlap + w: float = 0.43, + + ) -> tuple[ + np.ndarray, + np.ndarray, + np.ndarray, + ]: + ''' + More or less direct proxy to the ``numba``-fied + ``path_arrays_from_ohlc()`` (above) but with closed in kwargs + for line spacing. + + ''' + x, y, c = self.path_arrays_from_ohlc( + array, + start, + bar_gap=w, + ) + return x, y, c + + def incr_update_xy_nd( + self, + + src_shm: ShmArray, + data_field: str, + + new_from_src: np.ndarray, # portion of source that was updated + + read_slc: slice, + ln: int, # len of updated + + nd_start: int, + nd_stop: int, + + is_append: bool, + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, + slice, + ]: + # write newly pushed data to flattened copy + # a struct-arr is always passed in. + new_y_nd = rfn.structured_to_unstructured( + new_from_src[self.fields] + ) + + # XXX + # TODO: this should be returned and written by caller! + # XXX + # generate same-valued-per-row x support based on y shape + if index_field != 'index': + self.x_nd[read_slc, :] = new_from_src[index_field] + + return new_y_nd, read_slc + + +class OHLCBarsAsCurveFmtr(OHLCBarsFmtr): + + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + ) -> tuple[ + np.ndarray, + np.ndarray, + str, + ]: + # TODO: in the case of an existing ``.update_xy()`` + # should we be passing in array as an xy arrays tuple? + + # 2 more datum-indexes to capture zero at end + x_flat = self.x_nd[self.xy_nd_start:self.xy_nd_stop] + y_flat = self.y_nd[self.xy_nd_start:self.xy_nd_stop] + + # slice to view + ivl, ivr = vr + x_iv_flat = x_flat[ivl:ivr] + y_iv_flat = y_flat[ivl:ivr] + + # reshape to 1d for graphics rendering + y_iv = y_iv_flat.reshape(-1) + x_iv = x_iv_flat.reshape(-1) + + return x_iv, y_iv, 'all' + + +class StepCurveFmtr(IncrementalFormatter): + + def allocate_xy_nd( + self, + + shm: ShmArray, + data_field: str, + + index_field: str = 'index', + + ) -> tuple[ + np.ndarray, # x + np.nd.array # y + ]: + ''' + Convert an input 1d shm array to a "step array" format + for use by path graphics generation. + + ''' + i = shm._array['index'].copy() + out = shm._array[data_field].copy() + + x_out = np.broadcast_to( + i[:, None], + (i.size, 2), + ) + np.array([-0.5, 0.5]) + + y_out = np.empty((len(out), 2), dtype=out.dtype) + y_out[:] = out[:, np.newaxis] + + # start y at origin level + y_out[0, 0] = 0 + return x_out, y_out + + def incr_update_xy_nd( + self, + + src_shm: ShmArray, + array_key: str, + + src_update: np.ndarray, # portion of source that was updated + slc: slice, + ln: int, # len of updated + + first: int, + last: int, + + is_append: bool, + + ) -> tuple[ + np.ndarray, + slice, + ]: + # for a step curve we slice from one datum prior + # to the current "update slice" to get the previous + # "level". + if is_append: + start = max(last - 1, 0) + end = src_shm._last.value + new_y = src_shm._array[start:end][array_key] + slc = slice(start, end) + + else: + new_y = src_update + + return ( + np.broadcast_to( + new_y[:, None], (new_y.size, 2), + ), + slc, + ) + + def format_xy_nd_to_1d( + self, + + array: np.ndarray, + array_key: str, + vr: tuple[int, int], + + ) -> tuple[ + np.ndarray, + np.ndarray, + str, + ]: + lasts = array[['index', array_key]] + last = lasts[array_key][-1] + + # 2 more datum-indexes to capture zero at end + x_step = self.x_nd[self.xy_nd_start:self.xy_nd_stop+2] + y_step = self.y_nd[self.xy_nd_start:self.xy_nd_stop+2] + y_step[-1] = last + + # slice out in-view data + ivl, ivr = vr + ys_iv = y_step[ivl:ivr+1] + xs_iv = x_step[ivl:ivr+1] + + # flatten to 1d + y_iv = ys_iv.reshape(ys_iv.size) + x_iv = xs_iv.reshape(xs_iv.size) + + # print( + # f'ys_iv : {ys_iv[-s:]}\n' + # f'y_iv: {y_iv[-s:]}\n' + # f'xs_iv: {xs_iv[-s:]}\n' + # f'x_iv: {x_iv[-s:]}\n' + # ) + + return x_iv, y_iv, 'all' def xy_downsample( @@ -55,7 +827,11 @@ def xy_downsample( float, float, ]: + ''' + Downsample 1D (flat ``numpy.ndarray``) arrays using M4 given an input + ``uppx`` (units-per-pixel) and add space between discreet datums. + ''' # downsample whenever more then 1 pixels per datum can be shown. # always refresh data bounds until we get diffing # working properly, see above.. @@ -73,169 +849,3 @@ def xy_downsample( y = y.flatten() return x, y, ymn, ymx - - -@njit( - # TODO: for now need to construct this manually for readonly arrays, see - # https://github.com/numba/numba/issues/4511 - # ntypes.tuple((float64[:], float64[:], float64[:]))( - # numba_ohlc_dtype[::1], # contiguous - # int64, - # optional(float64), - # ), - nogil=True -) -def path_arrays_from_ohlc( - data: np.ndarray, - start: int64, - bar_gap: float64 = 0.43, - -) -> np.ndarray: - ''' - Generate an array of lines objects from input ohlc data. - - ''' - size = int(data.shape[0] * 6) - - x = np.zeros( - # data, - shape=size, - dtype=float64, - ) - y, c = x.copy(), x.copy() - - # TODO: report bug for assert @ - # /home/goodboy/repos/piker/env/lib/python3.8/site-packages/numba/core/typing/builtins.py:991 - for i, q in enumerate(data[start:], start): - - # TODO: ask numba why this doesn't work.. - # open, high, low, close, index = q[ - # ['open', 'high', 'low', 'close', 'index']] - - open = q['open'] - high = q['high'] - low = q['low'] - close = q['close'] - index = float64(q['index']) - - istart = i * 6 - istop = istart + 6 - - # x,y detail the 6 points which connect all vertexes of a ohlc bar - x[istart:istop] = ( - index - bar_gap, - index, - index, - index, - index, - index + bar_gap, - ) - y[istart:istop] = ( - open, - open, - low, - high, - close, - close, - ) - - # specifies that the first edge is never connected to the - # prior bars last edge thus providing a small "gap"/"space" - # between bars determined by ``bar_gap``. - c[istart:istop] = (1, 1, 1, 1, 1, 0) - - return x, y, c - - -def gen_ohlc_qpath( - r: Renderer, - data: np.ndarray, - array_key: str, # we ignore this - vr: tuple[int, int], - - start: int = 0, # XXX: do we need this? - # 0.5 is no overlap between arms, 1.0 is full overlap - w: float = 0.43, - -) -> QtGui.QPainterPath: - ''' - More or less direct proxy to ``path_arrays_from_ohlc()`` - but with closed in kwargs for line spacing. - - ''' - x, y, c = path_arrays_from_ohlc( - data, - start, - bar_gap=w, - ) - return x, y, c - - -def ohlc_to_line( - ohlc_shm: ShmArray, - data_field: str, - fields: list[str] = ['open', 'high', 'low', 'close'] - -) -> tuple[ - np.ndarray, - np.ndarray, -]: - ''' - Convert an input struct-array holding OHLC samples into a pair of - flattened x, y arrays with the same size (datums wise) as the source - data. - - ''' - y_out = ohlc_shm.ustruct(fields) - first = ohlc_shm._first.value - last = ohlc_shm._last.value - - # write pushed data to flattened copy - y_out[first:last] = rfn.structured_to_unstructured( - ohlc_shm.array[fields] - ) - - # generate an flat-interpolated x-domain - x_out = ( - np.broadcast_to( - ohlc_shm._array['index'][:, None], - ( - ohlc_shm._array.size, - # 4, # only ohlc - y_out.shape[1], - ), - ) + np.array([-0.5, 0, 0, 0.5]) - ) - assert y_out.any() - - return ( - x_out, - y_out, - ) - - -def to_step_format( - shm: ShmArray, - data_field: str, - index_field: str = 'index', - -) -> tuple[int, np.ndarray, np.ndarray]: - ''' - Convert an input 1d shm array to a "step array" format - for use by path graphics generation. - - ''' - i = shm._array['index'].copy() - out = shm._array[data_field].copy() - - x_out = np.broadcast_to( - i[:, None], - (i.size, 2), - ) + np.array([-0.5, 0.5]) - - y_out = np.empty((len(out), 2), dtype=out.dtype) - y_out[:] = out[:, np.newaxis] - - # start y at origin level - y_out[0, 0] = 0 - return x_out, y_out