diff --git a/piker/data/feed.py b/piker/data/feed.py index c2fbbd5e..46151a89 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -21,7 +21,7 @@ This module is enabled for ``brokerd`` daemons. """ from __future__ import annotations -from contextlib import asynccontextmanager +from contextlib import asynccontextmanager as acm from dataclasses import ( dataclass, field, @@ -287,7 +287,7 @@ async def start_backfill( - pendulum.from_timestamp(times[-2]) ).seconds - # "frame"'s worth of sample period steps in seconds + # frame's worth of sample-period-steps, in seconds frame_size_s = len(array) * step_size_s to_push = diff_history( @@ -298,7 +298,7 @@ async def start_backfill( ) log.info(f'Pushing {to_push.size} to shm!') - shm.push(to_push) + shm.push(to_push, prepend=True) # TODO: *** THIS IS A BUG *** # we need to only broadcast to subscribers for this fqsn.. @@ -310,7 +310,11 @@ async def start_backfill( bf_done = trio.Event() # let caller unblock and deliver latest history frame - task_status.started((start_dt, end_dt, bf_done)) + task_status.started(( + start_dt, + end_dt, + bf_done, + )) # based on the sample step size, maybe load a certain amount history if last_tsdb_dt is None: @@ -325,14 +329,14 @@ async def start_backfill( # when no tsdb "last datum" is provided, we just load # some near-term history. periods = { - 1: {'seconds': 4000}, + 1: {'days': 1}, 60: {'days': 14}, } if tsdb_is_up: # do a decently sized backfill and load it into storage. periods = { - 1: {'days': 6}, + 1: {'days': 1}, 60: {'years': 6}, } @@ -461,6 +465,186 @@ async def start_backfill( return +async def basic_backfill( + bus: _FeedsBus, + mod: ModuleType, + bfqsn: str, + shms: dict[int, ShmArray], + +) -> None: + + # do a legacy incremental backfill from the provider. + log.info('No TSDB (marketstored) found, doing basic backfill..') + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + for timeframe, shm in shms.items(): + await bus.nursery.start( + partial( + start_backfill, + mod, + bfqsn, + shm, + timeframe=timeframe, + ) + ) + + +async def tsdb_backfill( + mod: ModuleType, + marketstore: ModuleType, + bus: _FeedsBus, + storage: Storage, + fqsn: str, + bfqsn: str, + shms: dict[int, ShmArray], + + # some_data_ready: trio.Event, + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, + +) -> None: + + # TODO: this should be used verbatim for the pure + # shm backfiller approach below. + dts_per_tf: dict[int, datetime] = {} + + # start history anal and load missing new data via backend. + for timeframe, shm in shms.items(): + series, _, last_tsdb_dt = await storage.load( + fqsn, + timeframe=timeframe, + ) + + broker, symbol, expiry = unpack_fqsn(fqsn) + ( + latest_start_dt, + latest_end_dt, + bf_done, + ) = await bus.nursery.start( + partial( + start_backfill, + mod, + bfqsn, + shm, + timeframe=timeframe, + last_tsdb_dt=last_tsdb_dt, + tsdb_is_up=True, + storage=storage, + ) + ) + dts_per_tf[timeframe] = ( + series.get(timeframe), + last_tsdb_dt, + latest_start_dt, + latest_end_dt, + ) + + # if len(hist_shm.array) < 2: + # TODO: there's an edge case here to solve where if the last + # frame before market close (at least on ib) was pushed and + # there was only "1 new" row pushed from the first backfill + # query-iteration, then the sample step sizing calcs will + # break upstream from here since you can't diff on at least + # 2 steps... probably should also add logic to compute from + # the tsdb series and stash that somewhere as meta data on + # the shm buffer?.. no se. + + # unblock the feed bus management task + assert len(shms[1].array) + task_status.started(( + shms[60], + shms[1], + )) + + # sync to backend history task's query/load completion + await bf_done.wait() + + for timeframe, shm in shms.items(): + ( + tsdb_history, + last_tsdb_dt, + latest_start_dt, + latest_end_dt, + ) = dts_per_tf[timeframe] + + # do diff against last start frame of history and only fill + # in from the tsdb an allotment that allows for most recent + # to be loaded into mem *before* tsdb data. + if last_tsdb_dt: + dt_diff_s = ( + latest_start_dt - last_tsdb_dt + ).seconds + else: + dt_diff_s = 0 + + # TODO: see if there's faster multi-field reads: + # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields + # re-index with a `time` and index field + prepend_start = shm._first.value + + # sanity check on most-recent-data loading + assert prepend_start > dt_diff_s + + if tsdb_history and len(tsdb_history): + to_push = tsdb_history[:prepend_start] + + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + # start=prepend_start, + field_map=marketstore.ohlc_key_map, + ) + + # load as much from storage into shm as space will + # allow according to user's shm size settings. + end = tsdb_history['Epoch'][0] + + while shm._first.value > 0: + series = await storage.read_ohlcv( + fqsn, + end=end, + timeframe=timeframe, + ) + prepend_start -= len(to_push) + to_push = tsdb_history[:prepend_start] + + shm.push( + to_push, + + # insert the history pre a "days worth" of samples + # to leave some real-time buffer space at the end. + prepend=True, + # update_first=False, + # start=prepend_start, + field_map=marketstore.ohlc_key_map, + ) + + # manually trigger step update to update charts/fsps + # which need an incremental update. + # NOTE: the way this works is super duper + # un-intuitive right now: + # - the broadcaster fires a msg to the fsp subsystem. + # - fsp subsys then checks for a sample step diff and + # possibly recomputes prepended history. + # - the fsp then sends back to the parent actor + # (usually a chart showing graphics for said fsp) + # which tells the chart to conduct a manual full + # graphics loop cycle. + for delay_s in sampler.subscribers: + await broadcast(delay_s) + + log.info(f'Loaded {to_push.shape} datums from storage') + + # TODO: write new data to tsdb to be ready to for next read. + + async def manage_history( mod: ModuleType, bus: _FeedsBus, @@ -469,7 +653,9 @@ async def manage_history( feed_is_live: trio.Event, timeframe: float = 60, # in seconds - task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + task_status: TaskStatus[ + tuple[ShmArray, ShmArray] + ] = trio.TASK_STATUS_IGNORED, ) -> None: ''' @@ -490,6 +676,8 @@ async def manage_history( # we expect the sub-actor to write readonly=False, ) + hist_zero_index = hist_shm.index - 1 + # TODO: history validation if not opened: raise RuntimeError( @@ -506,184 +694,93 @@ async def manage_history( readonly=False, size=3*_secs_in_day, ) + + # (for now) set the rt (hft) shm array with space to prepend + # only a days worth of 1s history. + days = 1 + rt_shm._first.value = days*_secs_in_day + rt_shm._last.value = days*_secs_in_day + rt_zero_index = rt_shm.index - 1 + if not opened: raise RuntimeError( "Persistent shm for sym was already open?!" ) log.info('Scanning for existing `marketstored`') - - is_up = await check_for_service('marketstored') - - # for now only do backfilling if no tsdb can be found - do_legacy_backfill = not is_up and opened + tsdb_is_up = await check_for_service('marketstored') bfqsn = fqsn.replace('.' + mod.name, '') open_history_client = getattr(mod, 'open_history_client', None) assert open_history_client - if is_up and opened and open_history_client: - + if ( + tsdb_is_up + and opened + and open_history_client + ): log.info('Found existing `marketstored`') + from . import marketstore - async with marketstore.open_storage_client( - fqsn, - ) as storage: - - # TODO: this should be used verbatim for the pure - # shm backfiller approach below. - - # start history anal and load missing new data via backend. - series, _, last_tsdb_dt = await storage.load( + async with ( + marketstore.open_storage_client(fqsn)as storage, + ): + hist_shm, rt_shm = await bus.nursery.start( + tsdb_backfill, + mod, + marketstore, + bus, + storage, fqsn, - timeframe=timeframe, + bfqsn, + { + 1: rt_shm, + 60: hist_shm, + }, + # some_data_ready=some_data_ready, + # task_status=task_status, ) - broker, symbol, expiry = unpack_fqsn(fqsn) - ( - latest_start_dt, - latest_end_dt, - bf_done, - ) = await bus.nursery.start( - partial( - start_backfill, - mod, - bfqsn, - hist_shm, - timeframe=timeframe, - last_tsdb_dt=last_tsdb_dt, - tsdb_is_up=True, - storage=storage, - ) - ) + # yield back after client connect with filled shm + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) - # if len(hist_shm.array) < 2: - # TODO: there's an edge case here to solve where if the last - # frame before market close (at least on ib) was pushed and - # there was only "1 new" row pushed from the first backfill - # query-iteration, then the sample step sizing calcs will - # break upstream from here since you can't diff on at least - # 2 steps... probably should also add logic to compute from - # the tsdb series and stash that somewhere as meta data on - # the shm buffer?.. no se. - - task_status.started((hist_shm, rt_shm)) + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. some_data_ready.set() - await bf_done.wait() - # do diff against last start frame of history and only fill - # in from the tsdb an allotment that allows for most recent - # to be loaded into mem *before* tsdb data. - if last_tsdb_dt: - dt_diff_s = ( - latest_start_dt - last_tsdb_dt - ).seconds - else: - dt_diff_s = 0 + # history retreival loop depending on user interaction and thus + # a small RPC-prot for remotely controllinlg what data is loaded + # for viewing. + await trio.sleep_forever() - # TODO: see if there's faster multi-field reads: - # https://numpy.org/doc/stable/user/basics.rec.html#accessing-multiple-fields - # re-index with a `time` and index field - prepend_start = hist_shm._first.value - - # sanity check on most-recent-data loading - assert prepend_start > dt_diff_s - - history = list(series.values()) - if history: - fastest = history[0] - to_push = fastest[:prepend_start] - - hist_shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - # start=prepend_start, - field_map=marketstore.ohlc_key_map, - ) - - # load as much from storage into shm as space will - # allow according to user's shm size settings. - count = 0 - end = fastest['Epoch'][0] - - while hist_shm._first.value > 0: - count += 1 - series = await storage.read_ohlcv( - fqsn, - end=end, - timeframe=timeframe, - ) - history = list(series.values()) - fastest = history[0] - end = fastest['Epoch'][0] - prepend_start -= len(to_push) - to_push = fastest[:prepend_start] - - hist_shm.push( - to_push, - - # insert the history pre a "days worth" of samples - # to leave some real-time buffer space at the end. - prepend=True, - # update_first=False, - # start=prepend_start, - field_map=marketstore.ohlc_key_map, - ) - - # manually trigger step update to update charts/fsps - # which need an incremental update. - # NOTE: the way this works is super duper - # un-intuitive right now: - # - the broadcaster fires a msg to the fsp subsystem. - # - fsp subsys then checks for a sample step diff and - # possibly recomputes prepended history. - # - the fsp then sends back to the parent actor - # (usually a chart showing graphics for said fsp) - # which tells the chart to conduct a manual full - # graphics loop cycle. - for delay_s in sampler.subscribers: - await broadcast(delay_s) - - if count > 6: - break - - log.info(f'Loaded {to_push.shape} datums from storage') - - # TODO: write new data to tsdb to be ready to for next read. - - if do_legacy_backfill: - # do a legacy incremental backfill from the provider. - log.info('No existing `marketstored` found..') - - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start( - partial( - start_backfill, - mod, - bfqsn, - hist_shm, - timeframe=timeframe, - ) + # load less history if no tsdb can be found + elif ( + not tsdb_is_up + and opened + ): + await basic_backfill( + bus, + mod, + bfqsn, + shms={ + 1: rt_shm, + 60: hist_shm, + }, ) - - # yield back after client connect with filled shm - task_status.started((hist_shm, rt_shm)) - - # indicate to caller that feed can be delivered to - # remote requesting client since we've loaded history - # data that can be used. + task_status.started(( + hist_zero_index, + hist_shm, + rt_zero_index, + rt_shm, + )) some_data_ready.set() - - # history retreival loop depending on user interaction and thus - # a small RPC-prot for remotely controllinlg what data is loaded - # for viewing. - await trio.sleep_forever() + await trio.sleep_forever() async def allocate_persistent_feed( @@ -750,7 +847,12 @@ async def allocate_persistent_feed( # https://github.com/python-trio/trio/issues/2258 # bus.nursery.start_soon( # await bus.start_task( - hist_shm, rt_shm = await bus.nursery.start( + ( + izero_hist, + hist_shm, + izero_rt, + rt_shm, + ) = await bus.nursery.start( manage_history, mod, bus, @@ -764,7 +866,9 @@ async def allocate_persistent_feed( # this task. msg = init_msg[symbol] msg['hist_shm_token'] = hist_shm.token - msg['startup_hist_index'] = hist_shm.index - 1 + # msg['startup_hist_index'] = hist_shm.index - 1 + msg['izero_hist'] = izero_hist + msg['izero_rt'] = izero_rt msg['rt_shm_token'] = rt_shm.token # true fqsn @@ -794,31 +898,19 @@ async def allocate_persistent_feed( fqsn: first_quote, } + # for ambiguous names we simply apply the retreived + # feed to that name (for now). bus.feeds[symbol] = bus.feeds[bfqsn] = ( init_msg, generic_first_quotes, ) - # for ambiguous names we simply apply the retreived - # feed to that name (for now). + # insert 1s ohlc into the increment buffer set + # to update and shift every second sampler.ohlcv_shms.setdefault( 1, [] ).append(rt_shm) - ohlckeys = ['open', 'high', 'low', 'close'] - - # set the rt (hft) shm array as append only - # (for now). - rt_shm._first.value = 0 - rt_shm._last.value = 0 - - # push last sample from history to rt buffer just as a filler datum - # but we don't want a history sized datum outlier so set vlm to zero - # and ohlc to the close value. - rt_shm.push(hist_shm.array[-2:-1]) - - rt_shm.array[ohlckeys] = hist_shm.array['close'][-1] - rt_shm._array['volume'] = 0 task_status.started() @@ -829,16 +921,12 @@ async def allocate_persistent_feed( # the backend will indicate when real-time quotes have begun. await feed_is_live.wait() - # start shm incrementer task for OHLC style sampling - # at the current detected step period. - times = hist_shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - sampler.ohlcv_shms.setdefault(delay_s, []).append(hist_shm) + # insert 1m ohlc into the increment buffer set + # to shift every 60s. + sampler.ohlcv_shms.setdefault(60, []).append(hist_shm) # create buffer a single incrementer task broker backend # (aka `brokerd`) using the lowest sampler period. - # await tractor.breakpoint() - # for delay_s in sampler.ohlcv_shms: if sampler.incrementers.get(_default_delay_s) is None: await bus.start_task( increment_ohlc_buffer, @@ -849,7 +937,8 @@ async def allocate_persistent_feed( 'shm_write_opts', {} ).get('sum_tick_vlm', True) - # start sample loop + # start sample loop and shm incrementer task for OHLC style sampling + # at the above registered step periods. try: await sample_and_broadcast( bus, @@ -1037,7 +1126,8 @@ class Feed: stream: trio.abc.ReceiveChannel[dict[str, Any]] status: dict[str, Any] - startup_hist_index: int = 0 + izero_hist: int = 0 + izero_rt: int = 0 throttle_rate: Optional[int] = None @@ -1055,7 +1145,7 @@ class Feed: async def receive(self) -> dict: return await self.stream.receive() - @asynccontextmanager + @acm async def index_stream( self, delay_s: int = 1, @@ -1116,7 +1206,7 @@ class Feed: ) -@asynccontextmanager +@acm async def install_brokerd_search( portal: tractor.Portal, @@ -1150,7 +1240,7 @@ async def install_brokerd_search( yield -@asynccontextmanager +@acm async def open_feed( fqsns: list[str], @@ -1226,7 +1316,8 @@ async def open_feed( stream=stream, _portal=portal, status={}, - startup_hist_index=init['startup_hist_index'], + izero_hist=init['izero_hist'], + izero_rt=init['izero_rt'], throttle_rate=tick_throttle, ) @@ -1278,7 +1369,7 @@ async def open_feed( await ctx.cancel() -@asynccontextmanager +@acm async def maybe_open_feed( fqsns: list[str],