Compare commits


3 Commits

Author SHA1 Message Date
Gud Boi 176090b234 Add vlm-based "smart" OHLCV de-duping & bar validation
Using `claude`, add a `.tsp._dedupe_smart` module that attempts
"smarter" de-duping of duplicate bars by distinguishing between
erroneous bars partially written during concurrent backfill race
conditions vs. **actual** data quality issues from historical
providers.

Problem:
--------
Concurrent writes (live updates vs. backfilling) can create
duplicate timestamped ohlcv bars with different values. Some
potential scenarios include:

- a market's live feed is cancelled mid-update, resulting in the
  "last" datum being only partially updated, i.e. missing some of
  the ticks for that time step.
- when the feed is rebooted during charting, the backfiller will not
  finalize this bar since it currently presumes it should only fill
  data for time steps not already in the tsdb storage.

Our current naive `.unique()` approach obviously keeps the
incomplete bar; a "smarter" approach is to compare the provider's
final vlm amount vs. the maybe-cancelled tsdb bar's: a higher vlm
value from the provider likely indicates a cancelled-during-live-write
case and **not** a datum discrepancy from said data provider.

Analysis (with `claude`) of `zecusdt` data revealed:
- 1000 duplicate timestamps
- 999 identical bars (pure duplicates from 2022 backfill overlap)
- 1 volume-monotonic conflict (live partial vs backfill complete)

The solution from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()`
which:
- sorts by vlm **before** deduplication and keeps the most complete
  bar based on vlm monotonicity as well as the following OHLCV
  validation assumptions:
  * volume should be non-decreasing (it only accumulates within a bar)
  * high should be non-decreasing
  * low should be non-increasing
  * open should be identical
- separates valid race conditions from provider data quality issues
  and reports and returns both dfs (see the sketch below).
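
A minimal standalone sketch of just the keep-highest-volume rule
(the `time` values and the 10.0 vlm are made up; the two conflicting
vlm figures are from the `zecusdt` verification below — the real
helper also adds dt-columns and classifies the dupes):

```python
import polars as pl

# two bars share timestamp 1060: a partial live write vs. the
# final backfilled bar
df = pl.DataFrame({
    'time': [1000, 1060, 1060],
    'volume': [10.0, 140.299, 287.777],
})

deduped = (
    df
    # sort so the highest-volume dupe lands last per timestamp,
    .sort(['time', 'volume'])
    # then keep only that most-complete bar
    .unique(subset=['time'], keep='last')
    .sort('time')
)
assert deduped.get_column('volume').to_list() == [10.0, 287.777]
```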

Change summary by `claude`:
- `.tsp._dedupe_smart`: new module with validation logic
- `.tsp.__init__`: expose `dedupe_ohlcv_smart()`
- `.storage.cli`: integrate smart dedupe, add logging for:
  * duplicate counts (identical vs monotonic races)
  * data quality violations (non-monotonic, invalid OHLC ranges)
  * warnings for provider data issues
- Remove `assert not diff` (duplicates are valid now)

Verified on `zecusdt`: correctly keeps index 3143645
(volume=287.777) over 3143644 (volume=140.299) for the conflicting
2026-01-16 18:54 UTC bar.

`claude`'s Summary of reasoning
-------------------------------
- volume monotonicity is critical: a bar's volume only increases
  during its time window.
- a backfilled bar should always have volume >= that of the
  live-updated one.
- violations indicate any of:
  * Provider data corruption
  * Non-OHLCV aggregation semantics
  * Timestamp misalignment

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-18 21:00:17 -05:00
Gud Boi f3530b2f6b Add `pexpect`-based `pdbp`-REPL offline helper
Add a new `snippets/claude_debug_helper.py` to
provide a programmatic interface to `tractor.pause()` debugger
sessions for incremental data inspection matching the interactive UX
but able to be run by `claude` "offline" since it can't seem to feed
stdin (so it claims) to the `pdb` instance due to lack of ability to
allocate a tty internally.

The script-wrapper is based on `tractor`'s `tests/devx/` suite's use
of `pexpect` patterns for driving `pdbp` prompts and thus enables
automated, offline execution of REPL-inspection commands **without**
the incremental-realtime interaction (and output consumption) a
human would normally drive.

Features:
- `run_pdb_commands()`: batch command execution
- `InteractivePdbSession`: context manager for step-by-step REPL interaction
- `expect()` wrapper: timeout handling with buffer display
- Proper stdin/stdout handling via `pexpect.spawn()`

Example usage:
```python
from claude_debug_helper import InteractivePdbSession

with InteractivePdbSession(
    cmd='piker store ldshm zecusdt.usdtm.perp.binance'
) as session:
    session.run('deduped.shape')
    session.run('step_gaps.shape')
```
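
And a hypothetical batch-mode counterpart via `run_pdb_commands()`
(import path assumed to match the script's filename):

```python
from claude_debug_helper import run_pdb_commands

# batch-run a few inspection commands and collect their outputs
results: dict[str, str] = run_pdb_commands(
    commands=[
        'deduped.shape',
        'step_gaps.shape',
    ],
    initial_cmd='piker store ldshm zecusdt.usdtm.perp.binance',
)
for cmd, output in results.items():
    print(f'{cmd} -> {output}')
```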

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-18 18:18:34 -05:00
Gud Boi 2d4d7cca57 Fix polars 1.36.0 duration API
Polars tightened type safety for `.dt` accessor methods: duration
types now require the `total_*` methods, while datetime component
accessors like `day()` only work on datetime dtypes.

`detect_time_gaps()` in `.tsp._anal` was calling `.dt.day()` on
the `dt_diff` column (a duration from `.diff()`) which throws an
`InvalidOperationError` on modern polars.

Changes:
- use an f-string to map time unit strings to their `total_<unit>s`
  form for the new duration API.
- handle singular/plural forms: 'day' -> 'days' -> 'total_days'
- ensure a trailing 's' before applying the 'total_' prefix

Also updates inline comments explaining the polars type distinction
between datetime components and duration totals.

Fixes `piker store ldshm` crashes on datasets with time gaps.
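
A minimal standalone repro of the accessor distinction (illustrative
only, not part of the patch):

```python
import polars as pl
from datetime import datetime

df = pl.DataFrame({
    'dt': [
        datetime(2024, 1, 1),
        datetime(2024, 1, 2),
        datetime(2024, 1, 5),
    ],
}).with_columns(
    pl.col('dt').diff().alias('dt_diff'),  # a duration column
)

# datetime component accessor: fine on the datetime col,
df.select(pl.col('dt').dt.day())

# but raises `InvalidOperationError` on the duration col:
# df.select(pl.col('dt_diff').dt.day())

# duration types want the `total_*` accessors instead:
df.select(pl.col('dt_diff').dt.total_days())
```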

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-18 14:19:51 -05:00
5 changed files with 514 additions and 6 deletions

View File: `.storage.cli`

@@ -441,11 +441,37 @@ def ldshm(
         wdts,
         deduped,
         diff,
-    ) = tsp.dedupe(
+        valid_races,
+        dq_issues,
+    ) = tsp.dedupe_ohlcv_smart(
         shm_df,
         period=period_s,
     )
+
+    # Report duplicate analysis
+    if diff > 0:
+        log.info(
+            f'Removed {diff} duplicate timestamp(s)\n'
+        )
+    if valid_races is not None:
+        identical: int = (
+            valid_races
+            .filter(pl.col('identical_bars'))
+            .height
+        )
+        monotonic: int = valid_races.height - identical
+        log.info(
+            f'Valid race conditions: {valid_races.height}\n'
+            f' - Identical bars: {identical}\n'
+            f' - Volume monotonic: {monotonic}\n'
+        )
+    if dq_issues is not None:
+        log.warning(
+            f'DATA QUALITY ISSUES from provider: '
+            f'{dq_issues.height} timestamp(s)\n'
+            f'{dq_issues}\n'
+        )
+
     # detect gaps from in expected (uniform OHLC) sample period
     step_gaps: pl.DataFrame = tsp.detect_time_gaps(
         deduped,
@@ -460,7 +486,8 @@ def ldshm(
         # TODO: actually pull the exact duration
         # expected for each venue operational period?
-        gap_dt_unit='days',
+        # gap_dt_unit='day',
+        gap_dt_unit='day',
         gap_thresh=1,
     )
@@ -534,8 +561,13 @@ def ldshm(
             tf2aids[period_s] = aids

         else:
             # allow interaction even when no ts problems.
-            assert not diff
+            # No significant gaps to handle, but may have had
+            # duplicates removed (valid race conditions are ok)
+            if diff > 0 and dq_issues is not None:
+                log.warning(
+                    'Found duplicates with data quality issues '
+                    'but no significant time gaps!\n'
+                )
             await tractor.pause()

     log.info('Exiting TSP shm anal-izer!')

View File: `.tsp.__init__`

@@ -40,6 +40,9 @@ from ._anal import (
     # `numpy` only
     slice_from_time as slice_from_time,
 )
+from ._dedupe_smart import (
+    dedupe_ohlcv_smart as dedupe_ohlcv_smart,
+)
 from ._history import (
     iter_dfs_from_shms as iter_dfs_from_shms,
     manage_history as manage_history,

View File: `.tsp._anal`

@@ -578,11 +578,22 @@ def detect_time_gaps(
     # NOTE: this flag is to indicate that on this (sampling) time
     # scale we expect to only be filtering against larger venue
     # closures-scale time gaps.
+    #
+    # Map to a `total_*` method since `dt_diff` is a duration type,
+    # not datetime - modern polars requires `total_*` methods
+    # for duration types (e.g. `total_days()` not `day()`)
+    # Ensure plural form for polars API (e.g. 'day' -> 'days')
+    unit_plural: str = (
+        gap_dt_unit
+        if gap_dt_unit.endswith('s')
+        else f'{gap_dt_unit}s'
+    )
+    duration_method: str = f'total_{unit_plural}'
     return step_gaps.filter(
         # Second by an arbitrary dt-unit step size
         getattr(
             pl.col('dt_diff').dt,
-            gap_dt_unit,
+            duration_method,
         )().abs() > gap_thresh
     )

View File: `.tsp._dedupe_smart` (new file)

@@ -0,0 +1,206 @@
'''
Smart OHLCV deduplication with data quality validation.

Handles concurrent write conflicts by keeping the most complete bar
(highest volume) while detecting data quality anomalies.

'''
import polars as pl

from ._anal import with_dts


def dedupe_ohlcv_smart(
    src_df: pl.DataFrame,
    time_col: str = 'time',
    volume_col: str = 'volume',
    sort: bool = True,

) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # deduped (keeping higher volume bars)
    int,  # count of dupes removed
    pl.DataFrame|None,  # valid race conditions
    pl.DataFrame|None,  # data quality violations
]:
    '''
    Smart OHLCV deduplication keeping most complete bars.

    For duplicate timestamps, keeps the bar with highest volume
    under the assumption that higher volume indicates more
    complete/final data from backfill vs. partial live updates.

    Returns
    -------
    Tuple of:
    - wdts: original dataframe with datetime columns added
    - deduped: deduplicated frame keeping highest-volume bars
    - diff: number of duplicate rows removed
    - valid_races: duplicates meeting expected race condition
      pattern (volume monotonic, OHLC ranges valid)
    - data_quality_issues: duplicates violating expected
      relationships indicating provider data problems

    '''
    wdts: pl.DataFrame = with_dts(src_df)

    # Find duplicate timestamps
    dupes: pl.DataFrame = wdts.filter(
        pl.col(time_col).is_duplicated()
    )
    if dupes.is_empty():
        # No duplicates, return as-is
        return (wdts, wdts, 0, None, None)

    # Analyze duplicate groups for validation
    dupe_analysis: pl.DataFrame = (
        dupes
        .sort([time_col, 'index'])
        .group_by(time_col, maintain_order=True)
        .agg([
            pl.col('index').alias('indices'),
            pl.col('volume').alias('volumes'),
            pl.col('high').alias('highs'),
            pl.col('low').alias('lows'),
            pl.col('open').alias('opens'),
            pl.col('close').alias('closes'),
            pl.col('dt').first().alias('dt'),
            pl.len().alias('count'),
        ])
    )

    # Validate OHLCV monotonicity for each duplicate group
    def check_ohlcv_validity(row) -> dict[str, bool]:
        '''
        Check if duplicate bars follow the expected race condition
        pattern.

        For a valid live-update vs. backfill race:
        - volume should be monotonically increasing
        - high should be monotonically non-decreasing
        - low should be monotonically non-increasing
        - open should be identical (fixed at bar start)

        Returns dict of violation flags.

        '''
        vols: list = row['volumes']
        highs: list = row['highs']
        lows: list = row['lows']
        opens: list = row['opens']

        violations: dict[str, bool] = {
            'volume_non_monotonic': False,
            'high_decreased': False,
            'low_increased': False,
            'open_mismatch': False,
            'identical_bars': False,
        }

        # Check if all bars are identical (pure duplicate)
        if (
            len(set(vols)) == 1
            and len(set(highs)) == 1
            and len(set(lows)) == 1
            and len(set(opens)) == 1
        ):
            violations['identical_bars'] = True
            return violations

        # Check volume monotonicity
        for i in range(1, len(vols)):
            if vols[i] < vols[i-1]:
                violations['volume_non_monotonic'] = True
                break

        # Check high monotonicity (can only increase or stay same)
        for i in range(1, len(highs)):
            if highs[i] < highs[i-1]:
                violations['high_decreased'] = True
                break

        # Check low monotonicity (can only decrease or stay same)
        for i in range(1, len(lows)):
            if lows[i] > lows[i-1]:
                violations['low_increased'] = True
                break

        # Check open consistency (should be fixed)
        if len(set(opens)) > 1:
            violations['open_mismatch'] = True

        return violations

    # Apply validation
    dupe_analysis = dupe_analysis.with_columns([
        pl.struct(['volumes', 'highs', 'lows', 'opens'])
        .map_elements(
            check_ohlcv_validity,
            return_dtype=pl.Struct([
                pl.Field('volume_non_monotonic', pl.Boolean),
                pl.Field('high_decreased', pl.Boolean),
                pl.Field('low_increased', pl.Boolean),
                pl.Field('open_mismatch', pl.Boolean),
                pl.Field('identical_bars', pl.Boolean),
            ])
        )
        .alias('validity')
    ])

    # Unnest validity struct
    dupe_analysis = dupe_analysis.unnest('validity')

    # Separate valid races from data quality issues
    valid_races: pl.DataFrame|None = (
        dupe_analysis
        .filter(
            # Valid if no violations OR just identical bars
            ~pl.col('volume_non_monotonic')
            & ~pl.col('high_decreased')
            & ~pl.col('low_increased')
            & ~pl.col('open_mismatch')
        )
    )
    if valid_races.is_empty():
        valid_races = None

    data_quality_issues: pl.DataFrame|None = (
        dupe_analysis
        .filter(
            # Issues if any non-identical violation exists
            (
                pl.col('volume_non_monotonic')
                | pl.col('high_decreased')
                | pl.col('low_increased')
                | pl.col('open_mismatch')
            )
            & ~pl.col('identical_bars')
        )
    )
    if data_quality_issues.is_empty():
        data_quality_issues = None

    # Deduplicate: keep highest volume bar for each timestamp
    deduped: pl.DataFrame = (
        wdts
        .sort([time_col, volume_col])
        .unique(
            subset=[time_col],
            keep='last',
            maintain_order=False,
        )
    )

    # Re-sort by time or index
    if sort:
        deduped = deduped.sort(by=time_col)

    diff: int = wdts.height - deduped.height

    return (
        wdts,
        deduped,
        diff,
        valid_races,
        data_quality_issues,
    )

View File: `snippets/claude_debug_helper.py` (new file)

@@ -0,0 +1,256 @@
#!/usr/bin/env python
'''
Programmatic debugging helper for `pdbp`-REPL human-like
interaction, built to allow `claude` to interact with crashes and
`tractor.pause()` breakpoints alongside a human dev.

Originally written by `claude` during a backfiller inspection
session with @goodboy trying to resolve duplicate/gappy ohlcv ts
issues discovered while testing the new `nativedb` tsdb.

Allows `claude` to run `pdb` commands and capture output in an
"offline" manner while generating output similar to interacting
with the debug REPL directly.

The use of `pexpect` is heavily based on tractor's REPL UX test
suite(s), namely various `tests/devx/test_debugger.py` patterns.

'''
import sys
import os
import time

import pexpect
from pexpect.exceptions import (
    TIMEOUT,
    EOF,
)

PROMPT: str = r'\(Pdb\+\)'


def expect(
    child: pexpect.spawn,
    patt: str,
    **kwargs,
) -> None:
    '''
    Expect wrapper that prints last console data before failing.

    '''
    try:
        child.expect(
            patt,
            **kwargs,
        )
    except TIMEOUT:
        before: str = (
            str(child.before.decode())
            if isinstance(child.before, bytes)
            else str(child.before)
        )
        print(
            f'TIMEOUT waiting for pattern: {patt}\n'
            f'Last seen output:\n{before}'
        )
        raise


def run_pdb_commands(
    commands: list[str],
    initial_cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
    timeout: int = 30,
    print_output: bool = True,
) -> dict[str, str]:
    '''
    Spawn piker process, wait for pdb prompt, execute commands.

    Returns dict mapping command -> output.

    '''
    results: dict[str, str] = {}

    # Disable colored output for easier parsing
    os.environ['PYTHON_COLORS'] = '0'

    # Spawn the process
    if print_output:
        print(f'Spawning: {initial_cmd}')

    child: pexpect.spawn = pexpect.spawn(
        initial_cmd,
        timeout=timeout,
        encoding='utf-8',
        echo=False,
    )

    # Wait for pdb prompt
    try:
        expect(child, PROMPT, timeout=timeout)
        if print_output:
            print('Reached pdb prompt!')

        # Execute each command
        for cmd in commands:
            if print_output:
                print(f'\n>>> {cmd}')

            child.sendline(cmd)
            time.sleep(0.1)

            # Wait for next prompt
            expect(child, PROMPT, timeout=timeout)

            # Capture output (everything before the prompt)
            output: str = (
                str(child.before.decode())
                if isinstance(child.before, bytes)
                else str(child.before)
            )
            results[cmd] = output
            if print_output:
                print(output)

        # Quit debugger gracefully
        child.sendline('quit')
        try:
            child.expect(EOF, timeout=5)
        except (TIMEOUT, EOF):
            pass

    except TIMEOUT as e:
        print(f'Timeout: {e}')
        if child.before:
            before: str = (
                str(child.before.decode())
                if isinstance(child.before, bytes)
                else str(child.before)
            )
            print(f'Buffer:\n{before}')
        results['_error'] = str(e)

    finally:
        if child.isalive():
            child.close(force=True)

    return results


class InteractivePdbSession:
    '''
    Interactive pdb session manager for incremental debugging.

    '''
    def __init__(
        self,
        cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
        timeout: int = 30,
    ):
        self.cmd: str = cmd
        self.timeout: int = timeout
        self.child: pexpect.spawn|None = None
        self.history: list[tuple[str, str]] = []

    def start(self) -> None:
        '''
        Start the piker process and wait for first prompt.

        '''
        os.environ['PYTHON_COLORS'] = '0'
        print(f'Starting: {self.cmd}')

        self.child = pexpect.spawn(
            self.cmd,
            timeout=self.timeout,
            encoding='utf-8',
            echo=False,
        )

        # Wait for initial prompt
        expect(self.child, PROMPT, timeout=self.timeout)
        print('Ready at pdb prompt!')

    def run(
        self,
        cmd: str,
        print_output: bool = True,
    ) -> str:
        '''
        Execute a single pdb command and return output.

        '''
        if not self.child or not self.child.isalive():
            raise RuntimeError('Session not started or dead')

        if print_output:
            print(f'\n>>> {cmd}')

        self.child.sendline(cmd)
        time.sleep(0.1)

        # Wait for next prompt
        expect(self.child, PROMPT, timeout=self.timeout)

        output: str = (
            str(self.child.before.decode())
            if isinstance(self.child.before, bytes)
            else str(self.child.before)
        )
        self.history.append((cmd, output))

        if print_output:
            print(output)

        return output

    def quit(self) -> None:
        '''
        Exit the debugger and cleanup.

        '''
        if self.child and self.child.isalive():
            self.child.sendline('quit')
            try:
                self.child.expect(EOF, timeout=5)
            except (TIMEOUT, EOF):
                pass
            self.child.close(force=True)

    def __enter__(self):
        self.start()
        return self

    def __exit__(self, *args):
        self.quit()


if __name__ == '__main__':
    # Example inspection commands
    inspect_cmds: list[str] = [
        'locals().keys()',
        'type(deduped)',
        'deduped.shape',
        (
            'step_gaps.shape '
            'if "step_gaps" in locals() '
            'else "N/A"'
        ),
        (
            'venue_gaps.shape '
            'if "venue_gaps" in locals() '
            'else "N/A"'
        ),
    ]

    # Allow commands from CLI args
    if len(sys.argv) > 1:
        inspect_cmds = sys.argv[1:]

    # Interactive session example
    with InteractivePdbSession() as session:
        for cmd in inspect_cmds:
            session.run(cmd)

    print('\n=== Session Complete ===')