# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for piker0)

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.

# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.

"""
Annotations for ur faces.

"""
from typing import Callable

from pyqtgraph import (
    Point,
    functions as fn,
    Color,
    GraphicsObject,
)
from pyqtgraph.Qt import internals
import numpy as np
import pyqtgraph as pg

from piker.ui.qt import (
    QtCore,
    QtGui,
    QtWidgets,
    QPointF,
    QRectF,
    QGraphicsPathItem,
)
from piker.ui._style import hcolor
from piker.log import get_logger

log = get_logger(__name__)


def mk_marker_path(
    style: str,

) -> QtGui.QPainterPath:
    '''
    Build the marker outline for `style` as a ``QPainterPath``, ready
    to be set on a ``QGraphicsPathItem`` which is placed using scene
    coordinates (not view).

    **Arguments**
    style        String indicating the style of marker to add:
                  ``'<|'``, ``'|>'``, ``'>|'``, ``'|<'``, ``'<|>'``,
                  ``'>|<'``, ``'^'``, ``'v'``, ``'o'``

    The path is built in a unit box (coordinates within +/-0.5); the
    consumer is expected to scale it. Compound styles such as
    ``'<|>'`` match several of the substring checks below and thus get
    several sub-polygons.

    This code is taken nearly verbatim from the
    `InfiniteLine.addMarker()` method but does not attempt to be aware
    of low(er) level graphics controls and expects for the output
    polygon to be applied to a ``QGraphicsPathItem``.

    '''
    path = QtGui.QPainterPath()

    # NOTE: the circle is an exact match whereas all the arrow styles
    # are substring matches so that they can be combined.
    if style == 'o':
        path.addEllipse(QtCore.QRectF(-0.5, -0.5, 1, 1))

    # arrow pointing away-from the top of line
    if '<|' in style:
        p = QtGui.QPolygonF([Point(0.5, 0), Point(0, -0.5), Point(-0.5, 0)])
        path.addPolygon(p)
        path.closeSubpath()

    # arrow pointing away-from the bottom of line
    if '|>' in style:
        p = QtGui.QPolygonF([Point(0.5, 0), Point(0, 0.5), Point(-0.5, 0)])
        path.addPolygon(p)
        path.closeSubpath()

    # arrow pointing in-to the top of line
    if '>|' in style:
        p = QtGui.QPolygonF([Point(0.5, -0.5), Point(0, 0), Point(-0.5, -0.5)])
        path.addPolygon(p)
        path.closeSubpath()

    # arrow pointing in-to the bottom of line
    if '|<' in style:
        p = QtGui.QPolygonF([Point(0.5, 0.5), Point(0, 0), Point(-0.5, 0.5)])
        path.addPolygon(p)
        path.closeSubpath()

    if '^' in style:
        p = QtGui.QPolygonF([Point(0, -0.5), Point(0.5, 0), Point(0, 0.5)])
        path.addPolygon(p)
        path.closeSubpath()

    if 'v' in style:
        p = QtGui.QPolygonF([Point(0, -0.5), Point(-0.5, 0), Point(0, 0.5)])
        path.addPolygon(p)
        path.closeSubpath()

    return path


