Compare commits

...

6 Commits

Author SHA1 Message Date
Gud Boi 50825f30b8 Add annot refresh-positioning to `Viz` iface
Extend `Viz` with dynamic annot repositioning logic in a new
`._reposition_annotations()` method. Try calling it inside
`Viz.update_graphics()/.reset_graphics()` to attempt keeping annots
aligned with underlying data coords.

Also,
- add index-range cache to skip redundant repositioning
- re-enable backfill force-redraw match block in
  `.ui._display.increment_history_view()`
  * uncomment `viz` and `name` bindings for match block use
    to make the above valid.
- claude did some weird `profiler()`-as-logger thing that we'll need to
  correct; weird how it only did it once while the other call site used
  `log` XD

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 18:57:26 -05:00
Gud Boi ed9c211b96 Adjust binance stale-bar detection to 2x tolerance
Change the stale-bar check in `.binance.feed` from `timeframe` to
`timeframe * 2` tolerance to avoid false-positive pauses when bars
are slightly delayed but still within acceptable bounds.

Styling,
- add walrus operator to capture `_time_step` for debugger
  inspection.
- add comment explaining the debug purpose of this check.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 16:36:44 -05:00
Gud Boi f1b27e9696 Replace assert with warn for no-gaps in `.storage.cli`
Change `assert aids` to a warning log when no history gaps are found
during `ldshm` gap detection; it is the **ideal case** OBVI. This avoids
crashing the CLI when gap detection finds no issues, which is actually
good news!

Bp

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 16:33:53 -05:00
Gud Boi 959d04024b .tsp._history: add gap detection in backfill loop
Add frame-gap detection when `frame_last_dt < end_dt_param` to
warn about potential venue closures or missing data during the
backfill loop in `start_backfill()`.

Deats,
- add `frame_last_dt < end_dt_param` check after frame recv
- log warnings with EST-converted timestamps for clarity
- add `await tractor.pause()` for REPL-investigation on gaps
- add TODO comment about venue closure hour checking
- capture `_until_was_none` walrus var for null-check clarity
- add `last_time` assertion for `time[-1] == next_end_dt`
- rename `_daterr` to `nodata` with `_nodata` capture

Also,
- import `pendulum.timezone` and create `est` tz instance
- change `get_logger()` import from `.data._util` to `.log`
- add parens around `(next_prepend_index - ln) < 0` check

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 16:32:34 -05:00
Gud Boi 6f8a361e80 Cleanups and doc tweaks to `.ui._fsp`
Expand read-race warning log for clarity, add TODO for reading
`tractor` transport config from `conf.toml`, and reflow docstring
in `open_vlm_displays()`.

Also,
- whitespace cleanup: `Type | None` -> `Type|None`
- clarify "Volume" -> "Vlm (volume)" in docstr

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-01 19:28:14 -05:00
Gud Boi 2d678e1582 Guard against `None` chart in `ArrowEditor.remove()`
Add null check for `linked.chart` before calling
`.plotItem.removeItem()` to prevent `AttributeError` when chart
is `None`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 19:21:28 -05:00
7 changed files with 177 additions and 46 deletions

View File

@ -275,9 +275,15 @@ async def open_history_client(
f'{times}' f'{times}'
) )
# XXX, debug any case where the latest 1m bar we get is
# already another "sample's-step-old"..
if end_dt is None: if end_dt is None:
inow: int = round(time.time()) inow: int = round(time.time())
if (inow - times[-1]) > 60: if (
_time_step := (inow - times[-1])
>
timeframe * 2
):
await tractor.pause() await tractor.pause()
start_dt = from_timestamp(times[0]) start_dt = from_timestamp(times[0])

View File

@ -447,7 +447,13 @@ def ldshm(
) )
# last chance manual overwrites in REPL # last chance manual overwrites in REPL
# await tractor.pause() # await tractor.pause()
assert aids if not aids:
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
tf2aids[period_s] = aids tf2aids[period_s] = aids
else: else:

View File

