Compare commits

..

No commits in common. "ed9c211b960ae5dc9e08ce8930badd95989be378" and "6f8a361e8024ded3100598067adb35d0b6f594f2" have entirely different histories.

3 changed files with 13 additions and 57 deletions

View File

@ -275,15 +275,9 @@ async def open_history_client(
f'{times}'
)
# XXX, debug any case where the latest 1m bar we get is
# already another "sample's-step-old"..
if end_dt is None:
inow: int = round(time.time())
if (
_time_step := (inow - times[-1])
>
timeframe * 2
):
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = from_timestamp(times[0])

View File

@ -447,13 +447,7 @@ def ldshm(
)
# last chance manual overwrites in REPL
# await tractor.pause()
if not aids:
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
assert aids
tf2aids[period_s] = aids
else:

View File

@ -49,7 +49,6 @@ from pendulum import (
Duration,
duration as mk_duration,
from_timestamp,
timezone,
)
import numpy as np
import polars as pl
@ -58,7 +57,9 @@ from piker.brokers import NoData
from piker.accounting import (
MktPair,
)
from piker.log import get_logger
from piker.data._util import (
log,
)
from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
@ -96,9 +97,6 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus
log = get_logger()
# `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle
@ -403,9 +401,7 @@ async def start_backfill(
# based on the sample step size, maybe load a certain amount history
update_start_on_prepend: bool = False
if (
_until_was_none := (backfill_until_dt is None)
):
if backfill_until_dt is None:
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
@ -439,8 +435,6 @@ async def start_backfill(
last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index
est = timezone('EST')
while last_start_dt > backfill_until_dt:
log.info(
f'Requesting {timeframe}s frame:\n'
@ -454,10 +448,9 @@ async def start_backfill(
next_end_dt,
) = await get_hist(
timeframe,
end_dt=(end_dt_param := last_start_dt),
end_dt=last_start_dt,
)
except NoData as nodata:
_nodata = nodata
except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@ -525,32 +518,8 @@ async def start_backfill(
==
next_start_dt.timestamp()
)
assert (
(last_time := time[-1])
==
next_end_dt.timestamp()
)
frame_last_dt = from_timestamp(last_time)
if (
frame_last_dt.add(seconds=timeframe)
<
end_dt_param
):
est_frame_last_dt = est.convert(frame_last_dt)
est_end_dt_param = est.convert(end_dt_param)
log.warning(
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
f'end_dt_param (EST): {est_end_dt_param!r}\n'
f'\n'
f'Likely contains,\n'
f'- a venue closure.\n'
f'- (maybe?) missing data ?\n'
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
await tractor.pause()
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = (
last_start_dt.subtract(
@ -612,11 +581,10 @@ async def start_backfill(
'0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}'
)
await tractor.pause()
# Check if we're about to exceed buffer capacity BEFORE
# attempting the push
if (next_prepend_index - ln) < 0:
if next_prepend_index - ln < 0:
log.warning(
f'Backfill would exceed buffer capacity!\n'
f'next_prepend_index: {next_prepend_index}\n'
@ -687,7 +655,7 @@ async def start_backfill(
},
})
# XXX, can't push the entire frame? so
# can't push the entire frame? so
# push only the amount that can fit..
break
@ -747,8 +715,8 @@ async def start_backfill(
) = dedupe(df)
if diff:
log.warning(
f'Found {diff!r} duplicates in tsdb! '
f'=> Overwriting with `deduped` data !! <=\n'
f'Found {diff} duplicates in tsdb, '
f'overwriting with deduped data\n'
)
await storage.write_ohlcv(
col_sym_key,