diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 4b77db0b..ec169c42 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -20,6 +20,7 @@ Financial signal processing for the peeps. from typing import AsyncIterator, Callable, Tuple import trio +from trio_typing import TaskStatus import tractor import numpy as np @@ -75,6 +76,7 @@ async def increment_signals( # write new slot to the buffer dst_shm.push(last) + len(dst_shm.array) @tractor.stream @@ -99,60 +101,107 @@ async def cascade( async with data.open_feed(brokername, [symbol]) as feed: assert src.token == feed.shm.token - # TODO: load appropriate fsp with input args - async def filter_by_sym(sym, stream): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes + async def fsp_compute( + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + ) -> None: - out_stream = func( - filter_by_sym(symbol, feed.stream), - feed.shm, - ) + # TODO: load appropriate fsp with input args - # TODO: XXX: - # THERE'S A BIG BUG HERE WITH THE `index` field since we're - # prepending a copy of the first value a few times to make - # sub-curves align with the parent bar chart. - # This likely needs to be fixed either by, - # - manually assigning the index and historical data - # seperately to the shm array (i.e. not using .push()) - # - developing some system on top of the shared mem array that - # is `index` aware such that historical data can be indexed - # relative to the true first datum? Not sure if this is sane - # for derivatives. + async def filter_by_sym( + sym: str, + stream, + ): + # task cancellation won't kill the channel + async with stream.shield(): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[fsp_func_name] = history_output + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for incremental compuations. + dst._first.value = src._first.value + dst._last.value = src._first.value - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff >= 0: - print(f"WTF DIFF SIGNAL to HISTORY {diff}") - for _ in range(diff): - dst.push(history[:1]) + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() - # compare with source signal and time align - index = dst.push(history) + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[fsp_func_name] = history_output - yield index + + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff >= 0: + print(f"WTF DIFF SIGNAL to HISTORY {diff}") + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + await ctx.send_yield(index) + + # setup a respawn handle + with trio.CancelScope() as cs: + task_status.started(cs) + + # rt stream + async for processed in out_stream: + log.debug(f"{fsp_func_name}: {processed}") + index = src.index + dst.array[-1][fsp_func_name] = processed + + # stream latest shm array index entry + await ctx.send_yield(index) + + last_len = new_len = len(src.array) async with trio.open_nursery() as n: - n.start_soon(increment_signals, feed, dst) - async for processed in out_stream: - log.debug(f"{fsp_func_name}: {processed}") - index = src.index - dst.array[-1][fsp_func_name] = processed - await ctx.send_yield(index) + cs = await n.start(fsp_compute) + + # Increment the underlying shared memory buffer on every "increment" + # msg received from the underlying data feed. + + async for msg in await feed.index_stream(): + + new_len = len(src.array) + + if new_len > last_len + 1: + # respawn the signal compute task if the source + # signal has been updated + cs.cancel() + cs = await n.start(fsp_compute) + + # TODO: adopt an incremental update engine/approach + # where possible here eventually! + + array = dst.array + last = array[-1:].copy() + + # write new slot to the buffer + dst.push(last) + + last_len = new_len diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index 13bad728..f8811afa 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -151,8 +151,8 @@ def wma( return np.convolve(signal, weights, 'valid') -# @piker.fsp( - # aggregates=[60, 60*5, 60*60, '4H', '1D'], +# @piker.fsp.signal( +# timeframes=['1s', '5s', '15s', '1m', '5m', '1H'], # ) async def _rsi( source: 'QuoteStream[Dict[str, Any]]', # noqa @@ -171,8 +171,8 @@ async def _rsi( # TODO: the emas here should be seeded with a period SMA as per # wilder's original formula.. rsi_h, last_up_ema_close, last_down_ema_close = rsi(sig, period, seed, seed) - up_ema_last = last_up_ema_close - down_ema_last = last_down_ema_close + up_ema_last = last_up_ema_close + down_ema_last = last_down_ema_close # deliver history yield rsi_h diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a708a82e..fedfdaa5 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -558,7 +558,9 @@ class ChartPlotWidget(pg.PlotWidget): # TODO: see how this handles with custom ohlcv bars graphics # and/or if we can implement something similar for OHLC graphics - clipToView=True, + # clipToView=True, + autoDownsample=True, + downsampleMethod='subsample', **pdi_kwargs, ) @@ -1221,9 +1223,23 @@ async def update_signals( # update chart graphics async for value in stream: - # read last - array = shm.array - value = array[-1][fsp_func_name] + # TODO: provide a read sync mechanism to avoid this polling. + # the underlying issue is that a backfill and subsequent shm + # array first/last index update could result in an empty array + # read here since the stream is never torn down on the + # re-compute steps. + read_tries = 2 + while read_tries > 0: + + try: + # read last + array = shm.array + value = array[-1][fsp_func_name] + break + + except IndexError: + read_tries -= 1 + continue if last_val_sticky: last_val_sticky.update_from_data(-1, value)