@ -49,6 +49,7 @@ from pendulum import (
Duration, Duration,
duration as mk_duration, duration as mk_duration,
from_timestamp, from_timestamp,
timezone,
) )
import numpy as np import numpy as np
import polars as pl import polars as pl
@ -57,9 +58,7 @@ from piker.brokers import NoData
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
) )
from piker.data._util import ( from piker.log import get_logger
log,
)
from ..data._sharedmem import ( from ..data._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
@ -97,6 +96,9 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus # from .feed import _FeedsBus
log = get_logger()
# `ShmArray` buffer sizing configuration: # `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24) _mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle # how much is probably dependent on lifestyle
@ -401,7 +403,9 @@ async def start_backfill(
# based on the sample step size, maybe load a certain amount history # based on the sample step size, maybe load a certain amount history
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if backfill_until_dt is None: if (
_until_was_none := (backfill_until_dt is None)
):
# TODO: per-provider default history-durations? # TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow # -[ ] inside the `open_history_client()` config allow
@ -435,6 +439,8 @@ async def start_backfill(
last_start_dt: datetime = backfill_from_dt last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index next_prepend_index: int = backfill_from_shm_index
est = timezone('EST')
while last_start_dt > backfill_until_dt: while last_start_dt > backfill_until_dt:
log.info( log.info(
f'Requesting {timeframe}s frame:\n' f'Requesting {timeframe}s frame:\n'
@ -448,9 +454,10 @@ async def start_backfill(
next_end_dt, next_end_dt,
) = await get_hist( ) = await get_hist(
timeframe, timeframe,
end_dt=last_start_dt, end_dt=(end_dt_param := last_start_dt),
) )
except NoData as _daterr: except NoData as nodata:
_nodata = nodata
orig_last_start_dt: datetime = last_start_dt orig_last_start_dt: datetime = last_start_dt
gap_report: str = ( gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@ -518,8 +525,32 @@ async def start_backfill(
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
assert (
(last_time := time[-1])
==
next_end_dt.timestamp()
)
assert time[-1] == next_end_dt.timestamp() frame_last_dt = from_timestamp(last_time)
if (
frame_last_dt.add(seconds=timeframe)
<
end_dt_param
):
est_frame_last_dt = est.convert(frame_last_dt)
est_end_dt_param = est.convert(end_dt_param)
log.warning(
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
f'end_dt_param (EST): {est_end_dt_param!r}\n'
f'\n'
f'Likely contains,\n'
f'- a venue closure.\n'
f'- (maybe?) missing data ?\n'
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
await tractor.pause()
expected_dur: Interval = ( expected_dur: Interval = (
last_start_dt.subtract( last_start_dt.subtract(
@ -581,10 +612,11 @@ async def start_backfill(
'0 BARS TO PUSH after diff!?\n' '0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
) )
await tractor.pause()
# Check if we're about to exceed buffer capacity BEFORE # Check if we're about to exceed buffer capacity BEFORE
# attempting the push # attempting the push
if next_prepend_index - ln < 0: if (next_prepend_index - ln) < 0:
log.warning( log.warning(
f'Backfill would exceed buffer capacity!\n' f'Backfill would exceed buffer capacity!\n'
f'next_prepend_index: {next_prepend_index}\n' f'next_prepend_index: {next_prepend_index}\n'
@ -655,7 +687,7 @@ async def start_backfill(
}, },
}) })
# can't push the entire frame? so # XXX, can't push the entire frame? so
# push only the amount that can fit.. # push only the amount that can fit..
break break
@ -715,8 +747,8 @@ async def start_backfill(
) = dedupe(df) ) = dedupe(df)
if diff: if diff:
log.warning( log.warning(
f'Found {diff} duplicates in tsdb, ' f'Found {diff!r} duplicates in tsdb! '
f'overwriting with deduped data\n' f'=> Overwriting with `deduped` data !! <=\n'
) )
await storage.write_ohlcv( await storage.write_ohlcv(
col_sym_key, col_sym_key,

View File

@ -1031,12 +1031,83 @@ class Viz(Struct):
# track downsampled state # track downsampled state
self._in_ds = r._in_ds self._in_ds = r._in_ds
# XXX: reposition annotations after graphics update
# to ensure alignment with (potentially changed) data coords
if should_redraw or force_redraw:
n = self._reposition_annotations()
if n:
profiler(f'repositioned {n} annotations')
return ( return (
True, True,
(ivl, ivr), (ivl, ivr),
graphics, graphics,
) )
# class-level cache for tracking last repositioned index range
# to avoid redundant repositioning when shm hasn't changed
_annot_index_cache: dict[str, tuple[int, int]] = {}
def _reposition_annotations(
self,
force: bool = False,
) -> int:
'''
Reposition all annotations (arrows, text, rects) that have
stored absolute coordinates to ensure they stay aligned
with viz data after updates/redraws.
Only repositions if shm index range has changed since last
reposition, unless `force=True`.
'''
# check if shm index range changed
arr = self.shm.array
if not arr.size:
return 0
ifirst = arr[0]['index']
ilast = arr[-1]['index']
index_range = (ifirst, ilast)
# skip if range unchanged (unless forced)
cache_key: str = self.name
last_range = self._annot_index_cache.get(cache_key)
if (
not force
and last_range is not None
and last_range == index_range
):
return 0
# cache current range
self._annot_index_cache[cache_key] = index_range
n_repositioned: int = 0
for item in self.plot.items:
# arrows and text items use abs x,y coords
if (
hasattr(item, '_abs_x')
and
hasattr(item, '_abs_y')
):
item.setPos(
item._abs_x,
item._abs_y,
)
n_repositioned += 1
# rects use method + kwargs
elif (
hasattr(item, '_meth')
and
hasattr(item, '_kwargs')
):
getattr(item, item._meth)(**item._kwargs)
n_repositioned += 1
return n_repositioned
def reset_graphics( def reset_graphics(
self, self,
@ -1070,6 +1141,14 @@ class Viz(Struct):
self.update_graphics(force_redraw=True) self.update_graphics(force_redraw=True)
self._mxmn_cache_enabled = True self._mxmn_cache_enabled = True
# reposition annotations to stay aligned after reset
# (force=True since reset always changes coordinate system)
n = self._reposition_annotations(force=True)
if n:
log.info(
f'Repositioned {n} annotation(s) after reset'
)
def draw_last( def draw_last(
self, self,
array_key: str | None = None, array_key: str | None = None,

View File

@ -211,9 +211,9 @@ async def increment_history_view(
): ):
hist_chart: ChartPlotWidget = ds.hist_chart hist_chart: ChartPlotWidget = ds.hist_chart
hist_viz: Viz = ds.hist_viz hist_viz: Viz = ds.hist_viz
# viz: Viz = ds.viz viz: Viz = ds.viz
assert 'hist' in hist_viz.shm.token['shm_name'] assert 'hist' in hist_viz.shm.token['shm_name']
# name: str = hist_viz.name name: str = hist_viz.name
# TODO: seems this is more reliable at keeping the slow # TODO: seems this is more reliable at keeping the slow
# chart incremented in view more correctly? # chart incremented in view more correctly?
@ -250,27 +250,27 @@ async def increment_history_view(
# - samplerd could emit the actual update range via # - samplerd could emit the actual update range via
# tuple and then we only enter the below block if that # tuple and then we only enter the below block if that
# range is detected as in-view? # range is detected as in-view?
# match msg: match msg:
# case { case {
# 'backfilling': (viz_name, timeframe), 'backfilling': (viz_name, timeframe),
# } if ( } if (
# viz_name == name viz_name == name
# ): ):
# log.warning( log.warning(
# f'Forcing HARD REDRAW:\n' f'Forcing HARD REDRAW:\n'
# f'name: {name}\n' f'name: {name}\n'
# f'timeframe: {timeframe}\n' f'timeframe: {timeframe}\n'
# ) )
# # TODO: only allow this when the data is IN VIEW! # TODO: only allow this when the data is IN VIEW!
# # also, we probably can do this more efficiently # also, we probably can do this more efficiently
# # / smarter by only redrawing the portion of the # / smarter by only redrawing the portion of the
# # path necessary? # path necessary?
# { {
# 60: hist_viz, 60: hist_viz,
# 1: viz, 1: viz,
# }[timeframe].update_graphics( }[timeframe].update_graphics(
# force_redraw=True force_redraw=True
# ) )
# check if slow chart needs an x-domain shift and/or # check if slow chart needs an x-domain shift and/or
# y-range resize. # y-range resize.

View File

@ -169,7 +169,10 @@ class ArrowEditor(Struct):
f'{arrow!r}\n' f'{arrow!r}\n'
) )
for linked in self.godw.iter_linked(): for linked in self.godw.iter_linked():
linked.chart.plotItem.removeItem(arrow) if not (chart := linked.chart):
continue
chart.plotItem.removeItem(arrow)
try: try:
arrows.remove(arrow) arrows.remove(arrow)
except ValueError: except ValueError:

View File

@ -87,7 +87,11 @@ def update_fsp_chart(
# guard against unreadable case # guard against unreadable case
if not last_row: if not last_row:
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}') log.warning(
f'Read-race on shm array,\n'
f'graphics_name: {graphics_name!r}\n'
f'shm.token: {shm.token}\n'
)
return return
# update graphics # update graphics
@ -203,7 +207,6 @@ async def open_fsp_actor_cluster(
async def run_fsp_ui( async def run_fsp_ui(
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
flume: Flume, flume: Flume,
started: trio.Event, started: trio.Event,
@ -623,8 +626,10 @@ async def open_fsp_admin(
event.set() event.set()
# TODO, passing in `pikerd` related settings here!
# [ ] read in the `tractor` setting for `enable_transports: list`
# from the root `conf.toml`!
async def open_vlm_displays( async def open_vlm_displays(
linked: LinkedSplits, linked: LinkedSplits,
flume: Flume, flume: Flume,
dvlm: bool = True, dvlm: bool = True,
@ -634,12 +639,12 @@ async def open_vlm_displays(
) -> None: ) -> None:
''' '''
Volume subchart displays. Vlm (volume) subchart displays.
Since "volume" is often included directly alongside OHLCV price Since "volume" is often included directly alongside OHLCV price
data, we don't really need a separate FSP-actor + shm array for it data, we don't really need a separate FSP-actor + shm array for
since it's likely already directly adjacent to OHLC samples from the it since it's likely already directly adjacent to OHLC samples
data provider. from the data provider.
Further only if volume data is detected (it sometimes isn't provided Further only if volume data is detected (it sometimes isn't provided
eg. forex, certain commodities markets) will volume dependent FSPs eg. forex, certain commodities markets) will volume dependent FSPs