class LevelMarker(QGraphicsPathItem):
    '''
    An arrow marker path graphic which redraws itself
    to the specified view coordinate level on each paint cycle.

    '''
    def __init__(
        self,
        chart: 'ChartPlotWidget',  # noqa
        style: str,
        get_level: Callable[..., float],
        size: float = 20,
        keep_in_view: bool = True,
        on_paint: Callable | None = None,

    ) -> None:
        '''
        chart: the chart whose view box and `.marker_right_points()`
          are used to place the marker.
        style: marker style string, see `mk_marker_path()`.
        get_level: callable returning the current level in view
          (y) coordinates.
        size: item scale applied to the unit-sized marker path.
        keep_in_view: when set, the marker is re-positioned on every
          paint cycle via `.position_in_view()`.
        on_paint: optional callback invoked with this item after each
          paint.

        '''
        super().__init__()

        # scale up the unit-sized path generated by the style setter
        self.setScale(size)

        # assigning `.style` internally generates the path
        self._style = None
        self.style = style

        self.chart = chart
        self.get_level = get_level
        self._on_paint = on_paint

        # x position is read from the 2nd element of the chart's
        # marker "right points" each time it is needed.
        self.scene_x = lambda: chart.marker_right_points()[1]
        self.level: float = 0
        self.keep_in_view = keep_in_view

    @property
    def style(self) -> str:
        return self._style

    @style.setter
    def style(self, value: str) -> None:
        # only rebuild the path when the style actually changes
        if self._style != value:
            polygon = mk_marker_path(value)
            self.setPath(polygon)
            self._style = value

    def path_br(self) -> QRectF:
        '''
        Return the bounding rect for the opaque path part of this
        item, mapped to scene coordinates.

        '''
        return self.mapToScene(
            self.path()
        ).boundingRect()

    def delete(self) -> None:
        '''
        Remove this item from its scene; a no-op when the item is not
        (or no longer) part of any scene.

        '''
        scene = self.scene()
        if scene is not None:
            scene.removeItem(self)

    @property
    def h(self) -> float:
        return self.path_br().height()

    @property
    def w(self) -> float:
        return self.path_br().width()

    def position_in_view(self) -> None:
        '''
        Show a pp off-screen indicator for a level label.

        This is like in fps games where you have a gps "nav" indicator
        but your teammate is outside the range of view, except in 2D, on
        the y-dimension.

        '''
        # read the level exactly once so the in/out-of-range decision
        # and the final placement can not disagree.
        level = self.get_level()

        view = self.chart.getViewBox()
        vr = view.state['viewRange']
        ymn, ymx = vr[1]

        x = self.scene_x()

        # `.h` maps the path to the scene on every access, so read it
        # once up front.
        height = self.h

        if self.style == '>|':  # short style, points "down-to" line
            top_offset = height
            bottom_offset = 0
        else:
            top_offset = 0
            bottom_offset = height

        if level > ymx:  # pin to top of view
            self.setPos(
                QPointF(
                    x,
                    top_offset + height/3,
                )
            )

        elif level < ymn:  # pin to bottom of view
            self.setPos(
                QPointF(
                    x,
                    view.height() - (bottom_offset + height/3),
                )
            )

        else:
            # pp line is viewable so show marker normally
            self.setPos(
                x,
                self.chart.view.mapFromView(
                    QPointF(0, level)
                ).y()
            )

    def paint(
        self,
        p: QtGui.QPainter,
        opt: QtWidgets.QStyleOptionGraphicsItem,
        w: QtWidgets.QWidget

    ) -> None:
        '''
        Core paint which we override to always update
        our marker position in scene coordinates from a
        view coordinate "level".

        '''
        if self.keep_in_view:
            self.position_in_view()

        super().paint(p, opt, w)

        if self._on_paint:
            self._on_paint(self)


def qgo_draw_markers(
    markers: list,
    color: Color,
    p: QtGui.QPainter,
    left: float,
    right: float,
    right_offset: float,

) -> float:
    '''
    Paint markers in ``pg.GraphicsItem`` style by first
    removing the view transform for the painter, drawing the markers
    in scene coords, then restoring the view coords.

    `markers` is a sequence of `(path, pos, size)` triples; the `pos`
    entry is currently ignored and every marker is drawn at
    `right_offset` along the (transformed) `left` -> `right` axis.

    Returns the largest marker size drawn, or `0` when `markers` is
    empty.

    '''
    # paint markers in native coordinate system
    orig_tr = p.transform()

    start = orig_tr.map(Point(left, 0))
    end = orig_tr.map(Point(right, 0))
    up = orig_tr.map(Point(left, 1))

    dif = end - start
    angle = np.arctan2(dif.y(), dif.x()) * 180 / np.pi

    p.resetTransform()

    p.translate(start)
    p.rotate(angle)

    # flip the y-axis depending on the orientation of the original
    # transform (sign of the 2D cross product).
    up = up - start
    det = up.x() * dif.y() - dif.x() * up.y()
    p.scale(1, 1 if det > 0 else -1)

    p.setBrush(fn.mkBrush(color))
    tr = p.transform()

    sizes = []
    for path, _pos, size in markers:
        p.setTransform(tr)

        # XXX: we drop the "scale / %" placement
        # (i.e. `x = length * pos`) in favour of a fixed offset.
        x = right_offset

        p.translate(x, 0)
        p.scale(size, size)
        p.drawPath(path)
        sizes.append(size)

    p.setTransform(orig_tr)

    # `default` avoids a `ValueError` when there is nothing to draw
    return max(sizes, default=0)


