diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index b33a613c..780fe502 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -20,6 +20,7 @@ from ._style import _xaxis_at, _min_points_to_show, hcolor from ._source import Symbol from .. import brokers from .. import data +from ..data._normalize import iterticks from ..log import get_logger from ._exec import run_qtractor from ._source import base_ohlc_dtype @@ -483,6 +484,7 @@ class ChartPlotWidget(pg.PlotWidget): array: np.ndarray, **kwargs, ) -> pg.GraphicsObject: + self._array = array graphics = self._graphics[name] graphics.update_from_array(array, **kwargs) @@ -586,8 +588,9 @@ class ChartPlotWidget(pg.PlotWidget): self.scene().leaveEvent(ev) -async def add_new_bars(delay_s, linked_charts): - """Task which inserts new bars into the ohlc every ``delay_s`` seconds. +async def check_for_new_bars(delay_s, ohlcv, linked_charts): + """Task which updates from new bars in the shared ohlcv buffer every + ``delay_s`` seconds. """ # TODO: right now we'll spin printing bars if the last time # stamp is before a large period of no market activity. @@ -598,29 +601,19 @@ async def add_new_bars(delay_s, linked_charts): ad = delay_s - 0.002 price_chart = linked_charts.chart - ohlc = price_chart._array + # ohlc = price_chart._array async def sleep(): """Sleep until next time frames worth has passed from last bar. """ - last_ts = ohlc[-1]['time'] - delay = max((last_ts + ad) - time.time(), 0) - await trio.sleep(delay) + # last_ts = ohlcv.array[-1]['time'] + # delay = max((last_ts + ad) - time.time(), 0) + # await trio.sleep(delay) + await trio.sleep(ad) # sleep for duration of current bar await sleep() - def incr_ohlc_array(array: np.ndarray): - (index, t, close) = array[-1][['index', 'time', 'close']] - - # this copies non-std fields (eg. vwap) from the last datum - _next = np.array(array[-1], dtype=array.dtype) - _next[ - ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] - ] = (index + 1, t + delay_s, 0, close, close, close, close) - new_array = np.append(array, _next,) - return new_array - while True: # TODO: bunch of stuff: # - I'm starting to think all this logic should be @@ -633,12 +626,6 @@ async def add_new_bars(delay_s, linked_charts): # of copying it from last bar's close # - 5 sec bar lookback-autocorrection like tws does? - # add new increment/bar - start = time.time() - ohlc = price_chart._array = incr_ohlc_array(ohlc) - diff = time.time() - start - print(f'array append took {diff}') - # TODO: generalize this increment logic for name, chart in linked_charts.subplots.items(): @@ -655,11 +642,12 @@ async def add_new_bars(delay_s, linked_charts): # keep. # if last_quote == ohlc[-1]: # log.debug("Printing flat line for {sym}") + # print(ohlcv.array) # update chart historical bars graphics price_chart.update_from_array( price_chart.name, - ohlc, + ohlcv.array, # When appending a new bar, in the time between the insert # here and the Qt render call the underlying price data may # have already been updated, thus make sure to also update @@ -712,107 +700,134 @@ async def _async_main( # historical data fetch brokermod = brokers.get_brokermod(brokername) - async with brokermod.get_client() as client: - # figure out the exact symbol - bars = await client.bars(symbol=sym) + async with data.open_feed( + brokername, + [sym], + loglevel=loglevel, + ) as (fquote, stream, incr_stream, ohlcv): - # allow broker to declare historical data fields - ohlc_dtype = getattr(brokermod, 'ohlc_dtype', base_ohlc_dtype) + bars = ohlcv.array - # remember, msgpack-numpy's ``from_buffer` returns read-only array - bars = np.array(bars[list(ohlc_dtype.names)]) + # load in symbol's ohlc data + linked_charts, chart = chart_app.load_symbol(sym, bars) - # load in symbol's ohlc data - linked_charts, chart = chart_app.load_symbol(sym, bars) + # plot historical vwap if available + vwap_in_history = False + if 'vwap' in bars.dtype.fields: + vwap_in_history = True + chart.draw_curve( + name='vwap', + data=bars['vwap'], + overlay=True, + ) - # plot historical vwap if available - vwap_in_history = False - if 'vwap' in bars.dtype.fields: - vwap_in_history = True - chart.draw_curve( - name='vwap', - data=bars['vwap'], - overlay=True, - ) + # determine ohlc delay between bars + # to determine time step between datums + times = bars['time'] + delay = times[-1] - times[times != times[-1]][-1] - # determine ohlc delay between bars - times = bars['time'] + async with trio.open_nursery() as n: - # find expected time step between datums - delay = times[-1] - times[times != times[-1]][-1] + # load initial fsp chain (otherwise known as "indicators") + n.start_soon( + chart_from_fsp, + linked_charts, + 'rsi', # eventually will be n-compose syntax + sym, + bars, + brokermod, + loglevel, + ) - async with trio.open_nursery() as n: - - # load initial fsp chain (otherwise known as "indicators") - n.start_soon( - chart_from_fsp, - linked_charts, - 'rsi', - sym, - bars, - brokermod, - loglevel, - ) - - # update last price sticky - last_price_sticky = chart._ysticks[chart.name] - last_price_sticky.update_from_data( - *chart._array[-1][['index', 'close']] - ) - - # graphics update loop - - async with data.open_feed( - brokername, - [sym], - loglevel=loglevel, - ) as (fquote, stream): + # update last price sticky + last_price_sticky = chart._ysticks[chart.name] + last_price_sticky.update_from_data( + *ohlcv.array[-1][['index', 'close']] + ) # wait for a first quote before we start any update tasks quote = await stream.__anext__() - log.info(f'RECEIVED FIRST QUOTE {quote}') + log.info(f'Received first quote {quote}') - # start graphics tasks after receiving first live quote - n.start_soon(add_new_bars, delay, linked_charts) + # start graphics update loop(s)after receiving first live quote + n.start_soon( + chart_from_quotes, + chart, + stream, + ohlcv, + vwap_in_history, + ) + n.start_soon( + check_for_new_bars, + delay, + ohlcv, + linked_charts + ) - async for quotes in stream: - for sym, quote in quotes.items(): - ticks = quote.get('ticks', ()) - for tick in ticks: - if tick.get('type') == 'trade': + # probably where we'll eventually start the user input loop + await trio.sleep_forever() - # TODO: eventually we'll want to update - # bid/ask labels and other data as - # subscribed by underlying UI consumers. - # last = quote.get('last') or quote['close'] - last = tick['price'] - # update ohlc (I guess we're enforcing this - # for now?) overwrite from quote - high, low = chart._array[-1][['high', 'low']] - chart._array[['high', 'low', 'close']][-1] = ( - max(high, last), - min(low, last), - last, - ) - chart.update_from_array( - chart.name, - chart._array, - ) - # update sticky(s) - last_price_sticky.update_from_data( - *chart._array[-1][['index', 'close']]) - chart._set_yrange() +async def chart_from_quotes( + chart: ChartPlotWidget, + stream, + ohlcv: np.ndarray, + vwap_in_history: bool = False, +) -> None: + """The 'main' (price) chart real-time update loop. + """ - vwap = quote.get('vwap') - if vwap and vwap_in_history: - chart._array['vwap'][-1] = vwap - print(f"vwap: {quote['vwap']}") - # update vwap overlay line - chart.update_from_array( - 'vwap', - chart._array['vwap'], - ) + last_price_sticky = chart._ysticks[chart.name] + + print_next = False + async for quotes in stream: + for sym, quote in quotes.items(): + for tick in iterticks(quote, type='trade'): + # TODO: eventually we'll want to update + # bid/ask labels and other data as + # subscribed by underlying UI consumers. + # last_close = ohlcv.array[-1]['close'] + + # last = quote.get('last') or quote['close'] + # last = tick['price'] + + # if print_next: + # print(f"next last: {last}") + # print_next = False + + # if last_close != last: + # log.error(f"array last_close: {last_close}\nlast: {last}") + # print_next = True + + # update ohlc (I guess we're enforcing this + # for now?) overwrite from quote + # high, low = chart._array[-1][['high', 'low']] + # chart._array[['high', 'low', 'close']][-1] = ( + # max(high, last), + # min(low, last), + # last, + # ) + last = ohlcv.array[-1] + chart.update_from_array( + chart.name, + ohlcv.array, + ) + # update sticky(s) + last_price_sticky.update_from_data( + *last[['index', 'close']]) + chart._set_yrange() + + vwap = quote.get('vwap') + if vwap and vwap_in_history: + # chart._array['vwap'][-1] = vwap + last['vwap'] = vwap + print(f"vwap: {quote['vwap']}") + # update vwap overlay line + chart.update_from_array( + 'vwap', + # chart._array['vwap'], + ohlcv.array['vwap'], + ) async def chart_from_fsp( @@ -882,7 +897,12 @@ async def chart_from_fsp( # update chart graphics async for value in stream: + + # start = time.time() chart._array[-1] = value + # diff = time.time() - start + # print(f'FSP array append took {diff}') + last_val_sticky.update_from_data(-1, value) chart.update_from_array(chart.name, chart._array) # chart._set_yrange()