diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 7695d89e..23237d3f 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -2,7 +2,6 @@ High level Qt chart widgets. """ from typing import Tuple, Dict, Any, Optional -import time from PyQt5 import QtCore, QtGui import numpy as np @@ -20,7 +19,10 @@ from ._style import _xaxis_at, _min_points_to_show, hcolor from ._source import Symbol from .. import brokers from .. import data -from ..data._normalize import iterticks +from ..data import ( + iterticks, + maybe_open_shm_array, +) from ..log import get_logger from ._exec import run_qtractor from ._interaction import ChartView @@ -542,7 +544,7 @@ class ChartPlotWidget(pg.PlotWidget): ylow = np.nanmin(bars['low']) yhigh = np.nanmax(bars['high']) # std = np.std(bars['close']) - except IndexError: + except (IndexError, ValueError): # must be non-ohlc array? ylow = np.nanmin(bars) yhigh = np.nanmax(bars) @@ -587,53 +589,6 @@ class ChartPlotWidget(pg.PlotWidget): self.scene().leaveEvent(ev) -async def check_for_new_bars(incr_stream, ohlcv, linked_charts): - """Task which updates from new bars in the shared ohlcv buffer every - ``delay_s`` seconds. - """ - # TODO: right now we'll spin printing bars if the last time - # stamp is before a large period of no market activity. - # Likely the best way to solve this is to make this task - # aware of the instrument's tradable hours? - - price_chart = linked_charts.chart - - async for index in incr_stream: - # TODO: generalize this increment logic - for name, chart in linked_charts.subplots.items(): - data = chart._array - chart._array = np.append( - data, - np.array(data[-1], dtype=data.dtype) - ) - - # update chart historical bars graphics - price_chart.update_from_array( - price_chart.name, - ohlcv.array, - # When appending a new bar, in the time between the insert - # here and the Qt render call the underlying price data may - # have already been updated, thus make sure to also update - # the last bar if necessary on this render cycle. - # just_history=True - ) - # resize view - price_chart._set_yrange() - - for name, curve in price_chart._overlays.items(): - # TODO: standard api for signal lookups per plot - if name in price_chart._array.dtype.fields: - # should have already been incremented above - price_chart.update_from_array( - name, - price_chart._array[name], - ) - - for name, chart in linked_charts.subplots.items(): - chart.update_from_array(chart.name, chart._array) - chart._set_yrange() - - async def _async_main( sym: str, brokername: str, @@ -656,8 +611,9 @@ async def _async_main( brokername, [sym], loglevel=loglevel, - ) as (fquote, stream, incr_stream, ohlcv): + ) as feed: + ohlcv = feed.shm bars = ohlcv.array # load in symbol's ohlc data @@ -673,6 +629,8 @@ async def _async_main( overlay=True, ) + chart._set_yrange() + async with trio.open_nursery() as n: # load initial fsp chain (otherwise known as "indicators") @@ -681,7 +639,7 @@ async def _async_main( linked_charts, 'rsi', # eventually will be n-compose syntax sym, - bars, + ohlcv, brokermod, loglevel, ) @@ -692,21 +650,22 @@ async def _async_main( *ohlcv.array[-1][['index', 'close']] ) - # wait for a first quote before we start any update tasks - quote = await stream.__anext__() - log.info(f'Received first quote {quote}') - # start graphics update loop(s)after receiving first live quote n.start_soon( chart_from_quotes, chart, - stream, + feed.stream, ohlcv, vwap_in_history, ) + + # wait for a first quote before we start any update tasks + quote = await feed.receive() + log.info(f'Received first quote {quote}') + n.start_soon( check_for_new_bars, - incr_stream, + feed, # delay, ohlcv, linked_charts @@ -743,20 +702,14 @@ async def chart_from_quotes( # - in theory we should be able to read buffer data # faster then msgs arrive.. needs some tinkering and # testing - start = time.time() array = ohlcv.array - diff = time.time() - start - print(f'read time: {diff}') - # last = ohlcv.array[-1] last = array[-1] chart.update_from_array( chart.name, - # ohlcv.array, array, ) # update sticky(s) - last_price_sticky.update_from_data( - *last[['index', 'close']]) + last_price_sticky.update_from_data(*last[['index', 'close']]) chart._set_yrange() vwap = quote.get('vwap') @@ -764,17 +717,14 @@ async def chart_from_quotes( last['vwap'] = vwap print(f"vwap: {quote['vwap']}") # update vwap overlay line - chart.update_from_array( - 'vwap', - ohlcv.array['vwap'], - ) + chart.update_from_array('vwap', ohlcv.array['vwap']) async def chart_from_fsp( linked_charts, func_name, sym, - bars, + src_shm, brokermod, loglevel, ) -> None: @@ -782,14 +732,31 @@ async def chart_from_fsp( Pass target entrypoint and historical data. """ + name = f'fsp.{func_name}' + # TODO: load function here and introspect + # return stream type(s) + fsp_dtype = np.dtype([('index', int), (func_name, float)]) + async with tractor.open_nursery() as n: + key = f'{sym}.' + name + + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) + assert opened + + # start fsp sub-actor portal = await n.run_in_actor( - f'fsp.{func_name}', # name as title of sub-chart + name, # name as title of sub-chart # subactor entrypoint - fsp.stream_and_process, - bars=bars, + fsp.cascade, brokername=brokermod.name, + src_shm_token=src_shm.token, + dst_shm_token=shm.token, symbol=sym, fsp_func_name=func_name, @@ -799,55 +766,77 @@ async def chart_from_fsp( stream = await portal.result() - # receive processed historical data-array as first message - history = (await stream.__anext__()) - - # TODO: enforce type checking here? - newbars = np.array(history) + # receive last index for processed historical + # data-array as first msg + _ = await stream.receive() chart = linked_charts.add_plot( name=func_name, - array=newbars, + + # TODO: enforce type checking here? + array=shm.array, ) - # check for data length mis-allignment and fill missing values - diff = len(chart._array) - len(linked_charts.chart._array) - if diff <= 0: - data = chart._array - chart._array = np.append( - data, - np.full(abs(diff), data[-1], dtype=data.dtype) - ) - - # XXX: hack to get curves aligned with bars graphics: prepend - # a copy of the first datum.. - # TODO: talk to ``pyqtgraph`` core about proper way to solve this - data = chart._array - chart._array = np.append( - np.array(data[0], dtype=data.dtype), - data, - ) - - value = chart._array[-1] + array = shm.array[func_name] + value = array[-1] last_val_sticky = chart._ysticks[chart.name] last_val_sticky.update_from_data(-1, value) - chart.update_from_array(chart.name, chart._array) + chart.update_from_array(chart.name, array) chart._set_yrange(yrange=(0, 100)) + chart._shm = shm + # update chart graphics async for value in stream: - - # start = time.time() - chart._array[-1] = value - # diff = time.time() - start - # print(f'FSP array append took {diff}') - + array = shm.array[func_name] + value = array[-1] last_val_sticky.update_from_data(-1, value) - chart.update_from_array(chart.name, chart._array) + chart.update_from_array(chart.name, array) # chart._set_yrange() +async def check_for_new_bars(feed, ohlcv, linked_charts): + """Task which updates from new bars in the shared ohlcv buffer every + ``delay_s`` seconds. + """ + # TODO: right now we'll spin printing bars if the last time + # stamp is before a large period of no market activity. + # Likely the best way to solve this is to make this task + # aware of the instrument's tradable hours? + + price_chart = linked_charts.chart + + async for index in await feed.index_stream(): + + # update chart historical bars graphics + price_chart.update_from_array( + price_chart.name, + ohlcv.array, + # When appending a new bar, in the time between the insert + # here and the Qt render call the underlying price data may + # have already been updated, thus make sure to also update + # the last bar if necessary on this render cycle which is + # why we **don't** set: + # just_history=True + ) + # resize view + price_chart._set_yrange() + + for name, curve in price_chart._overlays.items(): + # TODO: standard api for signal lookups per plot + if name in price_chart._array.dtype.fields: + # should have already been incremented above + price_chart.update_from_array( + name, + price_chart._array[name], + ) + + for name, chart in linked_charts.subplots.items(): + chart.update_from_array(chart.name, chart._shm.array[chart.name]) + chart._set_yrange() + + def _main( sym: str, brokername: str,