diff --git a/piker/data/__init__.py b/piker/data/__init__.py index cae1347c..fa26801c 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -42,7 +42,7 @@ from ._sharedmem import ( ShmArray, get_shm_token, ) -from ._source import base_ohlc_dtype +from ._source import base_iohlc_dtype from ._buffer import ( increment_ohlc_buffer, subscribe_ohlc_for_increment @@ -139,6 +139,7 @@ class Feed: name: str stream: AsyncIterator[Dict[str, Any]] shm: ShmArray + # ticks: ShmArray _broker_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None @@ -188,7 +189,7 @@ async def open_feed( key=sym_to_shm_key(name, symbols[0]), # use any broker defined ohlc dtype: - dtype=getattr(mod, '_ohlc_dtype', base_ohlc_dtype), + dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), # we expect the sub-actor to write readonly=True, diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 64460476..fed6b965 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -91,19 +91,20 @@ async def increment_ohlc_buffer( # append new entry to buffer thus "incrementing" the bar array = shm.array - last = array[-1:].copy() - (index, t, close) = last[0][['index', 'time', 'close']] + last = array[-1:][shm._write_fields].copy() + # (index, t, close) = last[0][['index', 'time', 'close']] + (t, close) = last[0][['time', 'close']] # this copies non-std fields (eg. vwap) from the last datum last[ - ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (index + 1, t + delay_s, 0, close, close, close, close) + ['time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (t + delay_s, 0, close, close, close, close) # write to the buffer shm.push(last) # broadcast the buffer index step - yield {'index': shm._i.value} + yield {'index': shm._last.value} def subscribe_ohlc_for_increment( diff --git a/piker/data/_normalize.py b/piker/data/_normalize.py index cbda6062..363f3c01 100644 --- a/piker/data/_normalize.py +++ b/piker/data/_normalize.py @@ -33,6 +33,6 @@ def iterticks( ticks = quote.get('ticks', ()) if ticks: for tick in ticks: - print(f"{quote['symbol']}: {tick}") + # print(f"{quote['symbol']}: {tick}") if tick.get('type') in types: yield tick diff --git a/piker/data/_source.py b/piker/data/_source.py index 3ad6d3e8..26180443 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -15,27 +15,36 @@ # along with this program. If not, see . """ -Numpy data source machinery. +numpy data source coversion helpers. """ import decimal from dataclasses import dataclass import numpy as np import pandas as pd +# from numba import from_dtype +ohlc_fields = [ + ('time', float), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', int), + ('bar_wap', float), +] + +ohlc_with_index = ohlc_fields.copy() +ohlc_with_index.insert(0, ('index', int)) + # our minimum structured array layout for ohlc data -base_ohlc_dtype = np.dtype( - [ - ('index', int), - ('time', float), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', int), - ] -) +base_iohlc_dtype = np.dtype(ohlc_with_index) +base_ohlc_dtype = np.dtype(ohlc_fields) + +# TODO: for now need to construct this manually for readonly arrays, see +# https://github.com/numba/numba/issues/4511 +# numba_ohlc_dtype = from_dtype(base_ohlc_dtype) # map time frame "keys" to minutes values tf_in_1m = { @@ -110,18 +119,27 @@ def from_df( 'Low': 'low', 'Close': 'close', 'Volume': 'volume', + + # most feeds are providing this over sesssion anchored + 'vwap': 'bar_wap', + + # XXX: ib_insync calls this the "wap of the bar" + # but no clue what is actually is... + # https://github.com/pikers/piker/issues/119#issuecomment-729120988 + 'average': 'bar_wap', } df = df.rename(columns=columns) for name in df.columns: - if name not in base_ohlc_dtype.names[1:]: + # if name not in base_ohlc_dtype.names[1:]: + if name not in base_ohlc_dtype.names: del df[name] # TODO: it turns out column access on recarrays is actually slower: # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # it might make sense to make these structured arrays? - array = df.to_records() + array = df.to_records(index=False) _nan_to_closest_num(array) return array