diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index bf2ea127..0cb58867 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -1,13 +1,16 @@ """ Financial signal processing for the peeps. """ -from typing import AsyncIterator, Callable +from typing import AsyncIterator, Callable, Tuple +import trio +import tractor import numpy as np from ..log import get_logger from .. import data from ._momo import _rsi +from ..data import attach_shm_array, Feed log = get_logger(__name__) @@ -19,7 +22,7 @@ async def latency( source: 'TickStream[Dict[str, float]]', # noqa ohlcv: np.ndarray ) -> AsyncIterator[np.ndarray]: - """Compute High-Low midpoint value. + """Latency measurements, broker to piker. """ # TODO: do we want to offer yielding this async # before the rt data connection comes up? @@ -37,33 +40,40 @@ async def latency( yield value -async def stream_and_process( - bars: np.ndarray, +async def increment_signals( + feed: Feed, + dst_shm: 'SharedArray', # noqa +) -> None: + async for msg in await feed.index_stream(): + array = dst_shm.array + last = array[-1:].copy() + + # write new slot to the buffer + dst_shm.push(last) + + + +@tractor.stream +async def cascade( + ctx: tractor.Context, brokername: str, - # symbols: List[str], + src_shm_token: dict, + dst_shm_token: Tuple[str, np.dtype], symbol: str, fsp_func_name: str, ) -> AsyncIterator[dict]: + """Chain streaming signal processors and deliver output to + destination mem buf. + """ + src = attach_shm_array(token=src_shm_token) + dst = attach_shm_array(readonly=False, token=dst_shm_token) - # remember, msgpack-numpy's ``from_buffer` returns read-only array - # bars = np.array(bars[list(ohlc_dtype.names)]) - - # async def _yield_bars(): - # yield bars - - # hist_out: np.ndarray = None - - # Conduct a single iteration of fsp with historical bars input - # async for hist_out in func(_yield_bars(), bars): - # yield {symbol: hist_out} func: Callable = _fsps[fsp_func_name] # open a data feed stream with requested broker - async with data.open_feed( - brokername, - [symbol], - ) as (fquote, stream): + async with data.open_feed(brokername, [symbol]) as feed: + assert src.token == feed.shm.token # TODO: load appropriate fsp with input args async def filter_by_sym(sym, stream): @@ -72,9 +82,37 @@ async def stream_and_process( if symbol == sym: yield quotes - async for processed in func( - filter_by_sym(symbol, stream), - bars, - ): - log.info(f"{fsp_func_name}: {processed}") - yield processed + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) + + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history = await out_stream.__anext__() + + + # TODO: talk to ``pyqtgraph`` core about proper way to solve this: + # XXX: hack to get curves aligned with bars graphics: prepend + # a copy of the first datum.. + dst.push(history[:1]) + + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff >= 0: + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + yield index + + async with trio.open_nursery() as n: + n.start_soon(increment_signals, feed, dst) + + async for processed in out_stream: + log.info(f"{fsp_func_name}: {processed}") + index = src.index + dst.array[-1][fsp_func_name] = processed + await ctx.send_yield(index)