'''
Smart OHLCV deduplication with data quality validation.

Handles concurrent write conflicts by keeping the most complete bar
(highest volume) while detecting data quality anomalies.

'''
import polars as pl

from ._anal import with_dts

|
def dedupe_ohlcv_smart(
    src_df: pl.DataFrame,
    time_col: str = 'time',
    volume_col: str = 'volume',
    sort: bool = True,

) -> tuple[
    pl.DataFrame,  # with dts
    pl.DataFrame,  # deduped (keeping higher volume bars)
    int,  # count of dupes removed
    pl.DataFrame|None,  # valid race conditions
    pl.DataFrame|None,  # data quality violations
]:
    '''
    Smart OHLCV deduplication keeping most complete bars.

    For duplicate timestamps, keeps bar with highest volume under
    the assumption that higher volume indicates more complete/final
    data from backfill vs partial live updates.

    Parameters
    ----------
    src_df:
        Input OHLCV frame; must contain `time_col`, `volume_col` and
        the standard `open`/`high`/`low`/`close` columns.
    time_col:
        Name of the bar-timestamp column used to detect duplicates.
    volume_col:
        Name of the volume column used to pick the "most complete" bar.
    sort:
        When set, re-sort the deduped output by `time_col`.

    Returns
    -------
    Tuple of:
    - wdts: original dataframe with datetime columns added
    - deduped: deduplicated frame keeping highest-volume bars
    - diff: number of duplicate rows removed
    - valid_races: duplicates meeting expected race condition pattern
      (volume monotonic, OHLC ranges valid)
    - data_quality_issues: duplicates violating expected relationships
      indicating provider data problems

    '''
    wdts: pl.DataFrame = with_dts(src_df)

    # Find every row whose timestamp occurs more than once.
    dupes: pl.DataFrame = wdts.filter(
        pl.col(time_col).is_duplicated()
    )

    if dupes.is_empty():
        # No duplicates, return as-is
        return (wdts, wdts, 0, None, None)

    # Analyze duplicate groups for validation.
    # NOTE(review): the `index` and `dt` columns are presumed to be
    # added by `with_dts()` — confirm against `._anal.with_dts`.
    dupe_analysis: pl.DataFrame = (
        dupes
        .sort([time_col, 'index'])
        .group_by(time_col, maintain_order=True)
        .agg([
            pl.col('index').alias('indices'),
            # FIX: was hardcoded `pl.col('volume')`, ignoring the
            # `volume_col` parameter that the dedup step below honors;
            # a non-default volume column name would have crashed here.
            pl.col(volume_col).alias('volumes'),
            pl.col('high').alias('highs'),
            pl.col('low').alias('lows'),
            pl.col('open').alias('opens'),
            pl.col('close').alias('closes'),
            pl.col('dt').first().alias('dt'),
            pl.len().alias('count'),
        ])
    )

    # Validate OHLCV monotonicity for each duplicate group
    def check_ohlcv_validity(row) -> dict[str, bool]:
        '''
        Check if duplicate bars follow expected race condition pattern.

        For a valid live-update → backfill race:
        - volume should be monotonically increasing
        - high should be monotonically non-decreasing
        - low should be monotonically non-increasing
        - open should be identical (fixed at bar start)

        Returns dict of violation flags.

        '''
        vols: list = row['volumes']
        highs: list = row['highs']
        lows: list = row['lows']
        opens: list = row['opens']

        violations: dict[str, bool] = {
            'volume_non_monotonic': False,
            'high_decreased': False,
            'low_increased': False,
            'open_mismatch': False,
            'identical_bars': False,
        }

        # Check if all bars are identical (pure duplicate); if so,
        # none of the monotonicity checks below apply.
        if (
            len(set(vols)) == 1
            and len(set(highs)) == 1
            and len(set(lows)) == 1
            and len(set(opens)) == 1
        ):
            violations['identical_bars'] = True
            return violations

        # Check volume monotonicity (should only grow as the bar fills)
        for i in range(1, len(vols)):
            if vols[i] < vols[i-1]:
                violations['volume_non_monotonic'] = True
                break

        # Check high monotonicity (can only increase or stay same)
        for i in range(1, len(highs)):
            if highs[i] < highs[i-1]:
                violations['high_decreased'] = True
                break

        # Check low monotonicity (can only decrease or stay same)
        for i in range(1, len(lows)):
            if lows[i] > lows[i-1]:
                violations['low_increased'] = True
                break

        # Check open consistency (should be fixed at bar start)
        if len(set(opens)) > 1:
            violations['open_mismatch'] = True

        return violations

    # Apply the per-group validation as a struct -> struct mapping.
    dupe_analysis = dupe_analysis.with_columns([
        pl.struct(['volumes', 'highs', 'lows', 'opens'])
        .map_elements(
            check_ohlcv_validity,
            return_dtype=pl.Struct([
                pl.Field('volume_non_monotonic', pl.Boolean),
                pl.Field('high_decreased', pl.Boolean),
                pl.Field('low_increased', pl.Boolean),
                pl.Field('open_mismatch', pl.Boolean),
                pl.Field('identical_bars', pl.Boolean),
            ])
        )
        .alias('validity')
    ])

    # Unnest validity struct into flat boolean columns
    dupe_analysis = dupe_analysis.unnest('validity')

    # Separate valid races from data quality issues
    valid_races: pl.DataFrame|None = (
        dupe_analysis
        .filter(
            # Valid if no violations OR just identical bars
            ~pl.col('volume_non_monotonic')
            & ~pl.col('high_decreased')
            & ~pl.col('low_increased')
            & ~pl.col('open_mismatch')
        )
    )
    if valid_races.is_empty():
        valid_races = None

    data_quality_issues: pl.DataFrame|None = (
        dupe_analysis
        .filter(
            # Issues if any non-identical violation exists
            (
                pl.col('volume_non_monotonic')
                | pl.col('high_decreased')
                | pl.col('low_increased')
                | pl.col('open_mismatch')
            )
            & ~pl.col('identical_bars')
        )
    )
    if data_quality_issues.is_empty():
        data_quality_issues = None

    # Deduplicate: keep highest volume bar for each timestamp
    # (sort ascending by volume within each timestamp, keep the last).
    deduped: pl.DataFrame = (
        wdts
        .sort([time_col, volume_col])
        .unique(
            subset=[time_col],
            keep='last',
            maintain_order=False,
        )
    )

    # Re-sort by time (the unique() above does not preserve order)
    if sort:
        deduped = deduped.sort(by=time_col)

    diff: int = wdts.height - deduped.height

    return (
        wdts,
        deduped,
        diff,
        valid_races,
        data_quality_issues,
    )
|