Compare commits

...

4 Commits

Author SHA1 Message Date
Gud Boi 9ebb977731 Tolerate various "bad data" cases in `markup_gaps()`
Namely such that when the previous-df-row by our shm-abs-'index' doesn't
exist we ignore certain cases which are likely due to borked-but-benign
samples written to the tsdb or rt shm buffers prior.

Particularly we now ignore,
- any `dt`/`prev_dt` values which are UNIX-epoch timestamped (val of 0).
- any row-is-first-row in the df; there is no previous.
- any missing previous datum by 'index', in which case we lookup the
  `wdts` prior row and use that instead.
  * this would indicate a missing sample for the time-step but we can
    still detect a "gap" by looking at the prior row, by df-abs-index
    `i`, and use its timestamp to determine the period/size of missing
    samples (which need to likely still be retrieved).
  * in this case i'm leaving in a pause-point for introspecting these
    rarer cases when `--pdb` is passed via CLI.

Relatedly in the `piker store` CLI ep,
- add `--pdb` flag to `piker store`, pass it verbatim as `debug_mode`.
- when `times` has only a single row, don't calc a `period_s` median.
- only trace `null_segs` when in debug mode.
- always markup/dedupe gaps for `period_s==60`
2026-01-21 21:34:45 -05:00
Gud Boi 56b69f97b3 ib: up API timeout default for remote host conns 2026-01-21 20:05:07 -05:00
Gud Boi e2ff43f5c3 Fix `Qt6` types for new sub-namespaces 2026-01-21 20:02:10 -05:00
Gud Boi 0d76323a90 binance: mk `AggTrade.nq` optional..
Oof! my bad.
Turns out spot pairs don't provide the `.nq` field looks like..
i guess i should not just test `.perp.` pairs all the time!

Bp
2026-01-21 19:57:01 -05:00
4 changed files with 58 additions and 29 deletions

View File

@ -102,12 +102,12 @@ class AggTrade(Struct, frozen=True):
a: int # Aggregate trade ID
p: float # Price
q: float # Quantity with all the market trades
nq: float # Normal quantity without the trades involving RPI orders
f: int # First trade ID
l: int # noqa Last trade ID
T: int # Trade time
m: bool # Is the buyer the market maker?
M: bool | None = None # Ignore
M: bool|None = None # Ignore
nq: float|None = None # Normal quantity without the trades involving RPI orders
async def stream_messages(

View File

@ -1187,7 +1187,7 @@ async def load_aio_clients(
# the API TCP in `ib_insync` connection can be flaky af so instead
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 10,
connect_timeout: float = 30, # in case a remote-host
disconnect_on_exit: bool = True,
) -> dict[str, Client]:

View File

@ -242,6 +242,7 @@ def anal(
trio.run(main)
# TODO, move to `.tsp._annotate`
async def markup_gaps(
fqme: str,
timeframe: float,
@ -288,18 +289,38 @@ async def markup_gaps(
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
dt: pl.Series = row['dt']
dt_prev: pl.Series = row['dt_prev']
if prev_r.is_empty():
await tractor.pause()
# XXX, filter out any special ignore cases,
# - UNIX-epoch stamped datums
# - first row
if (
dt_prev.dt.epoch()[0] == 0
or
dt.dt.epoch()[0] == 0
):
log.warning('Skipping row with UNIX epoch timestamp ??')
continue
if wdts[0]['index'][0] == iend: # first row
log.warning('Skipping first-row (has no previous obvi) !!')
continue
# XXX, if the previous-row by shm-index is missing,
# meaning there is a missing sample (set), get the prior
# row by df index and attempt to use it?
i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
prev_row_by_i = wdts[i_row]
prev_r: pl.DataFrame = prev_row_by_i
# debug any missing pre-row
if tractor._state.is_debug_mode():
await tractor.pause()
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
@ -358,6 +379,7 @@ def ldshm(
fqme: str,
write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
pdb: bool = False, # --pdb passed?
) -> None:
'''
@ -377,7 +399,7 @@ def ldshm(
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
debug_mode=True,
debug_mode=pdb,
),
open_storage_client() as (
mod,
@ -397,17 +419,19 @@ def ldshm(
times: np.ndarray = shm.array['time']
d1: float = float(times[-1] - times[-2])
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
d2: float = 0
# XXX, take a median sample rate if sufficient data
if times.size > 2:
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
d1 < 1.
and d2 < 1.
and med < 1.
):
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
period_s: float = float(max(d1, d2, med))
null_segs: tuple = tsp.get_null_segs(
@ -417,7 +441,9 @@ def ldshm(
# TODO: call null-seg fixer somehow?
if null_segs:
await tractor.pause()
if tractor._state.is_debug_mode():
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
# mod.open_history_client(
@ -498,8 +524,11 @@ def ldshm(
if (
not venue_gaps.is_empty()
or (
period_s < 60
and not step_gaps.is_empty()
not step_gaps.is_empty()
# XXX, i presume i put this bc i was guarding
# for ib venue gaps?
# and
# period_s < 60
)
):
# write repaired ts to parquet-file?

View File

@ -237,8 +237,8 @@ class LevelLabel(YAxisLabel):
class L1Label(LevelLabel):
text_flags = (
QtCore.Qt.TextDontClip
| QtCore.Qt.AlignLeft
QtCore.Qt.TextFlag.TextDontClip
| QtCore.Qt.AlignmentFlag.AlignLeft
)
def set_label_str(