diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 5459b4ba..e46db9b7 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -23,10 +23,13 @@ from ._sharedmem import ( maybe_open_shm_array, attach_shm_array, open_shm_array, - SharedArray, + ShmArray, get_shm_token, ) -from ._buffer import incr_buffer +from ._buffer import ( + increment_ohlc_buffer, + subscribe_ohlc_for_increment +) __all__ = [ @@ -35,7 +38,7 @@ __all__ = [ 'attach_shm_array', 'open_shm_array', 'get_shm_token', - 'incr_buffer', + 'subscribe_ohlc_for_increment', ] @@ -115,7 +118,7 @@ class Feed: """ name: str stream: AsyncIterator[Dict[str, Any]] - shm: SharedArray + shm: ShmArray _broker_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[Dict[str, Any]]] = None @@ -129,7 +132,7 @@ class Feed: # created for all practical purposes self._index_stream = await self._broker_portal.run( 'piker.data', - 'incr_buffer', + 'increment_ohlc_buffer', shm_token=self.shm.token, topics=['index'], ) diff --git a/piker/data/_buffer.py b/piker/data/_buffer.py index 43be7dc0..5e1c3588 100644 --- a/piker/data/_buffer.py +++ b/piker/data/_buffer.py @@ -1,74 +1,99 @@ """ Data buffers for fast shared humpy. """ -from typing import Tuple, Callable -import time +from typing import Tuple, Callable, Dict +# import time import tractor import trio -from ._sharedmem import attach_shm_array +from ._sharedmem import ShmArray + + +_shms: Dict[int, ShmArray] = {} @tractor.msg.pub -async def incr_buffer( +async def increment_ohlc_buffer( shm_token: dict, get_topics: Callable[..., Tuple[str]], # delay_s: Optional[float] = None, ): """Task which inserts new bars into the provide shared memory array every ``delay_s`` seconds. + + This task fulfills 2 purposes: + - it takes the subscribed set of shm arrays and increments them + on a common time period + - broadcast of this increment "signal" message to other actor + subscribers + + Note that if **no** actor has initiated this task then **none** of + the underlying buffers will actually be incremented. """ - # TODO: right now we'll spin printing bars if the last time - # stamp is before a large period of no market activity. - # Likely the best way to solve this is to make this task - # aware of the instrument's tradable hours? - - shm = attach_shm_array( - token=shm_token, - readonly=False, - ) - - # determine ohlc delay between bars - # to determine time step between datums - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # TODO: right now we'll spin printing bars if the last time stamp is + # before a large period of no market activity. Likely the best way + # to solve this is to make this task aware of the instrument's + # tradable hours? # adjust delay to compensate for trio processing time - ad = delay_s - 0.002 + ad = min(_shms.keys()) - 0.001 - async def sleep(): - """Sleep until next time frames worth has passed from last bar. - """ - # last_ts = shm.array[-1]['time'] - # delay = max((last_ts + ad) - time.time(), 0) - # await trio.sleep(delay) - await trio.sleep(ad) + # async def sleep(): + # """Sleep until next time frames worth has passed from last bar. + # """ + # # last_ts = shm.array[-1]['time'] + # # delay = max((last_ts + ad) - time.time(), 0) + # # await trio.sleep(delay) + # await trio.sleep(ad) + + total_s = 0 # total seconds counted + lowest = min(_shms.keys()) + ad = lowest - 0.001 while True: - # sleep for duration of current bar - await sleep() + # TODO: do we want to support dynamically + # adding a "lower" lowest increment period? + await trio.sleep(ad) + total_s += lowest - # TODO: in theory we could make this faster by copying the - # "last" readable value into the underlying larger buffer's - # next value and then incrementing the counter instead of - # using ``.push()``? + # # sleep for duration of current bar + # await sleep() - # append new entry to buffer thus "incrementing" the bar - array = shm.array - last = array[-1:].copy() - (index, t, close) = last[0][['index', 'time', 'close']] + # increment all subscribed shm arrays + # TODO: this in ``numba`` + for delay_s, shms in _shms.items(): + if total_s % delay_s != 0: + continue - # this copies non-std fields (eg. vwap) from the last datum - last[ - ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] - ][0] = (index + 1, t + delay_s, 0, close, close, close, close) + # TODO: numa this! + for shm in shms: + # TODO: in theory we could make this faster by copying the + # "last" readable value into the underlying larger buffer's + # next value and then incrementing the counter instead of + # using ``.push()``? - # write to the buffer - shm.push(last) - # print('incrementing array') + # append new entry to buffer thus "incrementing" the bar + array = shm.array + last = array[-1:].copy() + (index, t, close) = last[0][['index', 'time', 'close']] - # print(get_topics()) + # this copies non-std fields (eg. vwap) from the last datum + last[ + ['index', 'time', 'volume', 'open', 'high', 'low', 'close'] + ][0] = (index + 1, t + delay_s, 0, close, close, close, close) + + # write to the buffer + shm.push(last) # broadcast the buffer index step yield {'index': shm._i.value} + + +def subscribe_ohlc_for_increment( + shm: ShmArray, + delay: int, +) -> None: + """Add an OHLC ``ShmArray`` to the increment set. + """ + _shms.setdefault(delay, []).append(shm) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index d6c53a95..6c410423 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -134,7 +134,7 @@ def _make_token( ) -class SharedArray: +class ShmArray: def __init__( self, shmarr: np.ndarray, @@ -216,7 +216,7 @@ def open_shm_array( size: int = int(2*60*60*10/5), dtype: np.dtype = base_ohlc_dtype, readonly: bool = False, -) -> SharedArray: +) -> ShmArray: """Open a memory shared ``numpy`` using the standard library. This call unlinks (aka permanently destroys) the buffer on teardown @@ -245,7 +245,7 @@ def open_shm_array( ) counter.value = 0 - shmarr = SharedArray( + shmarr = ShmArray( array, counter, shm, @@ -268,7 +268,7 @@ def attach_shm_array( size: int = int(60*60*10/5), # dtype: np.dtype = base_ohlc_dtype, readonly: bool = True, -) -> SharedArray: +) -> ShmArray: """Load and attach to an existing shared memory array previously created by another process using ``open_shared_array``. """ @@ -289,7 +289,7 @@ def attach_shm_array( # make sure we can read counter.value - sha = SharedArray( + sha = ShmArray( shmarr, counter, shm, @@ -314,7 +314,7 @@ def maybe_open_shm_array( key: str, dtype: np.dtype = base_ohlc_dtype, **kwargs, -) -> Tuple[SharedArray, bool]: +) -> Tuple[ShmArray, bool]: """Attempt to attach to a shared memory block by a "key" determined by the users overall "system" (presumes you don't have the block's explicit token).