diff --git a/piker/data/history.py b/piker/data/history.py
index 048769fa..0c2ecc25 100644
--- a/piker/data/history.py
+++ b/piker/data/history.py
@@ -1,18 +1,19 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as published by
-# the Free Software Foundation, either version 3 of the License, or
-# (at your option) any later version.
+# This program is free software: you can redistribute it and/or
+# modify it under the terms of the GNU Affero General Public
+# License as published by the Free Software Foundation, either
+# version 3 of the License, or (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU Affero General Public License for more details.
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+# Affero General Public License for more details.
-# You should have received a copy of the GNU Affero General Public License
-# along with this program. If not, see <https://www.gnu.org/licenses/>.
+# You should have received a copy of the GNU Affero General Public
+# License along with this program. If not, see
+# <https://www.gnu.org/licenses/>.
'''
Historical data business logic for load, backfill and tsdb storage.
@@ -39,6 +40,7 @@ from pendulum import (
from_timestamp,
)
import numpy as np
+import polars as pl
from ..accounting import (
MktPair,
@@ -54,6 +56,7 @@ from ._source import def_iohlcv_fields
from ._sampling import (
open_sample_stream,
)
+from . import tsp
from ..brokers._util import (
DataUnavailable,
)
@@ -197,7 +200,7 @@ async def start_backfill(
# do a decently sized backfill and load it into storage.
periods = {
- 1: {'days': 6},
+ 1: {'days': 2},
60: {'years': 6},
}
period_duration: int = periods[timeframe]
@@ -246,13 +249,16 @@ async def start_backfill(
# broker says there never was or is no more history to pull
except DataUnavailable:
log.warning(
- f'NO-MORE-DATA: backend {mod.name} halted history!?'
+ f'NO-MORE-DATA: backend {mod.name} halted history:\n'
+ f'{timeframe}@{mkt.fqme}'
)
# ugh, what's a better way?
# TODO: fwiw, we probably want a way to signal a throttle
# condition (eg. with ib) so that we can halt the
# request loop until the condition is resolved?
+ if timeframe > 1:
+ await tractor.pause()
return
# TODO: drop this? see todo above..
@@ -300,9 +306,11 @@ async def start_backfill(
array,
prepend_until_dt=backfill_until_dt,
)
- ln = len(to_push)
+ ln: int = len(to_push)
if ln:
- log.info(f'{ln} bars for {next_start_dt} -> {last_start_dt}')
+ log.info(
+ f'{ln} bars for {next_start_dt} -> {last_start_dt}'
+ )
else:
log.warning(
@@ -388,14 +396,29 @@ async def start_backfill(
without_src=True,
)
else:
- col_sym_key: str = mkt.get_fqme(delim_char='')
+ col_sym_key: str = mkt.get_fqme(
+ delim_char='',
+ )
- # TODO: implement parquet append!?
await storage.write_ohlcv(
col_sym_key,
shm.array,
timeframe,
)
+ df: pl.DataFrame = await storage.as_df(
+ fqme=mkt.fqme,
+ period=timeframe,
+ load_from_offline=False,
+ )
+ (
+ df,
+ gaps,
+ deduped,
+ diff,
+ ) = tsp.dedupe(df)
+ if diff:
+ tsp.sort_diff(df)
+
else:
# finally filled gap
log.info(
@@ -634,12 +657,19 @@ async def tsdb_backfill(
async with mod.open_history_client(
mkt,
) as (get_hist, config):
- log.info(f'{mod} history client returned backfill config: {config}')
+ log.info(
+ f'`{mod}` history client returned backfill config:\n'
+ f'{config}\n'
+ )
# get latest query's worth of history all the way
# back to what is recorded in the tsdb
try:
- array, mr_start_dt, mr_end_dt = await get_hist(
+ (
+ array,
+ mr_start_dt,
+ mr_end_dt,
+ ) = await get_hist(
timeframe,
end_dt=None,
)
@@ -649,6 +679,7 @@ async def tsdb_backfill(
# there's no backfilling possible.
except DataUnavailable:
task_status.started()
+ await tractor.pause()
return
# TODO: fill in non-zero epoch time values ALWAYS!
@@ -699,9 +730,8 @@ async def tsdb_backfill(
)
except TimeseriesNotFound:
log.warning(
- f'No timeseries yet for {fqme}'
+ f'No timeseries yet for {timeframe}@{fqme}'
)
-
else:
(
tsdb_history,
@@ -784,25 +814,24 @@ async def tsdb_backfill(
f'timeframe of {timeframe} seconds..\n'
'So yuh.. dun do dat brudder.'
)
+
# if there is a gap to backfill from the first
# history frame until the last datum loaded from the tsdb
# continue that now in the background
bf_done = await tn.start(
partial(
start_backfill,
- get_hist,
- mod,
- mkt,
- shm,
- timeframe,
-
+ get_hist=get_hist,
+ mod=mod,
+ mkt=mkt,
+ shm=shm,
+ timeframe=timeframe,
backfill_from_shm_index=backfill_gap_from_shm_index,
backfill_from_dt=mr_start_dt,
-
sampler_stream=sampler_stream,
-
backfill_until_dt=last_tsdb_dt,
storage=storage,
+ write_tsdb=True,
)
)
@@ -824,8 +853,11 @@ async def tsdb_backfill(
finally:
return
- # IF we need to continue backloading incrementally from the
- # tsdb client..
+ # XXX NOTE: this is legacy from when we were using
+ # marketstore and needed to continue backloading
+ # incrementally from the tsdb client, because it could
+ # not handle a single large query over gRPC for some
+ # reason.
tn.start_soon(
back_load_from_tsdb,
@@ -994,19 +1026,18 @@ async def manage_history(
log.info(f'Connected to sampler stream: {sample_stream}')
for timeframe in [60, 1]:
- await tn.start(
+ await tn.start(partial(
tsdb_backfill,
- mod,
- storemod,
- tn,
+ mod=mod,
+ storemod=storemod,
+ tn=tn,
# bus,
- client,
- mkt,
- tf2mem[timeframe],
- timeframe,
-
- sample_stream,
- )
+ storage=client,
+ mkt=mkt,
+ shm=tf2mem[timeframe],
+ timeframe=timeframe,
+ sampler_stream=sample_stream,
+ ))
# indicate to caller that feed can be delivered to
# remote requesting client since we've loaded history