hist_backfill_fixes: working-around (some) conc issues in the tsdb backfiller #62

Merged
goodboy merged 44 commits from hist_backfill_fixes into main 2026-02-23 17:22:25 +00:00
19 changed files with 3272 additions and 1730 deletions

View File

@ -275,9 +275,15 @@ async def open_history_client(
f'{times}'
)
# XXX, debug any case where the latest 1m bar we get is
# already another "sample's-step-old"..
if end_dt is None:
inow: int = round(time.time())
if (inow - times[-1]) > 60:
if (
_time_step := (inow - times[-1])
>
timeframe * 2
):
await tractor.pause()
start_dt = from_timestamp(times[0])

View File

@ -250,7 +250,9 @@ async def vnc_click_hack(
'connection': 'r'
}[reset_type]
with tractor.devx.open_crash_handler():
with tractor.devx.open_crash_handler(
ignore={TimeoutError,},
):
client = await AsyncVNCClient.connect(
VNCConfig(
host=host,
@ -331,7 +333,14 @@ def i3ipc_xdotool_manual_click_hack() -> None:
'''
focussed, matches = i3ipc_fin_wins_titled()
try:
orig_win_id = focussed.window
except AttributeError:
# XXX if .window cucks we prolly aren't intending to
# use this and/or just woke up from suspend..
log.exception('xdotool invalid usage ya ??\n')
return
try:
for name, con in matches:
print(f'Resetting data feed for {name}')

View File

@ -1187,7 +1187,7 @@ async def load_aio_clients(
# the API TCP in `ib_insync` connection can be flaky af so instead
# retry a few times to get the client going..
connect_retries: int = 3,
connect_timeout: float = 10,
connect_timeout: float = 30, # in case a remote-host
disconnect_on_exit: bool = True,
) -> dict[str, Client]:

View File

@ -262,7 +262,38 @@ async def open_history_client(
vlm = bars_array['volume']
vlm[vlm < 0] = 0
return bars_array, first_dt, last_dt
# XXX, if a start-limit was passed ensure we only
# return history that far back!
if (
start_dt
and
first_dt < start_dt
):
trimmed_bars = bars_array[
bars_array['time'] >= start_dt.timestamp()
]
if (
trimmed_first_dt := from_timestamp(trimmed_bars['time'][0])
!=
start_dt
):
# TODO! rm this once we're more confident it never hits!
breakpoint()
raise RuntimeError(
f'OHLC-bars array start is gt `start_dt` limit !!\n'
f'start_dt: {start_dt}\n'
f'first_dt: {first_dt}\n'
f'trimmed_first_dt: {trimmed_first_dt}\n'
)
# XXX, overwrite with start_dt-limited frame
bars_array = trimmed_bars
return (
bars_array,
first_dt,
last_dt,
)
# TODO: it seems like we can do async queries for ohlc
# but getting the order right still isn't working and I'm not
@ -451,6 +482,8 @@ async def get_bars(
dt_duration,
) = await proxy.bars(
fqme=fqme,
# XXX TODO! lol we're not using this..
# start_dt=start_dt,
end_dt=end_dt,
sample_period_s=timeframe,
@ -1082,6 +1115,7 @@ async def stream_quotes(
con: Contract = details.contract
first_ticker: Ticker|None = None
first_quote: dict[str, Any] = {}
timeout: float = 1.6
with trio.move_on_after(timeout) as quote_cs:
@ -1134,15 +1168,14 @@ async def stream_quotes(
first_quote,
))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever()
#
# ^^XXX^^TODO! INSTEAD impl a `trio.sleep()` for the
# duration until the venue opens!!
# ?TODO, we could instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
@ -1166,6 +1199,9 @@ async def stream_quotes(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
# signal `.data.feed` layer that mkt quotes are LIVE
feed_is_live.set()
cs: trio.CancelScope|None = None
startup: bool = True
iter_quotes: trio.abc.Channel
@ -1213,55 +1249,12 @@ async def stream_quotes(
tn.start_soon(reset_on_feed)
async with aclosing(iter_quotes):
# if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']:
# generally speaking these feeds don't
# include vlm data.
atype: str = mkt.dst.atype
log.info(
f'No-vlm {mkt.fqme}@{atype}, skipping quote poll'
)
else:
# wait for real volume on feed (trading might be
# closed)
while True:
ticker = await iter_quotes.receive()
# for a real volume contract we rait for
# the first "real" trade to take place
if (
# not calc_price
# and not ticker.rtTime
False
# not ticker.rtTime
):
# spin consuming tickers until we
# get a real market datum
log.debug(f"New unsent ticker: {ticker}")
continue
else:
log.debug("Received first volume tick")
# ugh, clear ticks since we've
# consumed them (ahem, ib_insync is
# truly stateful trash)
# ticker.ticks = []
# XXX: this works because we don't use
# ``aclosing()`` above?
break
quote = normalize(ticker)
log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live
# quotes are now active desptie not having
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set()
quote = normalize(ticker)
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})

View File

@ -80,14 +80,14 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
'''
service_nursery: None|trio.Nursery = None
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
# TODO: we could stick these in a composed type to avoid angering
# the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
@ -337,7 +337,7 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> None:
) -> set[int]:
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
@ -364,7 +364,12 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second.
if shms_by_period is not None:
if (
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
from ._sharedmem import (
attach_shm_array,
_Token,
@ -378,12 +383,17 @@ async def register_with_sampler(
readonly=False,
)
shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
Sampler.ohlcv_shms.setdefault(
period,
[],
).append(shm)
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(set(Sampler.ohlcv_shms.keys()))
await ctx.started(
set(Sampler.ohlcv_shms.keys())
)
if open_index_stream:
try:
@ -535,6 +545,8 @@ async def open_sample_stream(
# yield bistream
# else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with (
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
@ -549,10 +561,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
},
) as (ctx, first)
) as (ctx, shm_periods)
):
if ensure_is_active:
assert len(first) > 1
assert len(shm_periods) > 1
async with (
ctx.open_stream(

View File

@ -520,7 +520,10 @@ def open_shm_array(
# "unlink" created shm on process teardown by
# pushing teardown calls onto actor context stack
stack = tractor.current_actor().lifetime_stack
stack = tractor.current_actor(
err_on_no_runtime=False,
).lifetime_stack
if stack:
stack.callback(shmarr.close)
stack.callback(shmarr.destroy)
@ -607,7 +610,10 @@ def attach_shm_array(
_known_tokens[key] = token
# "close" attached shm on actor teardown
tractor.current_actor().lifetime_stack.callback(sha.close)
if (actor := tractor.current_actor(
err_on_no_runtime=False,
)):
actor.lifetime_stack.callback(sha.close)
return sha

View File

@ -43,7 +43,6 @@ from typing import (
import numpy as np
from .. import config
from ..service import (
check_for_service,
@ -152,7 +151,10 @@ class StorageConnectionError(ConnectionError):
'''
def get_storagemod(name: str) -> ModuleType:
def get_storagemod(
name: str,
) -> ModuleType:
mod: ModuleType = import_module(
'.' + name,
'piker.storage',
@ -167,7 +169,10 @@ def get_storagemod(name: str) -> ModuleType:
async def open_storage_client(
backend: str|None = None,
) -> tuple[ModuleType, StorageClient]:
) -> tuple[
ModuleType,
StorageClient,
]:
'''
Load the ``StorageClient`` for named backend.
@ -267,7 +272,10 @@ async def open_tsdb_client(
from ..data.feed import maybe_open_feed
async with (
open_storage_client() as (_, storage),
open_storage_client() as (
_,
storage,
),
maybe_open_feed(
[fqme],
@ -275,7 +283,7 @@ async def open_tsdb_client(
) as feed,
):
profiler(f'opened feed for {fqme}')
profiler(f'opened feed for {fqme!r}')
# to_append = feed.hist_shm.array
# to_prepend = None

View File

@ -19,16 +19,10 @@ Storage middle-ware CLIs.
"""
from __future__ import annotations
# from datetime import datetime
# from contextlib import (
# AsyncExitStack,
# )
from pathlib import Path
from math import copysign
import time
from types import ModuleType
from typing import (
Any,
TYPE_CHECKING,
)
@ -47,7 +41,6 @@ from piker.data import (
ShmArray,
)
from piker import tsp
from piker.data._formatters import BGM
from . import log
from . import (
__tsdbs__,
@ -242,122 +235,12 @@ def anal(
trio.run(main)
async def markup_gaps(
fqme: str,
timeframe: float,
actl: AnnotCtl,
wdts: pl.DataFrame,
gaps: pl.DataFrame,
) -> dict[int, dict]:
'''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles.
'''
aids: dict[int] = {}
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# the gap's RIGHT-most bar's OPEN value
# at that time (sample) step.
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# dt_end_t: float = dt.timestamp()
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's LEFT-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
if prev_r.is_empty():
await tractor.pause()
istart: int = prev_r['index'][0]
# dt_start_t: float = dt_prev.timestamp()
# start_t: float = prev_r['time']
# assert (
# dt_start_t
# ==
# start_t
# )
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
# gap_w: float = abs((iend - istart))
# if gap_w < 6:
# margin: float = 6
# iend += margin
# istart -= margin
rect_gap: float = BGM*3/8
opn: float = row['open'][0]
ro: tuple[float, float] = (
# dt_end_t,
iend + rect_gap + 1,
opn,
)
cls: float = prev_r['close'][0]
lc: tuple[float, float] = (
# dt_start_t,
istart - rect_gap, # + 1 ,
cls,
)
color: str = 'dad_blue'
diff: float = cls - opn
sgn: float = copysign(1, diff)
color: str = {
-1: 'buy_green',
1: 'sell_red',
}[sgn]
rect_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
start_pos=lc,
end_pos=ro,
color=color,
)
aid: int = await actl.add_rect(**rect_kwargs)
assert aid
aids[aid] = rect_kwargs
# tell chart to redraw all its
# graphics view layers Bo
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
return aids
@store.command()
def ldshm(
fqme: str,
write_parquet: bool = True,
reload_parquet_to_shm: bool = True,
pdb: bool = False, # --pdb passed?
) -> None:
'''
@ -377,7 +260,7 @@ def ldshm(
open_piker_runtime(
'polars_boi',
enable_modules=['piker.data._sharedmem'],
debug_mode=True,
debug_mode=pdb,
),
open_storage_client() as (
mod,
@ -397,6 +280,9 @@ def ldshm(
times: np.ndarray = shm.array['time']
d1: float = float(times[-1] - times[-2])
d2: float = 0
# XXX, take a median sample rate if sufficient data
if times.size > 2:
d2: float = float(times[-2] - times[-3])
med: float = np.median(np.diff(times))
if (
@ -407,7 +293,6 @@ def ldshm(
raise ValueError(
f'Something is wrong with time period for {shm}:\n{times}'
)
period_s: float = float(max(d1, d2, med))
null_segs: tuple = tsp.get_null_segs(
@ -417,6 +302,8 @@ def ldshm(
# TODO: call null-seg fixer somehow?
if null_segs:
if tractor._state.is_debug_mode():
await tractor.pause()
# async with (
# trio.open_nursery() as tn,
@ -441,9 +328,35 @@ def ldshm(
wdts,
deduped,
diff,
) = tsp.dedupe(
valid_races,
dq_issues,
) = tsp.dedupe_ohlcv_smart(
shm_df,
period=period_s,
)
# Report duplicate analysis
if diff > 0:
log.info(
f'Removed {diff} duplicate timestamp(s)\n'
)
if valid_races is not None:
identical: int = (
valid_races
.filter(pl.col('identical_bars'))
.height
)
monotonic: int = valid_races.height - identical
log.info(
f'Valid race conditions: {valid_races.height}\n'
f' - Identical bars: {identical}\n'
f' - Volume monotonic: {monotonic}\n'
)
if dq_issues is not None:
log.warning(
f'DATA QUALITY ISSUES from provider: '
f'{dq_issues.height} timestamp(s)\n'
f'{dq_issues}\n'
)
# detect gaps from in expected (uniform OHLC) sample period
@ -460,7 +373,8 @@ def ldshm(
# TODO: actually pull the exact duration
# expected for each venue operational period?
gap_dt_unit='days',
# gap_dt_unit='day',
gap_dt_unit='day',
gap_thresh=1,
)
@ -471,8 +385,11 @@ def ldshm(
if (
not venue_gaps.is_empty()
or (
period_s < 60
and not step_gaps.is_empty()
not step_gaps.is_empty()
# XXX, i presume i put this bc i was guarding
# for ib venue gaps?
# and
# period_s < 60
)
):
# write repaired ts to parquet-file?
@ -521,7 +438,7 @@ def ldshm(
do_markup_gaps: bool = True
if do_markup_gaps:
new_df: pl.DataFrame = tsp.np2pl(new)
aids: dict = await markup_gaps(
aids: dict = await tsp._annotate.markup_gaps(
fqme,
period_s,
actl,
@ -530,12 +447,23 @@ def ldshm(
)
# last chance manual overwrites in REPL
# await tractor.pause()
assert aids
if not aids:
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
tf2aids[period_s] = aids
else:
# allow interaction even when no ts problems.
assert not diff
# No significant gaps to handle, but may have had
# duplicates removed (valid race conditions are ok)
if diff > 0 and dq_issues is not None:
log.warning(
'Found duplicates with data quality issues '
'but no significant time gaps!\n'
)
await tractor.pause()
log.info('Exiting TSP shm anal-izer!')

File diff suppressed because it is too large Load Diff

View File

@ -275,6 +275,18 @@ def get_null_segs(
# diff of abs index steps between each zeroed row
absi_zdiff: np.ndarray = np.diff(absi_zeros)
if zero_t.size < 2:
try:
breakpoint()
except RuntimeError:
# XXX, if greenback not active from
# piker store ldshm cmd..
log.exception(
"Can't debug single-sample null!\n"
)
return None
# scan for all frame-indices where the
# zeroed-row-abs-index-step-diff is greater then the
# expected increment of 1.
@ -487,7 +499,8 @@ def iter_null_segs(
start_dt = None
if (
absi_start is not None
and start_t != 0
and
start_t != 0
):
fi_start: int = absi_start - absi_first
start_row: Seq = frame[fi_start]
@ -501,8 +514,8 @@ def iter_null_segs(
yield (
absi_start, absi_end, # abs indices
fi_start, fi_end, # relative "frame" indices
start_t, end_t,
start_dt, end_dt,
start_t, end_t, # epoch times
start_dt, end_dt, # dts
)
@ -578,11 +591,22 @@ def detect_time_gaps(
# NOTE: this flag is to indicate that on this (sampling) time
# scale we expect to only be filtering against larger venue
# closures-scale time gaps.
#
# Map to total_ method since `dt_diff` is a duration type,
# not datetime - modern polars requires `total_*` methods
# for duration types (e.g. `total_days()` not `day()`)
# Ensure plural form for polars API (e.g. 'day' -> 'days')
unit_plural: str = (
gap_dt_unit
if gap_dt_unit.endswith('s')
else f'{gap_dt_unit}s'
)
duration_method: str = f'total_{unit_plural}'
return step_gaps.filter(
# Second by an arbitrary dt-unit step size
getattr(
pl.col('dt_diff').dt,
gap_dt_unit,
duration_method,
)().abs() > gap_thresh
)

View File

@ -0,0 +1,306 @@
# piker: trading gear for hackers

Ahh yeah, prolly worth noting that though it’s not documented yet, this is a a new .tsp mod which provides a bunch of new functionality for annotating time-gaps via a remote API started a while back when i first started digging into time-gap issues in the backfiller during the nativedb first-draft.

I’m not going to go through everything in detail here since it’s all going to be much more refined and formalized in #75 (and follow up).

Ahh yeah, prolly worth noting that though it's not documented yet, this is a a new `.tsp` mod which provides a bunch of new functionality for annotating time-gaps via a remote API started a while back when i first started digging into time-gap issues in the backfiller during the `nativedb` first-draft. I'm not going to go through everything in detail here since it's all going to be much more refined and formalized in #75 (and follow up).
# Copyright (C) 2018-present Tyler Goodlet (in stewardship of pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Time-series (remote) annotation APIs.
"""
from __future__ import annotations
from math import copysign
from typing import (
Any,
TYPE_CHECKING,
)
import polars as pl
import tractor
from piker.data._formatters import BGM
from piker.storage import log
from piker.ui._style import get_fonts
if TYPE_CHECKING:
from piker.ui._remote_ctl import AnnotCtl
def humanize_duration(
seconds: float,
) -> str:
'''
Convert duration in seconds to short human-readable form.
Uses smallest appropriate time unit:
- d: days
- h: hours
- m: minutes
- s: seconds
Examples:
- 86400 -> "1d"
- 28800 -> "8h"
- 180 -> "3m"
- 45 -> "45s"
'''
abs_secs: float = abs(seconds)
if abs_secs >= 86400:
days: float = abs_secs / 86400
if days >= 10 or days == int(days):
return f'{int(days)}d'
return f'{days:.1f}d'
elif abs_secs >= 3600:
hours: float = abs_secs / 3600
if hours >= 10 or hours == int(hours):
return f'{int(hours)}h'
return f'{hours:.1f}h'
elif abs_secs >= 60:
mins: float = abs_secs / 60
if mins >= 10 or mins == int(mins):
return f'{int(mins)}m'
return f'{mins:.1f}m'
else:
if abs_secs >= 10 or abs_secs == int(abs_secs):
return f'{int(abs_secs)}s'
return f'{abs_secs:.1f}s'
async def markup_gaps(
fqme: str,
timeframe: float,
actl: AnnotCtl,
wdts: pl.DataFrame,
gaps: pl.DataFrame,
# XXX, switch on to see txt showing a "humanized" label of each
# gap's duration.
show_txt: bool = False,
) -> dict[int, dict]:
'''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles.
'''
# XXX: force chart redraw FIRST to ensure PlotItem coordinate
# system is properly initialized before we position annotations!
# Without this, annotations may be misaligned on first creation
# due to Qt/pyqtgraph initialization race conditions.
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
aids: dict[int] = {}
for i in range(gaps.height):
row: pl.DataFrame = gaps[i]
# the gap's RIGHT-most bar's OPEN value
# at that time (sample) step.
iend: int = row['index'][0]
# dt: datetime = row['dt'][0]
# dt_prev: datetime = row['dt_prev'][0]
# dt_end_t: float = dt.timestamp()
# TODO: can we eventually remove this
# once we figure out why the epoch cols
# don't match?
# TODO: FIX HOW/WHY these aren't matching
# and are instead off by 4hours (EST
# vs. UTC?!?!)
# end_t: float = row['time']
# assert (
# dt.timestamp()
# ==
# end_t
# )
# the gap's LEFT-most bar's CLOSE value
# at that time (sample) step.
prev_r: pl.DataFrame = wdts.filter(
pl.col('index') == iend - 1
)
# XXX: probably a gap in the (newly sorted or de-duplicated)
# dt-df, so we might need to re-index first..
dt: pl.Series = row['dt']
dt_prev: pl.Series = row['dt_prev']
if prev_r.is_empty():
# XXX, filter out any special ignore cases,
# - UNIX-epoch stamped datums
# - first row
if (
dt_prev.dt.epoch()[0] == 0
or
dt.dt.epoch()[0] == 0
):
log.warning('Skipping row with UNIX epoch timestamp ??')
continue
if wdts[0]['index'][0] == iend: # first row
log.warning('Skipping first-row (has no previous obvi) !!')
continue
# XXX, if the previous-row by shm-index is missing,
# meaning there is a missing sample (set), get the prior
# row by df index and attempt to use it?
i_wdts: pl.DataFrame = wdts.with_row_index(name='i')
i_row: int = i_wdts.filter(pl.col('index') == iend)['i'][0]
prev_row_by_i = wdts[i_row]
prev_r: pl.DataFrame = prev_row_by_i
# debug any missing pre-row
if tractor._state.is_debug_mode():
await tractor.pause()
istart: int = prev_r['index'][0]
# TODO: implement px-col width measure
# and ensure at least as many px-cols
# shown per rect as configured by user.
# gap_w: float = abs((iend - istart))
# if gap_w < 6:
# margin: float = 6
# iend += margin
# istart -= margin
opn: float = row['open'][0]
cls: float = prev_r['close'][0]
# get gap duration for humanized label
gap_dur_s: float = row['s_diff'][0]
gap_label: str = humanize_duration(gap_dur_s)
# XXX: get timestamps for server-side index lookup
start_time: float = prev_r['time'][0]
end_time: float = row['time'][0]
# BGM=0.16 is the normal diff from overlap between bars, SO
# just go slightly "in" from that "between them".
from_idx: int = BGM - .06 # = .10
lc: tuple[float, float] = (
istart + 1 - from_idx,
cls,
)
ro: tuple[float, float] = (
iend + from_idx,
opn,
)
diff: float = cls - opn
sgn: float = copysign(1, diff)
up_gap: bool = sgn == -1
down_gap: bool = sgn == 1
flat: bool = sgn == 0
color: str = 'dad_blue'
# TODO? mks more sense to have up/down coloring?
# color: str = {
# -1: 'lilypad_green', # up-gap
# 1: 'wine', # down-gap
# }[sgn]
rect_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
start_pos=lc,
end_pos=ro,
color=color,
start_time=start_time,
end_time=end_time,
)
# add up/down rects
aid: int|None = await actl.add_rect(**rect_kwargs)
if aid is None:
log.error(
f'Failed to add rect for,\n'
f'{rect_kwargs!r}\n'
f'\n'
f'Skipping to next gap!\n'
)
continue
assert aid
aids[aid] = rect_kwargs
direction: str = (
'down' if down_gap
else 'up'
)
# TODO! mk this a `msgspec.Struct` which we deserialize
# on the server side!
# XXX: send timestamp for server-side index lookup
# to ensure alignment with current shm state
gap_time: float = row['time'][0]
arrow_kwargs: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
x=iend, # fallback if timestamp lookup fails
y=cls,
time=gap_time, # for server-side index lookup
color=color,
alpha=169,
pointing=direction,
# TODO: expose these as params to markup_gaps()?
headLen=10,
headWidth=2.222,
pxMode=True,
)
aid: int = await actl.add_arrow(
**arrow_kwargs
)
# add duration label to RHS of arrow
if up_gap:
anchor = (0, 0)
# ^XXX? i dun get dese dims.. XD
elif down_gap:
anchor = (0, 1) # XXX y, x?
else: # no-gap?
assert flat
anchor = (0, 0) # up from bottom
# use a slightly smaller font for gap label txt.
font, small_font = get_fonts()
font_size: int = small_font.px_size - 1
assert isinstance(font_size, int)
if show_txt:
text_aid: int = await actl.add_text(
fqme=fqme,
timeframe=timeframe,
text=gap_label,
x=iend + 1, # fallback if timestamp lookup fails
y=cls,
time=gap_time, # server-side index lookup
color=color,
anchor=anchor,
font_size=font_size,
)
aids[text_aid] = {'text': gap_label}
# tell chart to redraw all its
# graphics view layers Bo
await actl.redraw(
fqme=fqme,
timeframe=timeframe,
)
return aids

View File

@ -0,0 +1,206 @@
'''
Smart OHLCV deduplication with data quality validation.
Handles concurrent write conflicts by keeping the most complete bar
(highest volume) while detecting data quality anomalies.
'''
import polars as pl
from ._anal import with_dts
def dedupe_ohlcv_smart(

Using a heuristic for which bar(s) are likely most correct given known race conditions around OHLCV sampling under real-time-write usage.

Using a heuristic for which bar(s) are likely most correct given known race conditions around OHLCV sampling under real-time-write usage.
src_df: pl.DataFrame,
time_col: str = 'time',
volume_col: str = 'volume',
sort: bool = True,
) -> tuple[
pl.DataFrame, # with dts
pl.DataFrame, # deduped (keeping higher volume bars)
int, # count of dupes removed
pl.DataFrame|None, # valid race conditions
pl.DataFrame|None, # data quality violations
]:
'''
Smart OHLCV deduplication keeping most complete bars.
For duplicate timestamps, keeps bar with highest volume under
the assumption that higher volume indicates more complete/final
data from backfill vs partial live updates.
Returns
-------
Tuple of:
- wdts: original dataframe with datetime columns added
- deduped: deduplicated frame keeping highest-volume bars
- diff: number of duplicate rows removed
- valid_races: duplicates meeting expected race condition pattern
(volume monotonic, OHLC ranges valid)
- data_quality_issues: duplicates violating expected relationships
indicating provider data problems
'''
wdts: pl.DataFrame = with_dts(src_df)
# Find duplicate timestamps
dupes: pl.DataFrame = wdts.filter(
pl.col(time_col).is_duplicated()
)
if dupes.is_empty():
# No duplicates, return as-is
return (wdts, wdts, 0, None, None)
# Analyze duplicate groups for validation
dupe_analysis: pl.DataFrame = (
dupes
.sort([time_col, 'index'])
.group_by(time_col, maintain_order=True)
.agg([
pl.col('index').alias('indices'),
pl.col('volume').alias('volumes'),
pl.col('high').alias('highs'),
pl.col('low').alias('lows'),
pl.col('open').alias('opens'),
pl.col('close').alias('closes'),
pl.col('dt').first().alias('dt'),
pl.len().alias('count'),
])
)
# Validate OHLCV monotonicity for each duplicate group
def check_ohlcv_validity(row) -> dict[str, bool]:
'''
Check if duplicate bars follow expected race condition pattern.
For a valid live-update backfill race:
- volume should be monotonically increasing
- high should be monotonically non-decreasing
- low should be monotonically non-increasing
- open should be identical (fixed at bar start)
Returns dict of violation flags.
'''
vols: list = row['volumes']
highs: list = row['highs']
lows: list = row['lows']
opens: list = row['opens']
violations: dict[str, bool] = {
'volume_non_monotonic': False,
'high_decreased': False,
'low_increased': False,
'open_mismatch': False,
'identical_bars': False,
}
# Check if all bars are identical (pure duplicate)
if (
len(set(vols)) == 1
and len(set(highs)) == 1
and len(set(lows)) == 1
and len(set(opens)) == 1
):
violations['identical_bars'] = True
return violations
# Check volume monotonicity
for i in range(1, len(vols)):
if vols[i] < vols[i-1]:
violations['volume_non_monotonic'] = True
break
# Check high monotonicity (can only increase or stay same)
for i in range(1, len(highs)):
if highs[i] < highs[i-1]:
violations['high_decreased'] = True
break
# Check low monotonicity (can only decrease or stay same)
for i in range(1, len(lows)):
if lows[i] > lows[i-1]:
violations['low_increased'] = True
break
# Check open consistency (should be fixed)
if len(set(opens)) > 1:
violations['open_mismatch'] = True
return violations
# Apply validation
dupe_analysis = dupe_analysis.with_columns([
pl.struct(['volumes', 'highs', 'lows', 'opens'])
.map_elements(
check_ohlcv_validity,
return_dtype=pl.Struct([
pl.Field('volume_non_monotonic', pl.Boolean),
pl.Field('high_decreased', pl.Boolean),
pl.Field('low_increased', pl.Boolean),
pl.Field('open_mismatch', pl.Boolean),
pl.Field('identical_bars', pl.Boolean),
])
)
.alias('validity')
])
# Unnest validity struct
dupe_analysis = dupe_analysis.unnest('validity')
# Separate valid races from data quality issues
valid_races: pl.DataFrame|None = (
dupe_analysis
.filter(
# Valid if no violations OR just identical bars
~pl.col('volume_non_monotonic')
& ~pl.col('high_decreased')
& ~pl.col('low_increased')
& ~pl.col('open_mismatch')
)
)
if valid_races.is_empty():
valid_races = None
data_quality_issues: pl.DataFrame|None = (
dupe_analysis
.filter(
# Issues if any non-identical violation exists
(
pl.col('volume_non_monotonic')
| pl.col('high_decreased')
| pl.col('low_increased')
| pl.col('open_mismatch')
)
& ~pl.col('identical_bars')
)
)
if data_quality_issues.is_empty():
data_quality_issues = None
# Deduplicate: keep highest volume bar for each timestamp
deduped: pl.DataFrame = (
wdts
.sort([time_col, volume_col])
.unique(
subset=[time_col],
keep='last',
maintain_order=False,
)
)
# Re-sort by time or index
if sort:
deduped = deduped.sort(by=time_col)
diff: int = wdts.height - deduped.height
return (
wdts,
deduped,
diff,
valid_races,
data_quality_issues,
)

1600
piker/tsp/_history.py 100644

File diff suppressed because it is too large Load Diff

View File

@ -21,6 +21,7 @@ Higher level annotation editors.
from __future__ import annotations
from collections import defaultdict
from typing import (
Literal,
Sequence,
TYPE_CHECKING,
)
@ -71,9 +72,18 @@ log = get_logger(__name__)
class ArrowEditor(Struct):
'''
Annotate a chart-view with arrows most often used for indicating,
- order txns/clears,
- positions directions,
- general points-of-interest like nooz events.
'''
godw: GodWidget = None # type: ignore # noqa
_arrows: dict[str, list[pg.ArrowItem]] = {}
_arrows: dict[
str,
list[pg.ArrowItem]
] = {}
def add(
self,
@ -81,8 +91,19 @@ class ArrowEditor(Struct):
uid: str,
x: float,
y: float,
color: str = 'default',
pointing: str | None = None,
color: str|None = None,
pointing: Literal[
'up',
'down',
None,
] = None,
alpha: int = 255,
zval: float = 1e9,
headLen: float|None = None,
headWidth: float|None = None,
tailLen: float|None = None,
tailWidth: float|None = None,
pxMode: bool = True,
) -> pg.ArrowItem:
'''
@ -98,29 +119,83 @@ class ArrowEditor(Struct):
# scale arrow sizing to dpi-aware font
size = _font.font.pixelSize() * 0.8
# allow caller override of head dimensions
if headLen is None:
headLen = size
if headWidth is None:
headWidth = size/2
# tail params default to None (no tail)
if tailWidth is None:
tailWidth = 3
color = color or 'default'
color = QColor(hcolor(color))
color.setAlpha(alpha)
pen = fn.mkPen(color, width=1)
brush = fn.mkBrush(color)
arrow = pg.ArrowItem(
angle=angle,
baseAngle=0,
headLen=size,
headWidth=size/2,
tailLen=None,
pxMode=True,
headLen=headLen,
headWidth=headWidth,
tailLen=tailLen,
tailWidth=tailWidth,
pxMode=pxMode,
# coloring
pen=pg.mkPen(hcolor('papas_special')),
brush=pg.mkBrush(hcolor(color)),
pen=pen,
brush=brush,
)
arrow.setZValue(zval)
arrow.setPos(x, y)
self._arrows.setdefault(uid, []).append(arrow)
plot.addItem(arrow) # render to view
# render to view
plot.addItem(arrow)
# register for removal
arrow._uid = uid
self._arrows.setdefault(
uid, []
).append(arrow)
return arrow
def remove(self, arrow) -> bool:
def remove(
self,
arrow: pg.ArrowItem,
) -> None:
'''
Remove a *single arrow* from all chart views to which it was
added.
'''
uid: str = arrow._uid
arrows: list[pg.ArrowItem] = self._arrows[uid]
log.info(
f'Removing arrow from views\n'
f'uid: {uid!r}\n'
f'{arrow!r}\n'
)
for linked in self.godw.iter_linked():
linked.chart.plotItem.removeItem(arrow)
if not (chart := linked.chart):
continue
chart.plotItem.removeItem(arrow)
try:
arrows.remove(arrow)
except ValueError:
log.warning(
f'Arrow was already removed?\n'
f'uid: {uid!r}\n'
f'{arrow!r}\n'
)
def remove_all(self) -> set[pg.ArrowItem]:
'''
Remove all arrows added by this editor from all
chart-views.
'''
for uid, arrows in self._arrows.items():
for arrow in arrows:
self.remove(arrow)
class LineEditor(Struct):
@ -266,6 +341,9 @@ class LineEditor(Struct):
return lines
# compat with ArrowEditor
remove = remove_line
def as_point(
pair: Sequence[float, float] | QPointF,
@ -614,3 +692,6 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
):
scen.removeItem(self._label_proxy)
# compat with ArrowEditor
remove = delete

View File

@ -27,10 +27,12 @@ from contextlib import (
from functools import partial
from pprint import pformat
from typing import (
# Any,
AsyncContextManager,
Literal,
)
from uuid import uuid4
import pyqtgraph as pg
import tractor
import trio
from tractor import trionics
@ -47,12 +49,16 @@ from piker.brokers import SymbolNotFound
from piker.ui.qt import (
QGraphicsItem,
)
from PyQt6.QtGui import QFont
from ._display import DisplayState
from ._interaction import ChartView
from ._editors import SelectRect
from ._editors import (
SelectRect,
ArrowEditor,
)
from ._chart import ChartPlotWidget
from ._dataviz import Viz
from ._style import hcolor
log = get_logger(__name__)
@ -83,8 +89,40 @@ _ctxs: IpcCtxTable = {}
# the "annotations server" which actually renders to a Qt canvas).
# type AnnotsTable = dict[int, QGraphicsItem]
AnnotsTable = dict[int, QGraphicsItem]
EditorsTable = dict[int, ArrowEditor]
_annots: AnnotsTable = {}
_editors: EditorsTable = {}
def rm_annot(
annot: ArrowEditor|SelectRect|pg.TextItem
) -> bool:
global _editors
match annot:
case pg.ArrowItem():
editor = _editors[annot._uid]
editor.remove(annot)
# ^TODO? only remove each arrow or all?
# if editor._arrows:
# editor.remove_all()
# else:
# log.warning(
# f'Annot already removed!\n'
# f'{annot!r}\n'
# )
return True
case SelectRect():
annot.delete()
return True
case pg.TextItem():
scene = annot.scene()
if scene:
scene.removeItem(annot)
return True
return False
async def serve_rc_annots(
@ -95,6 +133,12 @@ async def serve_rc_annots(
annots: AnnotsTable,
) -> None:
'''
A small viz(ualization) server for remote ctl of chart
annotations.
'''
global _editors
async for msg in annot_req_stream:
match msg:
case {
@ -104,14 +148,77 @@ async def serve_rc_annots(
'meth': str(meth),
'kwargs': dict(kwargs),
}:
ds: DisplayState = _dss[fqme]
try:
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
except KeyError:
msg: str = (
f'No chart for timeframe={timeframe}s, '
f'skipping rect annotation'
)
log.exeception(msg)
await annot_req_stream.send({'error': msg})
continue
cv: ChartView = chart.cv
# NEW: if timestamps provided, lookup current indices
# from shm to ensure alignment with current buffer
# state
start_time = kwargs.pop('start_time', None)
end_time = kwargs.pop('end_time', None)
if (
start_time is not None
and end_time is not None
):
viz: Viz = chart.get_viz(fqme)
shm = viz.shm
arr = shm.array
# lookup start index
start_matches = arr[arr['time'] == start_time]
if len(start_matches) == 0:
msg: str = (
f'No shm entry for start_time={start_time}, '
f'skipping rect'
)
log.error(msg)
await annot_req_stream.send({'error': msg})
continue
# lookup end index
end_matches = arr[arr['time'] == end_time]
if len(end_matches) == 0:
msg: str = (
f'No shm entry for end_time={end_time}, '
f'skipping rect'
)
log.error(msg)
await annot_req_stream.send({'error': msg})
continue
# get close price from start bar, open from end
# bar
start_idx = float(start_matches[0]['index'])
end_idx = float(end_matches[0]['index'])
start_close = float(start_matches[0]['close'])
end_open = float(end_matches[0]['open'])
# reconstruct start_pos and end_pos with
# looked-up indices
from_idx: float = 0.16 - 0.06 # BGM offset
kwargs['start_pos'] = (
start_idx + 1 - from_idx,
start_close,
)
kwargs['end_pos'] = (
end_idx + from_idx,
end_open,
)
# annot type lookup from cmd
rect = SelectRect(
viewbox=cv,
@ -130,21 +237,207 @@ async def serve_rc_annots(
# delegate generically to the requested method
getattr(rect, meth)(**kwargs)
rect.show()
# XXX: store absolute coords for repositioning
# during viz redraws (eg backfill updates)
rect._meth = meth
rect._kwargs = kwargs
aid: int = id(rect)
annots[aid] = rect
aids: set[int] = ctxs[ipc_key][1]
aids.add(aid)
await annot_req_stream.send(aid)
case {
'cmd': 'ArrowEditor',
'fqme': fqme,
'timeframe': timeframe,
'meth': 'add'|'remove' as meth,
'kwargs': {
'x': float(x),
'y': float(y),
'pointing': pointing,
'color': color,
'aid': str()|None as aid,
'alpha': int(alpha),
'headLen': int()|float()|None as headLen,
'headWidth': int()|float()|None as headWidth,
'tailLen': int()|float()|None as tailLen,
'tailWidth': int()|float()|None as tailWidth,
'pxMode': bool(pxMode),
'time': int()|float()|None as timestamp,
},
# ?TODO? split based on method fn-sigs?
# 'pointing',
}:
ds: DisplayState = _dss[fqme]
try:
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
except KeyError:
log.warning(
f'No chart for timeframe={timeframe}s, '
f'skipping arrow annotation'
)
# return -1 to indicate failure
await annot_req_stream.send(-1)
continue
cv: ChartView = chart.cv
godw = chart.linked.godwidget
# NEW: if timestamp provided, lookup current index
# from shm to ensure alignment with current buffer
# state
if timestamp is not None:
viz: Viz = chart.get_viz(fqme)
shm = viz.shm
arr = shm.array
# find index where time matches timestamp
matches = arr[arr['time'] == timestamp]
if len(matches) == 0:
log.error(
f'No shm entry for timestamp={timestamp}, '
f'skipping arrow annotation'
)
await annot_req_stream.send(-1)
continue
# use the matched row's index as x
x = float(matches[0]['index'])
arrows = ArrowEditor(godw=godw)
# `.add/.remove()` API
if meth != 'add':
# await tractor.pause()
raise ValueError(
f'Invalid arrow-edit request ?\n'
f'{msg!r}\n'
)
aid: str = str(uuid4())
arrow: pg.ArrowItem = arrows.add(
plot=chart.plotItem,
uid=aid,
x=x,
y=y,
pointing=pointing,
color=color,
alpha=alpha,
headLen=headLen,
headWidth=headWidth,
tailLen=tailLen,
tailWidth=tailWidth,
pxMode=pxMode,
)
# XXX: store absolute coords for repositioning
# during viz redraws (eg backfill updates)
arrow._abs_x = x
arrow._abs_y = y
annots[aid] = arrow
_editors[aid] = arrows
aids: set[int] = ctxs[ipc_key][1]
aids.add(aid)
await annot_req_stream.send(aid)
case {
'cmd': 'TextItem',
'fqme': fqme,
'timeframe': timeframe,
'kwargs': {
'text': str(text),
'x': int()|float() as x,
'y': int()|float() as y,
'color': color,
'anchor': list(anchor),
'font_size': int()|None as font_size,
'time': int()|float()|None as timestamp,
},
}:
ds: DisplayState = _dss[fqme]
try:
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
except KeyError:
log.warning(
f'No chart for timeframe={timeframe}s, '
f'skipping text annotation'
)
await annot_req_stream.send(-1)
continue
# NEW: if timestamp provided, lookup current index
# from shm to ensure alignment with current buffer
# state
if timestamp is not None:
viz: Viz = chart.get_viz(fqme)
shm = viz.shm
arr = shm.array
# find index where time matches timestamp
matches = arr[arr['time'] == timestamp]
if len(matches) == 0:
log.error(
f'No shm entry for timestamp={timestamp}, '
f'skipping text annotation'
)
await annot_req_stream.send(-1)
continue
# use the matched row's index as x, +1 for text
# offset
x = float(matches[0]['index']) + 1
# convert named color to hex
color_hex: str = hcolor(color)
# create text item
text_item: pg.TextItem = pg.TextItem(
text=text,
color=color_hex,
anchor=anchor,
# ?TODO, pin to github:main for this?
# legacy, can have scaling ish?
# ensureInBounds=True,
)
# apply font size (default to DpiAwareFont if not
# provided)
if font_size is None:
from ._style import get_fonts
font, font_small = get_fonts()
font_size = font_small.px_size - 1
qfont: QFont = text_item.textItem.font()
qfont.setPixelSize(font_size)
text_item.setFont(qfont)
text_item.setPos(x, y)
chart.plotItem.addItem(text_item)
# XXX: store absolute coords for repositioning
# during viz redraws (eg backfill updates)
text_item._abs_x = x
text_item._abs_y = y
aid: str = str(uuid4())
annots[aid] = text_item
aids: set[int] = ctxs[ipc_key][1]
aids.add(aid)
await annot_req_stream.send(aid)
case {
'cmd': 'remove',
'aid': int(aid),
'aid': int(aid)|str(aid),
}:
# NOTE: this is normally entered on
# a client's annotation de-alloc normally
# prior to detach or modify.
annot: QGraphicsItem = annots[aid]
annot.delete()
assert rm_annot(annot)
# respond to client indicating annot
# was indeed deleted.
@ -175,6 +468,38 @@ async def serve_rc_annots(
)
viz.reset_graphics()
# XXX: reposition all annotations to ensure they
# stay aligned with viz data after reset (eg during
# backfill when abs-index range changes)
n_repositioned: int = 0
for aid, annot in annots.items():
# arrows and text items use abs x,y coords
if (
hasattr(annot, '_abs_x')
and
hasattr(annot, '_abs_y')
):
annot.setPos(
annot._abs_x,
annot._abs_y,
)
n_repositioned += 1
# rects use method + kwargs
elif (
hasattr(annot, '_meth')
and
hasattr(annot, '_kwargs')
):
getattr(annot, annot._meth)(**annot._kwargs)
n_repositioned += 1
if n_repositioned:
log.info(
f'Repositioned {n_repositioned} annotation(s) '
f'after viz redraw'
)
case _:
log.error(
'Unknown remote annotation cmd:\n'
@ -188,6 +513,12 @@ async def remote_annotate(
) -> None:
global _dss, _ctxs
if not _dss:
raise RuntimeError(
'Race condition on chart-init state ??\n'
'Anoter actor is trying to annoate this chart '
'before it has fully spawned.\n'
)
assert _dss
_ctxs[ctx.cid] = (ctx, set())
@ -212,7 +543,7 @@ async def remote_annotate(
assert _ctx is ctx
for aid in aids:
annot: QGraphicsItem = _annots[aid]
annot.delete()
assert rm_annot(annot)
class AnnotCtl(Struct):
@ -257,13 +588,18 @@ class AnnotCtl(Struct):
from_acm: bool = False,
) -> int:
# NEW: optional timestamps for server-side index lookup
start_time: float|None = None,
end_time: float|None = None,
) -> int|None:
'''
Add a `SelectRect` annotation to the target view, return
the instances `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self._get_ipc(fqme)
with trio.fail_after(3):
await ipc.send({
'fqme': fqme,
'cmd': 'SelectRect',
@ -275,9 +611,15 @@ class AnnotCtl(Struct):
'end_pos': tuple(end_pos),
'color': color,
'update_label': False,
'start_time': start_time,
'end_time': end_time,
},
})
aid: int = await ipc.receive()
aid: int|dict = await ipc.receive()
match aid:
case {'error': str(msg)}:
log.error(msg)
return None
self._ipcs[aid] = ipc
if not from_acm:
self._annot_stack.push_async_callback(
@ -334,20 +676,130 @@ class AnnotCtl(Struct):
'timeframe': timeframe,
})
# TODO: do we even need this?
# async def modify(
# self,
# aid: int, # annotation id
# meth: str, # far end graphics object method to invoke
# params: dict[str, Any], # far end `meth(**kwargs)`
# ) -> bool:
# '''
# Modify an existing (remote) annotation's graphics
# paramters, thus changing it's appearance / state in real
# time.
async def add_arrow(
self,
fqme: str,
timeframe: float,
x: float,
y: float,
pointing: Literal[
'up',
'down',
],
# TODO: a `Literal['view', 'scene']` for this?
# domain: str = 'view', # or 'scene'
color: str = 'dad_blue',
alpha: int = 116,
headLen: float|None = None,
headWidth: float|None = None,
tailLen: float|None = None,
tailWidth: float|None = None,
pxMode: bool = True,
# '''
# raise NotImplementedError
from_acm: bool = False,
# NEW: optional timestamp for server-side index lookup
time: float|None = None,
) -> int|None:
'''
Add a `SelectRect` annotation to the target view, return
the instances `id(obj)` from the remote UI actor.
'''
ipc: MsgStream = self._get_ipc(fqme)
with trio.fail_after(3):
await ipc.send({
'fqme': fqme,
'cmd': 'ArrowEditor',
'timeframe': timeframe,
# 'meth': str(meth),
'meth': 'add',
'kwargs': {
'x': float(x),
'y': float(y),
'color': color,
'pointing': pointing, # up|down
'alpha': alpha,
'aid': None,
'headLen': headLen,
'headWidth': headWidth,
'tailLen': tailLen,
'tailWidth': tailWidth,
'pxMode': pxMode,
'time': time, # for server-side index lookup
},
})
aid: int|dict = await ipc.receive()
match aid:
case {'error': str(msg)}:
log.error(msg)
return None
self._ipcs[aid] = ipc
if not from_acm:
self._annot_stack.push_async_callback(
partial(
self.remove,
aid,
)
)
return aid
async def add_text(
self,
fqme: str,
timeframe: float,
text: str,
x: float,
y: float,
color: str|tuple = 'dad_blue',
anchor: tuple[float, float] = (0, 1),
font_size: int|None = None,
from_acm: bool = False,
# NEW: optional timestamp for server-side index lookup
time: float|None = None,
) -> int|None:
'''
Add a `pg.TextItem` annotation to the target view.
anchor: (x, y) where (0,0) is upper-left, (1,1) is lower-right
font_size: pixel size for font, defaults to `_font.font.pixelSize()`
'''
ipc: MsgStream = self._get_ipc(fqme)
with trio.fail_after(3):
await ipc.send({
'fqme': fqme,
'cmd': 'TextItem',
'timeframe': timeframe,
'kwargs': {
'text': text,
'x': float(x),
'y': float(y),
'color': color,
'anchor': tuple(anchor),
'font_size': font_size,
'time': time, # for server-side index lookup
},
})
aid: int|dict = await ipc.receive()
match aid:
case {'error': str(msg)}:
log.error(msg)
return None
self._ipcs[aid] = ipc
if not from_acm:
self._annot_stack.push_async_callback(
partial(
self.remove,
aid,
)
)
return aid
@acm
@ -374,7 +826,9 @@ async def open_annot_ctl(
# TODO: print the current discoverable actor UID set
# here as well?
if not maybe_portals:
raise RuntimeError('No chart UI actors found in service domain?')
raise RuntimeError(
'No chart actors found in service domain?'
)
for portal in maybe_portals:
ctx_mngrs.append(

View File

@ -107,7 +107,22 @@ class DpiAwareFont:
@property
def px_size(self) -> int:
return self._qfont.pixelSize()
size: int = self._qfont.pixelSize()
# XXX, when no Qt app has been spawned this will always be
# invalid..
# SO, just return any conf.toml value.
if size == -1:
if (conf_size := self._font_size) is None:
raise ValueError(
f'No valid `{type(_font).__name__}.px_size` set?\n'
f'\n'
f'-> `ui.font_size` is NOT set in `conf.toml`\n'
f'-> no Qt app is active ??\n'
)
return conf_size
return size
def configure_to_dpi(self, screen: QtGui.QScreen | None = None):
'''
@ -221,6 +236,20 @@ def _config_fonts_to_screen() -> None:
_font_small.configure_to_dpi()
def get_fonts() -> tuple[
DpiAwareFont,
DpiAwareFont,
]:
'''
Get the singleton font pair (of instances) from which all other
UI/UX should be "scaled around".
See `DpiAwareFont` for (internal) deats.
'''
return _font, _font_small
# TODO: re-compute font size when main widget switches screens?
# https://forum.qt.io/topic/54136/how-do-i-get-the-qscreen-my-widget-is-on-qapplication-desktop-screen-returns-a-qwidget-and-qobject_cast-qscreen-returns-null/3

View File

@ -98,6 +98,7 @@ python-downloads = 'manual'
# https://docs.astral.sh/uv/concepts/projects/dependencies/#default-groups
default-groups = [
'uis',
'repl',
]
# ------ tool.uv ------
@ -116,7 +117,6 @@ uis = [
dev = [
# https://docs.astral.sh/uv/concepts/projects/dependencies/#development-dependencies
"cython >=3.0.0, <4.0.0",
# nested deps-groups
# https://docs.astral.sh/uv/concepts/projects/dependencies/#nesting-groups
{include-group = 'uis'},
@ -130,10 +130,14 @@ repl = [
"greenback >=1.1.1, <2.0.0",
# @goodboy's preferred console toolz
"xonsh",
"xonsh>=0.22.2",
"prompt-toolkit ==3.0.40",
"pyperclip>=1.9.0",
# for @claude's `snippets/claude_debug_helper.py` it uses to do
# "offline" debug/crash REPL-in alongside a dev.
"pexpect>=4.9.0",
# ?TODO, new stuff to consider..
# "visidata" # console numerics
# "xxh" # for remote `xonsh`-ing
@ -191,6 +195,11 @@ pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# to get fancy next-cmd/suggestion feats prior to 0.22.2 B)
# https://github.com/xonsh/xonsh/pull/6037
# https://github.com/xonsh/xonsh/pull/6048
# xonsh = { git = 'https://github.com/xonsh/xonsh.git', branch = 'main' }
# XXX since, we're like, always hacking new shite all-the-time. Bp
tractor = { git = "https://github.com/goodboy/tractor.git", branch ="piker_pin" }
# tractor = { git = "https://pikers.dev/goodboy/tractor", branch = "piker_pin" }

View File

@ -0,0 +1,256 @@
#!/usr/bin/env python

Ahh right and this is an “offline REPL runner” script claude wrote for itself to be able to grok a human dev’s experience introspecting crashes from pdb since apparently it can’t actually allow TTY takeover by such tools (and/or can’t sub-spawn one inside it’s process tree..)

This prolly should be better refined as well once we have claude more integrated in our workflow.

Ahh right and this is an "offline REPL runner" script `claude` wrote for itself to be able to grok a human dev's experience introspecting crashes from `pdb` since apparently it can't actually allow TTY takeover by such tools (and/or can't sub-spawn one inside it's process tree..) This prolly should be better refined as well once we have `claude` more integrated in our workflow.
'''
Programmatic debugging helper for `pdbp` REPL human-like
interaction but built to allow `claude` to interact with
crashes and `tractor.pause()` breakpoints along side a human dev.
Originally written by `clauded` during a backfiller inspection
session with @goodboy trying to resolve duplicate/gappy ohlcv ts
issues discovered while testing the new `nativedb` tsdb.
Allows `claude` to run `pdb` commands and capture output in an "offline"
manner but generating similar output as if it was iteracting with
the debug REPL.
The use of `pexpect` is heavily based on tractor's REPL UX test
suite(s), namely various `tests/devx/test_debugger.py` patterns.
'''
import sys
import os
import time
import pexpect
from pexpect.exceptions import (
TIMEOUT,
EOF,
)
PROMPT: str = r'\(Pdb\+\)'
def expect(
child: pexpect.spawn,
patt: str,
**kwargs,
) -> None:
'''
Expect wrapper that prints last console data before failing.
'''
try:
child.expect(
patt,
**kwargs,
)
except TIMEOUT:
before: str = (
str(child.before.decode())
if isinstance(child.before, bytes)
else str(child.before)
)
print(
f'TIMEOUT waiting for pattern: {patt}\n'
f'Last seen output:\n{before}'
)
raise
def run_pdb_commands(
commands: list[str],
initial_cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
timeout: int = 30,
print_output: bool = True,
) -> dict[str, str]:
'''
Spawn piker process, wait for pdb prompt, execute commands.
Returns dict mapping command -> output.
'''
results: dict[str, str] = {}
# Disable colored output for easier parsing
os.environ['PYTHON_COLORS'] = '0'
# Spawn the process
if print_output:
print(f'Spawning: {initial_cmd}')
child: pexpect.spawn = pexpect.spawn(
initial_cmd,
timeout=timeout,
encoding='utf-8',
echo=False,
)
# Wait for pdb prompt
try:
expect(child, PROMPT, timeout=timeout)
if print_output:
print('Reached pdb prompt!')
# Execute each command
for cmd in commands:
if print_output:
print(f'\n>>> {cmd}')
child.sendline(cmd)
time.sleep(0.1)
# Wait for next prompt
expect(child, PROMPT, timeout=timeout)
# Capture output (everything before the prompt)
output: str = (
str(child.before.decode())
if isinstance(child.before, bytes)
else str(child.before)
)
results[cmd] = output
if print_output:
print(output)
# Quit debugger gracefully
child.sendline('quit')
try:
child.expect(EOF, timeout=5)
except (TIMEOUT, EOF):
pass
except TIMEOUT as e:
print(f'Timeout: {e}')
if child.before:
before: str = (
str(child.before.decode())
if isinstance(child.before, bytes)
else str(child.before)
)
print(f'Buffer:\n{before}')
results['_error'] = str(e)
finally:
if child.isalive():
child.close(force=True)
return results
class InteractivePdbSession:
'''
Interactive pdb session manager for incremental debugging.
'''
def __init__(
self,
cmd: str = 'piker store ldshm xmrusdt.usdtm.perp.binance',
timeout: int = 30,
):
self.cmd: str = cmd
self.timeout: int = timeout
self.child: pexpect.spawn|None = None
self.history: list[tuple[str, str]] = []
def start(self) -> None:
'''
Start the piker process and wait for first prompt.
'''
os.environ['PYTHON_COLORS'] = '0'
print(f'Starting: {self.cmd}')
self.child = pexpect.spawn(
self.cmd,
timeout=self.timeout,
encoding='utf-8',
echo=False,
)
# Wait for initial prompt
expect(self.child, PROMPT, timeout=self.timeout)
print('Ready at pdb prompt!')
def run(
self,
cmd: str,
print_output: bool = True,
) -> str:
'''
Execute a single pdb command and return output.
'''
if not self.child or not self.child.isalive():
raise RuntimeError('Session not started or dead')
if print_output:
print(f'\n>>> {cmd}')
self.child.sendline(cmd)
time.sleep(0.1)
# Wait for next prompt
expect(self.child, PROMPT, timeout=self.timeout)
output: str = (
str(self.child.before.decode())
if isinstance(self.child.before, bytes)
else str(self.child.before)
)
self.history.append((cmd, output))
if print_output:
print(output)
return output
def quit(self) -> None:
'''
Exit the debugger and cleanup.
'''
if self.child and self.child.isalive():
self.child.sendline('quit')
try:
self.child.expect(EOF, timeout=5)
except (TIMEOUT, EOF):
pass
self.child.close(force=True)
def __enter__(self):
self.start()
return self
def __exit__(self, *args):
self.quit()
if __name__ == '__main__':
# Example inspection commands
inspect_cmds: list[str] = [
'locals().keys()',
'type(deduped)',
'deduped.shape',
(
'step_gaps.shape '
'if "step_gaps" in locals() '
'else "N/A"'
),
(
'venue_gaps.shape '
'if "venue_gaps" in locals() '
'else "N/A"'
),
]
# Allow commands from CLI args
if len(sys.argv) > 1:
inspect_cmds = sys.argv[1:]
# Interactive session example
with InteractivePdbSession() as session:
for cmd in inspect_cmds:
session.run(cmd)
print('\n=== Session Complete ===')

41
uv.lock
View File

@ -1000,6 +1000,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/6e/23/e98758924d1b3aac11a626268eabf7f3cf177e7837c28d47bf84c64532d0/pendulum-3.1.0-py3-none-any.whl", hash = "sha256:f9178c2a8e291758ade1e8dd6371b1d26d08371b4c7730a6e9a3ef8b16ebae0f", size = 111799, upload-time = "2025-04-19T14:02:34.739Z" },
]
[[package]]
name = "pexpect"
version = "4.9.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "ptyprocess" },
]
sdist = { url = "https://files.pythonhosted.org/packages/42/92/cc564bf6381ff43ce1f4d06852fc19a2f11d180f23dc32d9588bee2f149d/pexpect-4.9.0.tar.gz", hash = "sha256:ee7d41123f3c9911050ea2c2dac107568dc43b2d3b0c7557a33212c398ead30f", size = 166450, upload-time = "2023-11-25T09:07:26.339Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/9e/c3/059298687310d527a58bb01f3b1965787ee3b40dce76752eda8b44e9a2c5/pexpect-4.9.0-py2.py3-none-any.whl", hash = "sha256:7236d1e080e4936be2dc3e326cec0af72acf9212a7e1d060210e70a47e253523", size = 63772, upload-time = "2023-11-25T06:56:14.81Z" },
]
[[package]]
name = "piker"
version = "0.1.0a0.dev0"
@ -1047,6 +1059,7 @@ dev = [
{ name = "greenback" },
{ name = "i3ipc" },
{ name = "pdbp" },
{ name = "pexpect" },
{ name = "prompt-toolkit" },
{ name = "pyperclip" },
{ name = "pyqt6" },
@ -1062,6 +1075,7 @@ lint = [
repl = [
{ name = "greenback" },
{ name = "pdbp" },
{ name = "pexpect" },
{ name = "prompt-toolkit" },
{ name = "pyperclip" },
{ name = "xonsh" },
@ -1116,6 +1130,7 @@ dev = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "i3ipc", specifier = ">=2.2.1" },
{ name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
{ name = "pexpect", specifier = ">=4.9.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pyqt6", specifier = ">=6.7.0,<7.0.0" },
@ -1123,15 +1138,16 @@ dev = [
{ name = "pytest" },
{ name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" },
{ name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" },
{ name = "xonsh" },
{ name = "xonsh", specifier = ">=0.22.2" },
]
lint = [{ name = "ruff", specifier = ">=0.9.6" }]
repl = [
{ name = "greenback", specifier = ">=1.1.1,<2.0.0" },
{ name = "pdbp", specifier = ">=1.8.2,<2.0.0" },
{ name = "pexpect", specifier = ">=4.9.0" },
{ name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh" },
{ name = "xonsh", specifier = ">=0.22.2" },
]
testing = [{ name = "pytest" }]
uis = [
@ -1297,6 +1313,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/5b/5a/bc7b4a4ef808fa59a816c17b20c4bef6884daebbdf627ff2a161da67da19/propcache-0.4.1-py3-none-any.whl", hash = "sha256:af2a6052aeb6cf17d3e46ee169099044fd8224cbaf75c76a2ef596e8163e2237", size = 13305, upload-time = "2025-10-08T19:49:00.792Z" },
]
[[package]]
name = "ptyprocess"
version = "0.7.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/20/e5/16ff212c1e452235a90aeb09066144d0c5a6a8c0834397e03f5224495c4e/ptyprocess-0.7.0.tar.gz", hash = "sha256:5c5d0a3b48ceee0b48485e0c26037c0acd7d29765ca3fbb5cb3831d347423220", size = 70762, upload-time = "2020-12-28T15:15:30.155Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/22/a6/858897256d0deac81a172289110f31629fc4cee19b6f01283303e18c8db3/ptyprocess-0.7.0-py2.py3-none-any.whl", hash = "sha256:4b41f3967fce3af57cc7e94b888626c18bf37a083e3651ca8feeb66d492fef35", size = 13993, upload-time = "2020-12-28T15:15:28.35Z" },
]
[[package]]
name = "pyarrow"
version = "22.0.0"
@ -1843,7 +1868,7 @@ source = { git = "https://github.com/pikers/tomlkit.git?branch=piker_pin#8e0239a
[[package]]
name = "tractor"
version = "0.1.0a6.dev0"
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#e232d9dd06f41b8dca997f0647f2083d27cc34f2" }
source = { git = "https://github.com/goodboy/tractor.git?branch=piker_pin#36307c59175a1d04fecc77ef2c28f5c943b5f3d1" }
dependencies = [
{ name = "bidict" },
{ name = "cffi" },
@ -2095,13 +2120,13 @@ wheels = [
[[package]]
name = "xonsh"
version = "0.20.0"
version = "0.22.4"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/56/af/7e2ba3885da44cbe03c7ff46f90ea917ba10d91dc74d68604001ea28055f/xonsh-0.20.0.tar.gz", hash = "sha256:d44a50ee9f288ff96bd0456f0a38988ef6d4985637140ea793beeef5ec5d2d38", size = 811907, upload-time = "2025-11-24T07:50:50.847Z" }
sdist = { url = "https://files.pythonhosted.org/packages/48/df/1fc9ed62b3d7c14612e1713e9eb7bd41d54f6ad1028a8fbb6b7cddebc345/xonsh-0.22.4.tar.gz", hash = "sha256:6be346563fec2db75778ba5d2caee155525e634e99d9cc8cc347626025c0b3fa", size = 826665, upload-time = "2026-02-17T07:53:39.424Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/e8/db/1c5c057c0b2a89b8919477726558685720ae0849ea1a98a3803e93550824/xonsh-0.20.0-py311-none-any.whl", hash = "sha256:65d27ba31d558f79010d6c652751449fd3ed4df1f1eda78040a6427fa0a0f03e", size = 646312, upload-time = "2025-11-24T07:50:49.488Z" },
{ url = "https://files.pythonhosted.org/packages/d2/a2/d6f7534f31489a4b8b54bd2a2496248f86f7c21a6a6ce9bfdcdd389fe4e7/xonsh-0.20.0-py312-none-any.whl", hash = "sha256:3148900e67b9c2796bef6f2eda003b0a64d4c6f50a0db23324f786d9e1af9353", size = 646323, upload-time = "2025-11-24T07:50:43.028Z" },
{ url = "https://files.pythonhosted.org/packages/bd/48/bcb1e4d329c3d522bc29b066b0b6ee86938ec392376a29c36fac0ad1c586/xonsh-0.20.0-py313-none-any.whl", hash = "sha256:c83daaf6eb2960180fc5a507459dbdf6c0d6d63e1733c43f4e43db77255c7278", size = 646830, upload-time = "2025-11-24T07:50:45.078Z" },
{ url = "https://files.pythonhosted.org/packages/2e/00/7cbc0c1fb64365a0a317c54ce3a151c9644eea5a509d9cbaae61c9fd1426/xonsh-0.22.4-py311-none-any.whl", hash = "sha256:38b29b29fa85aa756462d9d9bbcaa1d85478c2108da3de6cc590a69a4bcd1a01", size = 654375, upload-time = "2026-02-17T07:53:37.702Z" },
{ url = "https://files.pythonhosted.org/packages/2e/c2/3dd498dc28d8f89cdd52e39950c5e591499ae423f61694c0bb4d03ed1d82/xonsh-0.22.4-py312-none-any.whl", hash = "sha256:4e538fac9f4c3d866ddbdeca068f0c0515469c997ed58d3bfee963878c6df5a5", size = 654300, upload-time = "2026-02-17T07:53:35.813Z" },
{ url = "https://files.pythonhosted.org/packages/82/7d/1f9c7147518e9f03f6ce081b5bfc4f1aceb6ec5caba849024d005e41d3be/xonsh-0.22.4-py313-none-any.whl", hash = "sha256:cc5fabf0ad0c56a2a11bed1e6a43c4ec6416a5b30f24f126b8e768547c3793e2", size = 654818, upload-time = "2026-02-17T07:53:33.477Z" },
]
[[package]]