diff --git a/piker/data/feed.py b/piker/data/feed.py
index 46151a89..ea3f5e1d 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -278,7 +278,6 @@ async def start_backfill(
         timeframe,
         end_dt=None,
     )
-    times = array['time']
 
     # sample period step size in seconds
@@ -336,11 +335,15 @@ async def start_backfill(
     if tsdb_is_up:
         # do a decently sized backfill and load it into storage.
         periods = {
-            1: {'days': 1},
+            1: {'days': 6},
             60: {'years': 6},
         }
         kwargs = periods[step_size_s]
+
+        # NOTE: manually set the "latest" datetime which we intend to
+        # backfill history "until" so as to adhere to the history
+        # settings above when the tsdb is detected as being empty.
         last_tsdb_dt = start_dt.subtract(**kwargs)
 
     # configure async query throttling
@@ -348,31 +351,24 @@ async def start_backfill(
     # XXX: legacy from ``trimeter`` code but unsupported now.
    # erlangs = config.get('erlangs', 1)
 
+    # avoid duplicate history frames with a set of datetime frame
+    # starts.
+    starts: set[datetime] = set()
+
     # inline sequential loop where we simply pass the
     # last retrieved start dt to the next request as
     # its end dt.
-    starts: set[datetime] = set()
     while start_dt > last_tsdb_dt:
+        log.info(
+            f'Requesting {step_size_s}s frame ending in {start_dt}'
+        )
+
         try:
-            log.info(
-                f'Requesting {step_size_s}s frame ending in {start_dt}'
-            )
            array, next_start_dt, end_dt = await hist(
                timeframe,
                end_dt=start_dt,
            )
 
-            if next_start_dt in starts:
-                start_dt = min(starts)
-                print("SKIPPING DUPLICATE FRAME @ {next_start_dt}")
-                continue
-
-            # only update new start point if new
-            start_dt = next_start_dt
-            starts.add(start_dt)
-
-            assert array['time'][0] == start_dt.timestamp()
-
        except NoData:
            # XXX: unhandled history gap (shouldn't happen?)
            log.warning(
@@ -392,6 +388,17 @@ async def start_backfill(
             # request loop until the condition is resolved?
             return
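 
+        # NOTE: a duplicate ``next_start_dt`` means the backend
+        # (re-)sent a frame we already have, so skip (re-)pushing
+        # it and resume from the oldest frame start seen so far.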
+        if next_start_dt in starts:
+            start_dt = min(starts)
+            print(f"SKIPPING DUPLICATE FRAME @ {next_start_dt}")
+            continue
+
+        # only update new start point if not-yet-seen
+        start_dt = next_start_dt
+        starts.add(start_dt)
+
+        assert array['time'][0] == start_dt.timestamp()
+
         diff = end_dt - start_dt
         frame_time_diff_s = diff.seconds
         expected_frame_size_s = frame_size_s + step_size_s
@@ -462,7 +469,6 @@ async def start_backfill(
 
         # short-circuit (for now)
         bf_done.set()
-        return
 
 
 async def basic_backfill(
@@ -480,15 +486,19 @@ async def basic_backfill(
     # a required backend func this must block until shm is
     # filled with first set of ohlc bars
     for timeframe, shm in shms.items():
-        await bus.nursery.start(
-            partial(
-                start_backfill,
-                mod,
-                bfqsn,
-                shm,
-                timeframe=timeframe,
+        try:
+            await bus.nursery.start(
+                partial(
+                    start_backfill,
+                    mod,
+                    bfqsn,
+                    shm,
+                    timeframe=timeframe,
+                )
             )
-        )
+        except DataUnavailable:
+            # XXX: timeframe not supported for backend
+            continue
 
 
 async def tsdb_backfill(
@@ -500,7 +510,6 @@ async def tsdb_backfill(
     bfqsn: str,
     shms: dict[int, ShmArray],
 
-    # some_data_ready: trio.Event,
     task_status: TaskStatus[
         tuple[ShmArray, ShmArray]
     ] = trio.TASK_STATUS_IGNORED,
@@ -513,30 +522,42 @@ async def tsdb_backfill(
 
     # start history anal and load missing new data via backend.
     for timeframe, shm in shms.items():
-        series, _, last_tsdb_dt = await storage.load(
+        tsdb_history, first_tsdb_dt, last_tsdb_dt = await storage.load(
             fqsn,
             timeframe=timeframe,
         )
 
         broker, symbol, expiry = unpack_fqsn(fqsn)
-        (
-            latest_start_dt,
-            latest_end_dt,
-            bf_done,
-        ) = await bus.nursery.start(
-            partial(
-                start_backfill,
-                mod,
-                bfqsn,
-                shm,
-                timeframe=timeframe,
-                last_tsdb_dt=last_tsdb_dt,
-                tsdb_is_up=True,
-                storage=storage,
+        try:
+            (
+                latest_start_dt,
+                latest_end_dt,
+                bf_done,
+            ) = await bus.nursery.start(
+                partial(
+                    start_backfill,
+                    mod,
+                    bfqsn,
+                    shm,
+                    timeframe=timeframe,
+                    last_tsdb_dt=last_tsdb_dt,
+                    tsdb_is_up=True,
+                    storage=storage,
+                )
             )
-        )
+        except DataUnavailable:
+            # XXX: timeframe not supported for backend
+            dts_per_tf[timeframe] = (
+                tsdb_history,
+                last_tsdb_dt,
+                None,
+                None,
+            )
+            continue
+
+        # tsdb_history = series.get(timeframe)
         dts_per_tf[timeframe] = (
-            series.get(timeframe),
+            tsdb_history,
             last_tsdb_dt,
             latest_start_dt,
             latest_end_dt,
@@ -553,7 +574,7 @@ async def tsdb_backfill(
     # the shm buffer?.. I don't know.
 
     # unblock the feed bus management task
-    assert len(shms[1].array)
+    # assert len(shms[1].array)
     task_status.started((
         shms[60],
         shms[1],
@@ -562,6 +583,11 @@ async def tsdb_backfill(
     # sync to backend history task's query/load completion
     await bf_done.wait()
 
+    # Load tsdb history into shm buffer (for display).
+
+    # TODO: eventually it'd be nice to not require a shm array/buffer
+    # to accomplish this.. maybe we can do some kind of tsdb direct to
+    # graphics format eventually in a child-actor?
     for timeframe, shm in shms.items():
         (
             tsdb_history,
@@ -573,7 +599,7 @@ async def tsdb_backfill(
         # do diff against last start frame of history and only fill
         # in from the tsdb an allotment that allows for most recent
         # to be loaded into mem *before* tsdb data.
-        if last_tsdb_dt:
+        if last_tsdb_dt and latest_start_dt:
             dt_diff_s = (
                 latest_start_dt - last_tsdb_dt
             ).seconds
@@ -588,9 +614,10 @@ async def tsdb_backfill(
         # sanity check on most-recent-data loading
         assert prepend_start > dt_diff_s
 
-        if tsdb_history and len(tsdb_history):
+        if (
+            len(tsdb_history)
+        ):
             to_push = tsdb_history[:prepend_start]
-
             shm.push(
                 to_push,
 
@@ -601,46 +628,63 @@ async def tsdb_backfill(
                 # start=prepend_start,
                 field_map=marketstore.ohlc_key_map,
             )
+            prepend_start = shm._first.value
 
-            # load as much from storage into shm as space will
-            # allow according to user's shm size settings.
-            end = tsdb_history['Epoch'][0]
+            # load as much from storage into shm as space will
+            # allow according to user's shm size settings.
+            last_frame_start = tsdb_history['Epoch'][0]
 
-            while shm._first.value > 0:
-                series = await storage.read_ohlcv(
-                    fqsn,
-                    end=end,
-                    timeframe=timeframe,
-                )
-                prepend_start -= len(to_push)
-                to_push = tsdb_history[:prepend_start]
+            while (
+                shm._first.value > 0
+                # and frame_start < last_frame_start
+            ):
+                tsdb_history = await storage.read_ohlcv(
+                    fqsn,
+                    end=last_frame_start,
+                    timeframe=timeframe,
+                )
+                if (
+                    not len(tsdb_history)
+                ):
+                    # on empty db history
+                    break
 
-                shm.push(
-                    to_push,
+                time = tsdb_history['Epoch']
+                frame_start = time[0]
+                frame_end = time[-1]
+                print(f"LOADING MKTS HISTORY: {frame_start} - {frame_end}")
+
+                if frame_start >= last_frame_start:
+                    # no new data was loaded from the tsdb, so we can exit.
+                    break
+
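+                # NOTE: ``shm._first.value`` is the index of the
+                # first filled slot in the buffer so at most that
+                # many more datums can be prepended; clip the frame
+                # to fit the remaining space.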
+                prepend_start = shm._first.value
+                to_push = tsdb_history[:prepend_start]
 
                 # insert the history pre a "days worth" of samples
                 # to leave some real-time buffer space at the end.
-                    prepend=True,
-                    # update_first=False,
-                    # start=prepend_start,
-                    field_map=marketstore.ohlc_key_map,
-                )
+                shm.push(
+                    to_push,
+                    prepend=True,
+                    field_map=marketstore.ohlc_key_map,
+                )
+                last_frame_start = frame_start
 
-                # manually trigger step update to update charts/fsps
-                # which need an incremental update.
-                # NOTE: the way this works is super duper
-                # un-intuitive right now:
-                # - the broadcaster fires a msg to the fsp subsystem.
-                # - fsp subsys then checks for a sample step diff and
-                #   possibly recomputes prepended history.
-                # - the fsp then sends back to the parent actor
-                #   (usually a chart showing graphics for said fsp)
-                #   which tells the chart to conduct a manual full
-                #   graphics loop cycle.
-                for delay_s in sampler.subscribers:
-                    await broadcast(delay_s)
+                log.info(f'Loaded {to_push.shape} datums from storage')
 
-                log.info(f'Loaded {to_push.shape} datums from storage')
+            # manually trigger step update to update charts/fsps
+            # which need an incremental update.
+            # NOTE: the way this works is super duper
+            # un-intuitive right now:
+            # - the broadcaster fires a msg to the fsp subsystem.
+            # - fsp subsys then checks for a sample step diff and
+            #   possibly recomputes prepended history.
+            # - the fsp then sends back to the parent actor
+            #   (usually a chart showing graphics for said fsp)
+            #   which tells the chart to conduct a manual full
+            #   graphics loop cycle.
+            for delay_s in sampler.subscribers:
+                await broadcast(delay_s)
 
     # TODO: write new data to tsdb to be ready for the next read.
@@ -692,14 +736,15 @@ async def manage_history(
 
         # we expect the sub-actor to write
         readonly=False,
-        size=3*_secs_in_day,
+        size=4*_secs_in_day,
     )
 
     # (for now) set the rt (hft) shm array with space to prepend
-    # only a days worth of 1s history.
-    days = 1
-    rt_shm._first.value = days*_secs_in_day
-    rt_shm._last.value = days*_secs_in_day
+    # only a few days worth of 1s history.
+    days = 3
+    start_index = days*_secs_in_day
+    rt_shm._first.value = start_index
+    rt_shm._last.value = start_index
     rt_zero_index = rt_shm.index - 1
 
     if not opened:
@@ -737,8 +782,6 @@ async def manage_history(
             1: rt_shm,
             60: hist_shm,
         },
-        # some_data_ready=some_data_ready,
-        # task_status=task_status,
     )
 
     # yield back after client connect with filled shm
@@ -866,7 +909,6 @@ async def allocate_persistent_feed(
     # this task.
     msg = init_msg[symbol]
     msg['hist_shm_token'] = hist_shm.token
-    # msg['startup_hist_index'] = hist_shm.index - 1
     msg['izero_hist'] = izero_hist
     msg['izero_rt'] = izero_rt
     msg['rt_shm_token'] = rt_shm.token
@@ -937,6 +979,16 @@ async def allocate_persistent_feed(
         'shm_write_opts', {}
     ).get('sum_tick_vlm', True)
 
+    # NOTE: if no high-freq sampled data has (yet) been loaded,
+    # seed the buffer with a history datum - this is most handy
+    # for many backends which don't sample @ 1s OHLC but do have
+    # slower data such as 1m OHLC.
+    if not len(rt_shm.array):
+        rt_shm.push(hist_shm.array[-3:-1])
+        ohlckeys = ['open', 'high', 'low', 'close']
+        rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
+        rt_shm.array['volume'][-2] = 0
+
     # start sample loop and shm incrementer task for OHLC style sampling
     # at the above registered step periods.
     try:
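
For reference, a rough standalone sketch of what the final rt-buffer
seeding hunk does, with a plain numpy structured array standing in for
piker's ``ShmArray`` wrapper (all names below are illustrative, not
part of the patch):

    import numpy as np

    ohlc_dtype = np.dtype([
        ('time', 'f8'),
        ('open', 'f8'), ('high', 'f8'),
        ('low', 'f8'), ('close', 'f8'),
        ('volume', 'f8'),
    ])

    # stand-in for ``hist_shm.array``: a few 1m OHLC rows
    hist = np.array(
        [(t, 10., 11., 9., 10.5, 100.) for t in (0., 60., 120.)],
        dtype=ohlc_dtype,
    )

    # seed the empty rt buffer with the penultimate two history rows,
    # mirroring ``rt_shm.push(hist_shm.array[-3:-1])`` above
    rt = hist[-3:-1].copy()

    # as in the patch, flatten the seeded bars' OHLC values to the
    # last known closing price
    last_close = hist['close'][-1]
    for key in ('open', 'high', 'low', 'close'):
        rt[key][-2:] = last_close

    # and zero the volume on the first of the two seeded rows
    # (``[-2]``), again as the patch does
    rt['volume'][-2] = 0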