class GapAnnotations(GraphicsObject):
    '''
    Batch-rendered gap annotations using Qt's efficient drawing
    APIs.

    Instead of creating individual `QGraphicsItem` instances per
    gap (which is very slow for 1000+ gaps), this class stores all
    gap rectangles and arrows in numpy-backed arrays and renders
    them in single batch paint calls.

    Performance: ~1000x faster than individual items for large gap
    counts.

    Based on patterns from:
    - `pyqtgraph.BarGraphItem` (batch rect rendering)
    - `pyqtgraph.ScatterPlotItem` (fragment rendering)
    - `piker.ui._curve.FlowGraphic` (single path pattern)

    '''
    def __init__(
        self,
        gap_specs: list[dict],
        array: np.ndarray|None = None,
        color: str = 'dad_blue',
        alpha: int = 169,
        arrow_size: float = 10.0,
        fqme: str|None = None,
        timeframe: float|None = None,

    ) -> None:
        '''
        gap_specs: list of dicts with keys:
          - start_pos: (x, y) tuple for left corner of rect
          - end_pos: (x, y) tuple for right corner of rect
          - arrow_x: x position for arrow
          - arrow_y: y position for arrow
          - pointing: 'up' or 'down' for arrow direction
          - start_time: (optional) timestamp for repositioning
          - end_time: (optional) timestamp for repositioning

        array: optional OHLC numpy array for repositioning on
          backfill updates (when abs-index changes)

        fqme: symbol name for these gaps (for logging/debugging)

        timeframe: period in seconds that these gaps were
          detected on (used to skip reposition when
          called with wrong timeframe's array)

        '''
        super().__init__()

        self._gap_specs = gap_specs
        self._array = array
        self._fqme = fqme
        self._timeframe = timeframe
        n_gaps = len(gap_specs)

        # shared pen/brush matching original SelectRect/ArrowItem style
        base_color = pg.mkColor(hcolor(color))

        # rect pen: base color, fully opaque for outline
        self._rect_pen = pg.mkPen(base_color, width=1)

        # rect brush: base color with alpha=66 (SelectRect default)
        rect_fill = pg.mkColor(hcolor(color))
        rect_fill.setAlpha(66)
        self._rect_brush = pg.functions.mkBrush(rect_fill)

        # arrow pen: same as rects
        self._arrow_pen = pg.mkPen(base_color, width=1)

        # arrow brush: base color with user-specified alpha (default 169)
        arrow_fill = pg.mkColor(hcolor(color))
        arrow_fill.setAlpha(alpha)
        self._arrow_brush = pg.functions.mkBrush(arrow_fill)

        # allocate rect array using Qt's efficient storage
        self._rectarray = internals.PrimitiveArray(
            QtCore.QRectF,
            4,
        )
        self._rectarray.resize(n_gaps)
        rect_memory = self._rectarray.ndarray()

        # fill rect array from gap specs
        for i, spec in enumerate(gap_specs):
            start_x, start_y = spec['start_pos']
            end_x, end_y = spec['end_pos']

            # QRectF expects (x, y, width, height)
            rect_memory[i, 0] = start_x
            rect_memory[i, 1] = min(start_y, end_y)
            rect_memory[i, 2] = end_x - start_x
            rect_memory[i, 3] = abs(end_y - start_y)

        # build single QPainterPath (in data coords) for all arrows;
        # `.paint()` re-derives the drawn arrows in pixel coords so
        # this path only feeds `.boundingRect()`.
        self._arrow_path = QtGui.QPainterPath()
        self._arrow_size = arrow_size

        for spec in gap_specs:
            # same optional-arrow rule as applied in `.paint()`
            if 'arrow_x' not in spec:
                continue

            self._arrow_path.addPolygon(
                self._arrow_polygon(
                    spec['arrow_x'],
                    spec['arrow_y'],
                    arrow_size/2,
                    arrow_size,
                    spec['pointing'],
                )
            )
            self._arrow_path.closeSubpath()

        # cache bounding rect
        self._br: QRectF|None = None

    @staticmethod
    def _arrow_polygon(
        tip_x: float,
        tip_y: float,
        half_width: float,
        length: float,
        pointing: str,

    ) -> QtGui.QPolygonF:
        '''
        Build a triangle with its tip at `(tip_x, tip_y)`; for
        `pointing == 'down'` the base sits at `tip_y - length`, for
        any other value at `tip_y + length`.

        '''
        base_y = (
            tip_y - length
            if pointing == 'down'
            else tip_y + length
        )
        return QtGui.QPolygonF([
            QPointF(tip_x, tip_y),  # tip
            QPointF(tip_x - half_width, base_y),  # left base
            QPointF(tip_x + half_width, base_y),  # right base
        ])

    def boundingRect(self) -> QRectF:
        '''
        Compute (and cache) the bounding rect from the rect array and
        the data-coords arrow path.

        '''
        if self._br is not None:
            return self._br

        # get rect bounds
        rect_memory = self._rectarray.ndarray()
        if len(rect_memory) == 0:
            self._br = QRectF()
            return self._br

        x_min = rect_memory[:, 0].min()
        y_min = rect_memory[:, 1].min()
        x_max = (rect_memory[:, 0] + rect_memory[:, 2]).max()
        y_max = (rect_memory[:, 1] + rect_memory[:, 3]).max()

        # expand for arrow path; an empty path reports a null rect at
        # the origin which must NOT be merged in or the bounds would
        # be stretched to include (0, 0).
        if not self._arrow_path.isEmpty():
            arrow_br = self._arrow_path.boundingRect()
            x_min = min(x_min, arrow_br.left())
            y_min = min(y_min, arrow_br.top())
            x_max = max(x_max, arrow_br.right())
            y_max = max(y_max, arrow_br.bottom())

        self._br = QRectF(
            float(x_min),
            float(y_min),
            float(x_max - x_min),
            float(y_max - y_min),
        )
        return self._br

    def paint(
        self,
        p: QtGui.QPainter,
        opt: QtWidgets.QStyleOptionGraphicsItem,
        w: QtWidgets.QWidget,

    ) -> None:
        '''
        Batch render all rects and arrows in minimal paint calls.

        '''
        # draw all rects in single batch call (data coordinates)
        p.setPen(self._rect_pen)
        p.setBrush(self._rect_brush)
        drawargs = self._rectarray.drawargs()
        p.drawRects(*drawargs)

        # draw arrows with the painter transform reset so they
        # maintain size regardless of zoom level
        orig_tr = p.transform()
        p.resetTransform()

        # rebuild arrow path in painter-native (pixel) coordinates
        arrow_path_scene = QtGui.QPainterPath()

        # arrow geometry matching pg.ArrowItem defaults
        # headLen=10, headWidth=2.222
        # headWidth is the half-width (center to edge distance)
        head_len = self._arrow_size
        head_width = head_len * 0.2222  # 2.222 at size=10

        for spec in self._gap_specs:
            if 'arrow_x' not in spec:
                continue

            # transform data coords through the original painter
            # transform to get the tip position in pixels
            scene_pt = orig_tr.map(
                QPointF(spec['arrow_x'], spec['arrow_y'])
            )
            arrow_path_scene.addPolygon(
                self._arrow_polygon(
                    scene_pt.x(),
                    scene_pt.y(),
                    head_width,
                    head_len,
                    spec['pointing'],
                )
            )
            arrow_path_scene.closeSubpath()

        p.setPen(self._arrow_pen)
        p.setBrush(self._arrow_brush)
        p.drawPath(arrow_path_scene)

        # restore original transform
        p.setTransform(orig_tr)

    def reposition(
        self,
        array: np.ndarray|None = None,
        fqme: str|None = None,
        timeframe: float|None = None,

    ) -> None:
        '''
        Reposition all annotations based on timestamps.

        Used when viz is updated (eg during backfill) and abs-index
        range changes - we need to lookup new indices from timestamps.

        array: array to look timestamps up in; falls back to the one
          passed at construction. Must expose `time`, `index`, `open`
          and `close` fields, with `time` sorted ascending (required
          by the binary search below).
        fqme: only used in the lookup-failure log message.
        timeframe: when both this and the construction-time timeframe
          are set and differ, the call is skipped.

        '''
        # skip reposition if timeframe doesn't match
        # (e.g., 1s gaps being repositioned with 60s array)
        if (
            timeframe is not None
            and
            self._timeframe is not None
            and
            timeframe != self._timeframe
        ):
            log.debug(
                f'Skipping reposition for {self._fqme} gaps:\n'
                f' gap timeframe: {self._timeframe}s\n'
                f' array timeframe: {timeframe}s\n'
            )
            return

        if array is None:
            array = self._array

        if array is None:
            log.warning(
                'GapAnnotations.reposition() called but no array '
                'provided'
            )
            return

        # collect all unique timestamps we need to lookup
        timestamps: set[float] = set()
        for spec in self._gap_specs:
            for key in ('start_time', 'end_time', 'time'):
                ts = spec.get(key)
                if ts is not None:
                    timestamps.add(ts)

        # vectorized timestamp -> row lookup using binary search
        time_to_row: dict[float, dict] = {}
        n_rows = len(array)
        if timestamps and n_rows:
            time_arr = array['time']

            # NOTE: `list()` is required, `np.array()` on a `set`
            # yields a 0-d object array.
            ts_array = np.array(list(timestamps))

            search_indices = np.searchsorted(
                time_arr,
                ts_array,
            )

            # `searchsorted()` returns `n_rows` for any timestamp
            # beyond the last row; clamp BEFORE indexing since `&`
            # does not short-circuit and an out-of-range fancy-index
            # raises `IndexError`.
            in_bounds = search_indices < n_rows
            safe_indices = np.minimum(search_indices, n_rows - 1)

            # bounds check and exact match verification
            valid_mask = (
                in_bounds
                &
                (time_arr[safe_indices] == ts_array)
            )

            valid_indices = safe_indices[valid_mask]
            valid_timestamps = ts_array[valid_mask]
            matched_rows = array[valid_indices]

            time_to_row = {
                float(ts): {
                    'index': float(row['index']),
                    'open': float(row['open']),
                    'close': float(row['close']),
                }
                for ts, row in zip(
                    valid_timestamps,
                    matched_rows,
                )
            }

        # pre-render the array's time span for the failure log below;
        # indexing `[0]`/`[-1]` would raise on an empty array.
        if n_rows:
            time_range = (
                f'{array["time"][0]} -> {array["time"][-1]}'
            )
        else:
            time_range = '<empty array>'

        # Qt requires being told BEFORE the geometry changes, while
        # `.boundingRect()` still reports the old (cached) rect.
        self.prepareGeometryChange()

        # rebuild rect array from gap specs with new indices
        rect_memory = self._rectarray.ndarray()
        for i, spec in enumerate(self._gap_specs):
            start_time = spec.get('start_time')
            end_time = spec.get('end_time')

            # specs without timestamps keep their current rect
            if (
                start_time is None
                or
                end_time is None
            ):
                continue

            start_row = time_to_row.get(start_time)
            end_row = time_to_row.get(end_time)

            if (
                start_row is None
                or
                end_row is None
            ):
                log.warning(
                    f'Timestamp lookup failed for gap[{i}] during '
                    f'reposition:\n'
                    f' fqme: {fqme}\n'
                    f' timeframe: {timeframe}s\n'
                    f' start_time: {start_time}\n'
                    f' end_time: {end_time}\n'
                    f' array time range: '
                    f'{time_range}\n'
                )
                continue

            start_idx = start_row['index']
            end_idx = end_row['index']
            start_close = start_row['close']
            end_open = end_row['open']

            # x-inset applied to both rect edges.
            # NOTE(review): presumably derived from the bar/candle
            # width used elsewhere in the UI - TODO confirm.
            from_idx: float = 0.16 - 0.06
            start_x = start_idx + 1 - from_idx
            end_x = end_idx + from_idx

            # update rect in array
            rect_memory[i, 0] = start_x
            rect_memory[i, 1] = min(start_close, end_open)
            rect_memory[i, 2] = end_x - start_x
            rect_memory[i, 3] = abs(end_open - start_close)

        # rebuild arrow path with new indices
        #
        # NOTE(review): `.paint()` draws arrows from each spec's
        # `arrow_x`/`arrow_y` which are NOT updated here; only the
        # path feeding `.boundingRect()` is. Presumably the caller
        # refreshes the specs - TODO confirm.
        self._arrow_path.clear()

        half_width = self._arrow_size/2
        for spec in self._gap_specs:
            time_val = spec.get('time')
            if time_val is None:
                continue

            arrow_row = time_to_row.get(time_val)
            if arrow_row is None:
                continue

            self._arrow_path.addPolygon(
                self._arrow_polygon(
                    arrow_row['index'],
                    arrow_row['close'],
                    half_width,
                    self._arrow_size,
                    spec['pointing'],
                )
            )
            self._arrow_path.closeSubpath()

        # invalidate bounding rect cache and schedule a redraw
        self._br = None
        self.update()