# piker: trading gear for hackers # Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # This program is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU Affero General Public License for more details. # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . ''' Remote control tasks for sending annotations (and maybe more cmds) to a chart from some other actor. ''' from __future__ import annotations from contextlib import ( asynccontextmanager as acm, AsyncExitStack, ) from functools import partial from pprint import pformat from typing import ( AsyncContextManager, Literal, ) from uuid import uuid4 import pyqtgraph as pg import tractor import trio from tractor import trionics from tractor import ( Portal, Context, MsgStream, ) from piker.log import get_logger from piker.types import Struct from piker.service import find_service from piker.brokers import SymbolNotFound from piker.ui.qt import ( QGraphicsItem, ) from PyQt6.QtGui import QFont from ._display import DisplayState from ._interaction import ChartView from ._editors import ( SelectRect, ArrowEditor, ) from ._chart import ChartPlotWidget from ._dataviz import Viz from ._style import hcolor log = get_logger(__name__) # NOTE: this is UPDATED by the `._display.graphics_update_loop()` # once all chart widgets / Viz per flume have been initialized # allowing for remote annotation (control) of any chart-actor's mkt # feed by fqme lookup Bo _dss: dict[str, DisplayState] = {} # stash each and every client connection so that they can all # be cancelled on shutdown/error. # TODO: make `tractor.Context` hashable via is `.cid: str`? # _ctxs: set[Context] = set() # TODO: use type statements from 3.12+ IpcCtxTable = dict[ str, # each `Context.cid` tuple[ Context, # handle for ctx-cancellation set[int] # set of annotation (instance) ids ] ] _ctxs: IpcCtxTable = {} # XXX: global map of all uniquely created annotation-graphics so # that they can be mutated (eventually) by a client. # NOTE: this map is only populated on the `chart` actor side (aka # the "annotations server" which actually renders to a Qt canvas). # type AnnotsTable = dict[int, QGraphicsItem] AnnotsTable = dict[int, QGraphicsItem] EditorsTable = dict[int, ArrowEditor] _annots: AnnotsTable = {} _editors: EditorsTable = {} def rm_annot( annot: ArrowEditor|SelectRect|pg.TextItem ) -> bool: global _editors match annot: case pg.ArrowItem(): editor = _editors[annot._uid] editor.remove(annot) # ^TODO? only remove each arrow or all? # if editor._arrows: # editor.remove_all() # else: # log.warning( # f'Annot already removed!\n' # f'{annot!r}\n' # ) return True case SelectRect(): annot.delete() return True case pg.TextItem(): scene = annot.scene() if scene: scene.removeItem(annot) return True return False async def serve_rc_annots( ipc_key: str, annot_req_stream: MsgStream, dss: dict[str, DisplayState], ctxs: IpcCtxTable, annots: AnnotsTable, ) -> None: ''' A small viz(ualization) server for remote ctl of chart annotations. ''' global _editors async for msg in annot_req_stream: match msg: case { 'cmd': 'SelectRect', 'fqme': fqme, 'timeframe': timeframe, 'meth': str(meth), 'kwargs': dict(kwargs), }: ds: DisplayState = _dss[fqme] try: chart: ChartPlotWidget = { 60: ds.hist_chart, 1: ds.chart, }[timeframe] except KeyError: msg: str = ( f'No chart for timeframe={timeframe}s, ' f'skipping rect annotation' ) log.exeception(msg) await annot_req_stream.send({'error': msg}) continue cv: ChartView = chart.cv # NEW: if timestamps provided, lookup current indices # from shm to ensure alignment with current buffer # state start_time = kwargs.pop('start_time', None) end_time = kwargs.pop('end_time', None) if ( start_time is not None and end_time is not None ): viz: Viz = chart.get_viz(fqme) shm = viz.shm arr = shm.array # lookup start index start_matches = arr[arr['time'] == start_time] if len(start_matches) == 0: msg: str = ( f'No shm entry for start_time={start_time}, ' f'skipping rect' ) log.error(msg) await annot_req_stream.send({'error': msg}) continue # lookup end index end_matches = arr[arr['time'] == end_time] if len(end_matches) == 0: msg: str = ( f'No shm entry for end_time={end_time}, ' f'skipping rect' ) log.error(msg) await annot_req_stream.send({'error': msg}) continue # get close price from start bar, open from end # bar start_idx = float(start_matches[0]['index']) end_idx = float(end_matches[0]['index']) start_close = float(start_matches[0]['close']) end_open = float(end_matches[0]['open']) # reconstruct start_pos and end_pos with # looked-up indices from_idx: float = 0.16 - 0.06 # BGM offset kwargs['start_pos'] = ( start_idx + 1 - from_idx, start_close, ) kwargs['end_pos'] = ( end_idx + from_idx, end_open, ) # annot type lookup from cmd rect = SelectRect( viewbox=cv, # TODO: make this more dynamic? # -[ ] pull from conf.toml? # -[ ] add `.set_color()` method to type? # -[ ] make a green/red based on direction # instead of default static color? color=kwargs.pop('color', None), ) # XXX NOTE: this is REQUIRED to set the rect # resize callback! rect.chart: ChartPlotWidget = chart # delegate generically to the requested method getattr(rect, meth)(**kwargs) rect.show() # XXX: store absolute coords for repositioning # during viz redraws (eg backfill updates) rect._meth = meth rect._kwargs = kwargs aid: int = id(rect) annots[aid] = rect aids: set[int] = ctxs[ipc_key][1] aids.add(aid) await annot_req_stream.send(aid) case { 'cmd': 'ArrowEditor', 'fqme': fqme, 'timeframe': timeframe, 'meth': 'add'|'remove' as meth, 'kwargs': { 'x': float(x), 'y': float(y), 'pointing': pointing, 'color': color, 'aid': str()|None as aid, 'alpha': int(alpha), 'headLen': int()|float()|None as headLen, 'headWidth': int()|float()|None as headWidth, 'tailLen': int()|float()|None as tailLen, 'tailWidth': int()|float()|None as tailWidth, 'pxMode': bool(pxMode), 'time': int()|float()|None as timestamp, }, # ?TODO? split based on method fn-sigs? # 'pointing', }: ds: DisplayState = _dss[fqme] try: chart: ChartPlotWidget = { 60: ds.hist_chart, 1: ds.chart, }[timeframe] except KeyError: log.warning( f'No chart for timeframe={timeframe}s, ' f'skipping arrow annotation' ) # return -1 to indicate failure await annot_req_stream.send(-1) continue cv: ChartView = chart.cv godw = chart.linked.godwidget # NEW: if timestamp provided, lookup current index # from shm to ensure alignment with current buffer # state if timestamp is not None: viz: Viz = chart.get_viz(fqme) shm = viz.shm arr = shm.array # find index where time matches timestamp matches = arr[arr['time'] == timestamp] if len(matches) == 0: log.error( f'No shm entry for timestamp={timestamp}, ' f'skipping arrow annotation' ) await annot_req_stream.send(-1) continue # use the matched row's index as x x = float(matches[0]['index']) arrows = ArrowEditor(godw=godw) # `.add/.remove()` API if meth != 'add': # await tractor.pause() raise ValueError( f'Invalid arrow-edit request ?\n' f'{msg!r}\n' ) aid: str = str(uuid4()) arrow: pg.ArrowItem = arrows.add( plot=chart.plotItem, uid=aid, x=x, y=y, pointing=pointing, color=color, alpha=alpha, headLen=headLen, headWidth=headWidth, tailLen=tailLen, tailWidth=tailWidth, pxMode=pxMode, ) # XXX: store absolute coords for repositioning # during viz redraws (eg backfill updates) arrow._abs_x = x arrow._abs_y = y annots[aid] = arrow _editors[aid] = arrows aids: set[int] = ctxs[ipc_key][1] aids.add(aid) await annot_req_stream.send(aid) case { 'cmd': 'TextItem', 'fqme': fqme, 'timeframe': timeframe, 'kwargs': { 'text': str(text), 'x': int()|float() as x, 'y': int()|float() as y, 'color': color, 'anchor': list(anchor), 'font_size': int()|None as font_size, 'time': int()|float()|None as timestamp, }, }: ds: DisplayState = _dss[fqme] try: chart: ChartPlotWidget = { 60: ds.hist_chart, 1: ds.chart, }[timeframe] except KeyError: log.warning( f'No chart for timeframe={timeframe}s, ' f'skipping text annotation' ) await annot_req_stream.send(-1) continue # NEW: if timestamp provided, lookup current index # from shm to ensure alignment with current buffer # state if timestamp is not None: viz: Viz = chart.get_viz(fqme) shm = viz.shm arr = shm.array # find index where time matches timestamp matches = arr[arr['time'] == timestamp] if len(matches) == 0: log.error( f'No shm entry for timestamp={timestamp}, ' f'skipping text annotation' ) await annot_req_stream.send(-1) continue # use the matched row's index as x, +1 for text # offset x = float(matches[0]['index']) + 1 # convert named color to hex color_hex: str = hcolor(color) # create text item text_item: pg.TextItem = pg.TextItem( text=text, color=color_hex, anchor=anchor, # ?TODO, pin to github:main for this? # legacy, can have scaling ish? # ensureInBounds=True, ) # apply font size (default to DpiAwareFont if not # provided) if font_size is None: from ._style import get_fonts font, font_small = get_fonts() font_size = font_small.px_size - 1 qfont: QFont = text_item.textItem.font() qfont.setPixelSize(font_size) text_item.setFont(qfont) text_item.setPos(x, y) chart.plotItem.addItem(text_item) # XXX: store absolute coords for repositioning # during viz redraws (eg backfill updates) text_item._abs_x = x text_item._abs_y = y aid: str = str(uuid4()) annots[aid] = text_item aids: set[int] = ctxs[ipc_key][1] aids.add(aid) await annot_req_stream.send(aid) case { 'cmd': 'remove', 'aid': int(aid)|str(aid), }: # NOTE: this is normally entered on # a client's annotation de-alloc normally # prior to detach or modify. annot: QGraphicsItem = annots[aid] assert rm_annot(annot) # respond to client indicating annot # was indeed deleted. await annot_req_stream.send(aid) case { 'cmd': 'redraw', 'fqme': fqme, 'timeframe': timeframe, # TODO: maybe more fields? # 'render': int(aid), # 'viz_name': str(viz_name), }: # NOTE: old match from the 60s display loop task # | { # 'backfilling': (str(viz_name), timeframe), # }: ds: DisplayState = _dss[fqme] viz: Viz = { 60: ds.hist_viz, 1: ds.viz, }[timeframe] log.warning( f'Forcing VIZ REDRAW:\n' f'fqme: {fqme}\n' f'timeframe: {timeframe}\n' ) viz.reset_graphics() # XXX: reposition all annotations to ensure they # stay aligned with viz data after reset (eg during # backfill when abs-index range changes) n_repositioned: int = 0 for aid, annot in annots.items(): # arrows and text items use abs x,y coords if ( hasattr(annot, '_abs_x') and hasattr(annot, '_abs_y') ): annot.setPos( annot._abs_x, annot._abs_y, ) n_repositioned += 1 # rects use method + kwargs elif ( hasattr(annot, '_meth') and hasattr(annot, '_kwargs') ): getattr(annot, annot._meth)(**annot._kwargs) n_repositioned += 1 if n_repositioned: log.info( f'Repositioned {n_repositioned} annotation(s) ' f'after viz redraw' ) case _: log.error( 'Unknown remote annotation cmd:\n' f'{pformat(msg)}' ) @tractor.context async def remote_annotate( ctx: Context, ) -> None: global _dss, _ctxs if not _dss: raise RuntimeError( 'Race condition on chart-init state ??\n' 'Anoter actor is trying to annoate this chart ' 'before it has fully spawned.\n' ) assert _dss _ctxs[ctx.cid] = (ctx, set()) # send back full fqme symbology to caller await ctx.started(list(_dss)) # open annot request handler stream async with ctx.open_stream() as annot_req_stream: try: await serve_rc_annots( ipc_key=ctx.cid, annot_req_stream=annot_req_stream, dss=_dss, ctxs=_ctxs, annots=_annots, ) finally: # ensure all annots for this connection are deleted # on any final teardown (_ctx, aids) = _ctxs[ctx.cid] assert _ctx is ctx for aid in aids: annot: QGraphicsItem = _annots[aid] assert rm_annot(annot) class AnnotCtl(Struct): ''' A control for remote "data annotations". You know those "squares they always show in machine vision UIs.." this API allows you to remotely control stuff like that in some other graphics actor. ''' ctx2fqmes: dict[str, str] fqme2ipc: dict[str, MsgStream] _annot_stack: AsyncExitStack # runtime-populated mapping of all annotation # ids to their equivalent IPC msg-streams. _ipcs: dict[int, MsgStream] = {} def _get_ipc( self, fqme: str, ) -> MsgStream: ipc: MsgStream = self.fqme2ipc.get(fqme) if ipc is None: raise SymbolNotFound( 'No chart (actor) seems to have mkt feed loaded?\n' f'{fqme}' ) return ipc async def add_rect( self, fqme: str, timeframe: float, start_pos: tuple[float, float], end_pos: tuple[float, float], # TODO: a `Literal['view', 'scene']` for this? domain: str = 'view', # or 'scene' color: str = 'dad_blue', from_acm: bool = False, # NEW: optional timestamps for server-side index lookup start_time: float|None = None, end_time: float|None = None, ) -> int|None: ''' Add a `SelectRect` annotation to the target view, return the instances `id(obj)` from the remote UI actor. ''' ipc: MsgStream = self._get_ipc(fqme) with trio.fail_after(3): await ipc.send({ 'fqme': fqme, 'cmd': 'SelectRect', 'timeframe': timeframe, # 'meth': str(meth), 'meth': 'set_view_pos' if domain == 'view' else 'set_scene_pos', 'kwargs': { 'start_pos': tuple(start_pos), 'end_pos': tuple(end_pos), 'color': color, 'update_label': False, 'start_time': start_time, 'end_time': end_time, }, }) aid: int|dict = await ipc.receive() match aid: case {'error': str(msg)}: log.error(msg) return None self._ipcs[aid] = ipc if not from_acm: self._annot_stack.push_async_callback( partial( self.remove, aid, ) ) return aid async def remove( self, aid: int, ) -> bool: ''' Remove an existing annotation by instance id. ''' ipc: MsgStream = self._ipcs[aid] await ipc.send({ 'cmd': 'remove', 'aid': aid, }) removed: bool = await ipc.receive() return removed @acm async def open_rect( self, **kwargs, ) -> int: try: aid: int = await self.add_rect( from_acm=True, **kwargs, ) yield aid finally: # async ipc send op with trio.CancelScope(shield=True): await self.remove(aid) async def redraw( self, fqme: str, timeframe: float, ) -> None: await self._get_ipc(fqme).send({ 'cmd': 'redraw', 'fqme': fqme, # 'render': int(aid), # 'viz_name': str(viz_name), 'timeframe': timeframe, }) async def add_arrow( self, fqme: str, timeframe: float, x: float, y: float, pointing: Literal[ 'up', 'down', ], # TODO: a `Literal['view', 'scene']` for this? # domain: str = 'view', # or 'scene' color: str = 'dad_blue', alpha: int = 116, headLen: float|None = None, headWidth: float|None = None, tailLen: float|None = None, tailWidth: float|None = None, pxMode: bool = True, from_acm: bool = False, # NEW: optional timestamp for server-side index lookup time: float|None = None, ) -> int|None: ''' Add a `SelectRect` annotation to the target view, return the instances `id(obj)` from the remote UI actor. ''' ipc: MsgStream = self._get_ipc(fqme) with trio.fail_after(3): await ipc.send({ 'fqme': fqme, 'cmd': 'ArrowEditor', 'timeframe': timeframe, # 'meth': str(meth), 'meth': 'add', 'kwargs': { 'x': float(x), 'y': float(y), 'color': color, 'pointing': pointing, # up|down 'alpha': alpha, 'aid': None, 'headLen': headLen, 'headWidth': headWidth, 'tailLen': tailLen, 'tailWidth': tailWidth, 'pxMode': pxMode, 'time': time, # for server-side index lookup }, }) aid: int|dict = await ipc.receive() match aid: case {'error': str(msg)}: log.error(msg) return None self._ipcs[aid] = ipc if not from_acm: self._annot_stack.push_async_callback( partial( self.remove, aid, ) ) return aid async def add_text( self, fqme: str, timeframe: float, text: str, x: float, y: float, color: str|tuple = 'dad_blue', anchor: tuple[float, float] = (0, 1), font_size: int|None = None, from_acm: bool = False, # NEW: optional timestamp for server-side index lookup time: float|None = None, ) -> int|None: ''' Add a `pg.TextItem` annotation to the target view. anchor: (x, y) where (0,0) is upper-left, (1,1) is lower-right font_size: pixel size for font, defaults to `_font.font.pixelSize()` ''' ipc: MsgStream = self._get_ipc(fqme) with trio.fail_after(3): await ipc.send({ 'fqme': fqme, 'cmd': 'TextItem', 'timeframe': timeframe, 'kwargs': { 'text': text, 'x': float(x), 'y': float(y), 'color': color, 'anchor': tuple(anchor), 'font_size': font_size, 'time': time, # for server-side index lookup }, }) aid: int|dict = await ipc.receive() match aid: case {'error': str(msg)}: log.error(msg) return None self._ipcs[aid] = ipc if not from_acm: self._annot_stack.push_async_callback( partial( self.remove, aid, ) ) return aid @acm async def open_annot_ctl( uid: tuple[str, str] | None = None, ) -> AnnotCtl: # TODO: load connetion to a specific chart actor # -[ ] pull from either service scan or config # -[ ] return some kinda client/proxy thinger? # -[ ] maybe we should finally just provide this as # a `tractor.hilevel.CallableProxy` or wtv? # -[ ] use this from the storage.cli stuff to mark up gaps! maybe_portals: list[Portal] | None fqmes: list[str] async with find_service( service_name='chart', first_only=False, ) as maybe_portals: ctx_mngrs: list[AsyncContextManager] = [] # TODO: print the current discoverable actor UID set # here as well? if not maybe_portals: raise RuntimeError( 'No chart actors found in service domain?' ) for portal in maybe_portals: ctx_mngrs.append( portal.open_context(remote_annotate) ) ctx2fqmes: dict[str, set[str]] = {} fqme2ipc: dict[str, MsgStream] = {} stream_ctxs: list[AsyncContextManager] = [] async with ( trionics.gather_contexts(ctx_mngrs) as ctxs, ): for (ctx, fqmes) in ctxs: stream_ctxs.append(ctx.open_stream()) # fill lookup table of mkt addrs to IPC ctxs for fqme in fqmes: if other := fqme2ipc.get(fqme): raise ValueError( f'More then one chart displays {fqme}!?\n' 'Other UI actor info:\n' f'channel: {other._ctx.chan}]\n' f'actor uid: {other._ctx.chan.uid}]\n' f'ctx id: {other._ctx.cid}]\n' ) ctx2fqmes.setdefault( ctx.cid, set(), ).add(fqme) async with trionics.gather_contexts(stream_ctxs) as streams: for stream in streams: fqmes: set[str] = ctx2fqmes[stream._ctx.cid] for fqme in fqmes: fqme2ipc[fqme] = stream # NOTE: on graceful teardown we always attempt to # remove all annots that were created by the # entering client. # TODO: should we maybe instead/also do this on the # server-actor side so that when a client # disconnects we always delete all annotations by # default instaead of expecting the client to? async with AsyncExitStack() as annots_stack: client = AnnotCtl( ctx2fqmes=ctx2fqmes, fqme2ipc=fqme2ipc, _annot_stack=annots_stack, ) yield client