From daa429f7ca0f9ac438a23d2f1c5cda10b0cb7e89 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 13 Nov 2020 10:39:30 -0500 Subject: [PATCH] Put fsp plotting into a couple tasks, startup speedups. Break the chart update code for fsps into a new task (add a nursery) in new `spawn_fsps` (was `chart_from_fsps`) that async requests actor spawning and initial historical data (all CPU bound work). For multiple fsp subcharts this allows processing initial output in parallel (multi-core). We might want to wrap this in a "feed" like api eventually. Basically the fsp startup sequence is now: - start all requested fsp actors in an async loop and wait for historical data to arrive - loop through them all again to start update tasks which do chart graphics rendering Add separate x-axis objects for each new subchart (required by pyqtgraph); still need to fix hiding unnecessary ones. Add a `ChartPlotWidget._arrays: dict` for holding overlay data distinct from ohlc. Drop the sizing yrange to label heights for now since it's pretty much all gone to hell since adding L1 labels. Fix y-stickies to look up correct overly arrays. --- piker/ui/_chart.py | 357 +++++++++++++++++++++++++++++---------------- 1 file changed, 231 insertions(+), 126 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 0caf0d17..edb9d6cd 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -184,11 +184,6 @@ class LinkedSplitCharts(QtGui.QWidget): orientation='bottom', linked_charts=self ) - self.xaxis_ind = DynamicDateAxis( - orientation='bottom', - linked_charts=self - ) - # if _xaxis_at == 'bottom': # self.xaxis.setStyle(showValues=False) # self.xaxis.hide() @@ -274,7 +269,12 @@ class LinkedSplitCharts(QtGui.QWidget): cv.linked_charts = self # use "indicator axis" by default - xaxis = self.xaxis_ind if xaxis is None else xaxis + if xaxis is None: + xaxis = DynamicDateAxis( + orientation='bottom', + linked_charts=self + ) + cpw = ChartPlotWidget( array=array, parent=self.splitter, @@ -286,6 +286,8 @@ class LinkedSplitCharts(QtGui.QWidget): cursor=self._ch, **cpw_kwargs, ) + cv.chart = cpw + # this name will be used to register the primary # graphics curve managed by the subchart cpw.name = name @@ -357,6 +359,7 @@ class ChartPlotWidget(pg.PlotWidget): ) # self.setViewportMargins(0, 0, 0, 0) self._array = array # readonly view of data + self._arrays = {} # readonly view of overlays self._graphics = {} # registry of underlying graphics self._overlays = {} # registry of overlay curves self._labels = {} # registry of underlying graphics @@ -389,11 +392,19 @@ class ChartPlotWidget(pg.PlotWidget): def last_bar_in_view(self) -> bool: self._array[-1]['index'] - def update_contents_labels(self, index: int) -> None: + def update_contents_labels( + self, + index: int, + # array_name: str, + ) -> None: if index >= 0 and index < len(self._array): - array = self._array - for name, (label, update) in self._labels.items(): + + if name is self.name : + array = self._array + else: + array = self._arrays[name] + update(index, array) def _set_xlimits( @@ -477,7 +488,7 @@ class ChartPlotWidget(pg.PlotWidget): label = ContentsLabel(chart=self, anchor_at=('top', 'left')) self._labels[name] = (label, partial(label.update_from_ohlc, name)) label.show() - self.update_contents_labels(len(data) - 1) + self.update_contents_labels(len(data) - 1) #, name) self._add_sticky(name) @@ -512,6 +523,7 @@ class ChartPlotWidget(pg.PlotWidget): if overlay: anchor_at = ('bottom', 'right') self._overlays[name] = curve + self._arrays[name] = data else: anchor_at = ('top', 'right') @@ -523,7 +535,7 @@ class ChartPlotWidget(pg.PlotWidget): label = ContentsLabel(chart=self, anchor_at=anchor_at) self._labels[name] = (label, partial(label.update_from_value, name)) label.show() - self.update_contents_labels(len(data) - 1) + self.update_contents_labels(len(data) - 1) #, name) if self._cursor: self._cursor.add_curve_cursor(self, curve) @@ -556,9 +568,7 @@ class ChartPlotWidget(pg.PlotWidget): """Update the named internal graphics from ``array``. """ - if name not in self._overlays: - self._array = array - + self._array = array graphics = self._graphics[name] graphics.update_from_array(array, **kwargs) return graphics @@ -574,6 +584,8 @@ class ChartPlotWidget(pg.PlotWidget): """ if name not in self._overlays: self._array = array + else: + self._arrays[name] = array curve = self._graphics[name] # TODO: we should instead implement a diff based @@ -644,40 +656,43 @@ class ChartPlotWidget(pg.PlotWidget): ylow = np.nanmin(bars['low']) yhigh = np.nanmax(bars['high']) except (IndexError, ValueError): - # must be non-ohlc array? + # likely non-ohlc array? + bars = bars[self.name] ylow = np.nanmin(bars) yhigh = np.nanmax(bars) # view margins: stay within a % of the "true range" diff = yhigh - ylow ylow = ylow - (diff * 0.04) - # yhigh = yhigh + (diff * 0.01) + yhigh = yhigh + (diff * 0.04) - # compute contents label "height" in view terms - # to avoid having data "contents" overlap with them - if self._labels: - label = self._labels[self.name][0] + # # compute contents label "height" in view terms + # # to avoid having data "contents" overlap with them + # if self._labels: + # label = self._labels[self.name][0] - rect = label.itemRect() - tl, br = rect.topLeft(), rect.bottomRight() - vb = self.plotItem.vb + # rect = label.itemRect() + # tl, br = rect.topLeft(), rect.bottomRight() + # vb = self.plotItem.vb - try: - # on startup labels might not yet be rendered - top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y()) + # try: + # # on startup labels might not yet be rendered + # top, bottom = (vb.mapToView(tl).y(), vb.mapToView(br).y()) - # XXX: magic hack, how do we compute exactly? - label_h = (top - bottom) * 0.42 + # # XXX: magic hack, how do we compute exactly? + # label_h = (top - bottom) * 0.42 - except np.linalg.LinAlgError: - label_h = 0 - else: - label_h = 0 + # except np.linalg.LinAlgError: + # label_h = 0 + # else: + # label_h = 0 - # print(f'label height {self.name}: {label_h}') + # # print(f'label height {self.name}: {label_h}') - if label_h > yhigh - ylow: - label_h = 0 + # if label_h > yhigh - ylow: + # label_h = 0 + # print(f"bounds (ylow, yhigh): {(ylow, yhigh)}") + label_h = 0 self.setLimits( yMin=ylow, @@ -715,9 +730,6 @@ async def _async_main( # chart_app.init_search() - # from ._exec import get_screen - # screen = get_screen(chart_app.geometry().bottomRight()) - # XXX: bug zone if you try to ctl-c after this we get hangs again? # wtf... # await tractor.breakpoint() @@ -749,13 +761,28 @@ async def _async_main( chart._set_yrange() + # eventually we'll support some kind of n-compose syntax + fsp_conf = { + 'vwap': { + 'overlay': True, + 'anchor': 'session', + }, + 'rsi': { + 'period': 14, + 'chart_kwargs': { + 'static_yrange': (0, 100), + }, + }, + + } + async with trio.open_nursery() as n: # load initial fsp chain (otherwise known as "indicators") n.start_soon( - chart_from_fsp, + spawn_fsps, linked_charts, - 'rsi', # eventually will be n-compose syntax + fsp_conf, sym, ohlcv, brokermod, @@ -800,6 +827,7 @@ async def chart_from_quotes( vwap_in_history: bool = False, ) -> None: """The 'main' (price) chart real-time update loop. + """ # TODO: bunch of stuff: # - I'm starting to think all this logic should be @@ -836,6 +864,14 @@ async def chart_from_quotes( size_digits=min(float_digits(volume), 3) ) + # TODO: + # - in theory we should be able to read buffer data faster + # then msgs arrive.. needs some tinkering and testing + + # - if trade volume jumps above / below prior L1 price + # levels this might be dark volume we need to + # present differently? + async for quotes in stream: for sym, quote in quotes.items(): # print(f'CHART: {quote}') @@ -862,19 +898,9 @@ async def chart_from_quotes( array, ) - if vwap_in_history: - # update vwap overlay line - chart.update_curve_from_array('vwap', ohlcv.array) - - # TODO: - # - eventually we'll want to update bid/ask labels - # and other data as subscribed by underlying UI - # consumers. - # - in theory we should be able to read buffer data faster - # then msgs arrive.. needs some tinkering and testing - - # if trade volume jumps above / below prior L1 price - # levels adjust bid / ask lines to match + # if vwap_in_history: + # # update vwap overlay line + # chart.update_curve_from_array('vwap', ohlcv.array) # compute max and min trade values to display in view # TODO: we need a streaming minmax algorithm here, see @@ -910,7 +936,7 @@ async def chart_from_quotes( l1.bid_label.update_from_data(0, price) # update min price in view to keep bid on screen - mn_in_view = max(price, mn_in_view) + mn_in_view = min(price, mn_in_view) if mx_in_view > last_mx or mn_in_view < last_mn: chart._set_yrange(yrange=(mn_in_view, mx_in_view)) @@ -923,9 +949,10 @@ async def chart_from_quotes( last_bars_range = brange -async def chart_from_fsp( - linked_charts, - fsp_func_name, +async def spawn_fsps( + linked_charts: LinkedSplitCharts, + # fsp_func_name, + fsps: Dict[str, str], sym, src_shm, brokermod, @@ -934,53 +961,119 @@ async def chart_from_fsp( """Start financial signal processing in subactor. Pass target entrypoint and historical data. + """ - name = f'fsp.{fsp_func_name}' - - # TODO: load function here and introspect - # return stream type(s) - - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - + # spawns sub-processes which execute cpu bound FSP code async with tractor.open_nursery() as n: - key = f'{sym}.' + name - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, + # spawns local task that consume and chart data streams from + # sub-procs + async with trio.open_nursery() as ln: + + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for fsp_func_name, conf in fsps.items(): + + display_name = f'fsp.{fsp_func_name}' + + # TODO: load function here and introspect + # return stream type(s) + + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) + + key = f'{sym}.' + display_name + + # this is all sync currently + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + + # XXX: fsp may have been opened by a duplicate chart. Error for + # now until we figure out how to wrap fsps as "feeds". + assert opened, f"A chart for {key} likely already exists?" + + conf['shm'] = shm + + # spawn closure, can probably define elsewhere + async def spawn_fsp_daemon( + fsp_name, + conf, + ): + """Start an fsp subactor async. + + """ + portal = await n.run_in_actor( + + # name as title of sub-chart + display_name, + + # subactor entrypoint + fsp.cascade, + brokername=brokermod.name, + src_shm_token=src_shm.token, + dst_shm_token=conf['shm'].token, + symbol=sym, + fsp_func_name=fsp_name, + + # tractor config + loglevel=loglevel, + ) + + stream = await portal.result() + + # receive last index for processed historical + # data-array as first msg + _ = await stream.receive() + + conf['stream'] = stream + conf['portal'] = portal + + # new local task + ln.start_soon( + spawn_fsp_daemon, + fsp_func_name, + conf, + ) + + # blocks here until all daemons up + + # start and block on update loops + async with trio.open_nursery() as ln: + for fsp_func_name, conf in fsps.items(): + ln.start_soon( + update_signals, + linked_charts, + fsp_func_name, + conf, + ) + + +async def update_signals( + linked_charts: LinkedSplitCharts, + fsp_func_name: str, + conf: Dict[str, Any], + +) -> None: + """FSP stream chart update loop. + + """ + shm = conf['shm'] + + if conf.get('overlay'): + chart = linked_charts.chart + chart.draw_curve( + name='vwap', + data=shm.array, + overlay=True, ) + last_val_sticky = None - # XXX: fsp may have been opened by a duplicate chart. Error for - # now until we figure out how to wrap fsps as "feeds". - assert opened, f"A chart for {key} likely already exists?" - - # start fsp sub-actor - portal = await n.run_in_actor( - - # name as title of sub-chart - name, - - # subactor entrypoint - fsp.cascade, - brokername=brokermod.name, - src_shm_token=src_shm.token, - dst_shm_token=shm.token, - symbol=sym, - fsp_func_name=fsp_func_name, - - # tractor config - loglevel=loglevel, - ) - - stream = await portal.result() - - # receive last index for processed historical - # data-array as first msg - _ = await stream.receive() - + else: chart = linked_charts.add_plot( name=fsp_func_name, array=shm.array, @@ -989,11 +1082,15 @@ async def chart_from_fsp( ohlc=False, # settings passed down to ``ChartPlotWidget`` - static_yrange=(0, 100), + **conf.get('chart_kwargs', {}) + # static_yrange=(0, 100), ) # display contents labels asap - chart.update_contents_labels(len(shm.array) - 1) + chart.update_contents_labels( + len(shm.array) - 1, + # fsp_func_name + ) array = shm.array value = array[fsp_func_name][-1] @@ -1004,31 +1101,34 @@ async def chart_from_fsp( chart.update_curve_from_array(fsp_func_name, array) chart.default_view() - # TODO: figure out if we can roll our own `FillToThreshold` to - # get brush filled polygons for OS/OB conditions. - # ``pg.FillBetweenItems`` seems to be one technique using - # generic fills between curve types while ``PlotCurveItem`` has - # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which - # might be the best solution? - # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) - # graphics.curve.setBrush(50, 50, 200, 100) - # graphics.curve.setFillLevel(50) + # TODO: figure out if we can roll our own `FillToThreshold` to + # get brush filled polygons for OS/OB conditions. + # ``pg.FillBetweenItems`` seems to be one technique using + # generic fills between curve types while ``PlotCurveItem`` has + # logic inside ``.paint()`` for ``self.opts['fillLevel']`` which + # might be the best solution? + # graphics = chart.update_from_array(chart.name, array[fsp_func_name]) + # graphics.curve.setBrush(50, 50, 200, 100) + # graphics.curve.setFillLevel(50) - # add moveable over-[sold/bought] lines - level_line(chart, 30) - level_line(chart, 70, orient_v='top') + # add moveable over-[sold/bought] lines + level_line(chart, 30) + level_line(chart, 70, orient_v='top') - chart._shm = shm - chart._set_yrange() + chart._shm = shm + chart._set_yrange() - # update chart graphics - async for value in stream: - # p = pg.debug.Profiler(disabled=False, delayed=False) - array = shm.array - value = array[-1][fsp_func_name] + stream = conf['stream'] + + # update chart graphics + async for value in stream: + # p = pg.debug.Profiler(disabled=False, delayed=False) + array = shm.array + value = array[-1][fsp_func_name] + if last_val_sticky: last_val_sticky.update_from_data(-1, value) - chart.update_curve_from_array(fsp_func_name, array) - # p('rendered rsi datum') + chart.update_curve_from_array(fsp_func_name, array) + # p('rendered rsi datum') async def check_for_new_bars(feed, ohlcv, linked_charts): @@ -1081,11 +1181,16 @@ async def check_for_new_bars(feed, ohlcv, linked_charts): for name, curve in price_chart._overlays.items(): - # TODO: standard api for signal lookups per plot - if name in price_chart._array.dtype.fields: + price_chart.update_curve_from_array( + name, + price_chart._arrays[name] + ) - # should have already been incremented above - price_chart.update_curve_from_array(name, price_chart._array) + # # TODO: standard api for signal lookups per plot + # if name in price_chart._array.dtype.fields: + + # # should have already been incremented above + # price_chart.update_curve_from_array(name, price_chart._array) for name, chart in linked_charts.subplots.items(): chart.update_curve_from_array(chart.name, chart._shm.array)