Compare commits
6 Commits
90b817eb69
...
50825f30b8
| Author | SHA1 | Date |
|---|---|---|
|
|
50825f30b8 | |
|
|
ed9c211b96 | |
|
|
f1b27e9696 | |
|
|
959d04024b | |
|
|
6f8a361e80 | |
|
|
2d678e1582 |
|
|
@ -275,9 +275,15 @@ async def open_history_client(
|
|||
f'{times}'
|
||||
)
|
||||
|
||||
# XXX, debug any case where the latest 1m bar we get is
|
||||
# already another "sample's-step-old"..
|
||||
if end_dt is None:
|
||||
inow: int = round(time.time())
|
||||
if (inow - times[-1]) > 60:
|
||||
if (
|
||||
_time_step := (inow - times[-1])
|
||||
>
|
||||
timeframe * 2
|
||||
):
|
||||
await tractor.pause()
|
||||
|
||||
start_dt = from_timestamp(times[0])
|
||||
|
|
|
|||
|
|
@ -447,7 +447,13 @@ def ldshm(
|
|||
)
|
||||
# last chance manual overwrites in REPL
|
||||
# await tractor.pause()
|
||||
assert aids
|
||||
if not aids:
|
||||
log.warning(
|
||||
f'No gaps were found !?\n'
|
||||
f'fqme: {fqme!r}\n'
|
||||
f'timeframe: {period_s!r}\n'
|
||||
f"WELL THAT'S GOOD NOOZ!\n"
|
||||
)
|
||||
tf2aids[period_s] = aids
|
||||
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -49,6 +49,7 @@ from pendulum import (
|
|||
Duration,
|
||||
duration as mk_duration,
|
||||
from_timestamp,
|
||||
timezone,
|
||||
)
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
|
|
@ -57,9 +58,7 @@ from piker.brokers import NoData
|
|||
from piker.accounting import (
|
||||
MktPair,
|
||||
)
|
||||
from piker.data._util import (
|
||||
log,
|
||||
)
|
||||
from piker.log import get_logger
|
||||
from ..data._sharedmem import (
|
||||
maybe_open_shm_array,
|
||||
ShmArray,
|
||||
|
|
@ -97,6 +96,9 @@ if TYPE_CHECKING:
|
|||
# from .feed import _FeedsBus
|
||||
|
||||
|
||||
log = get_logger()
|
||||
|
||||
|
||||
# `ShmArray` buffer sizing configuration:
|
||||
_mins_in_day = int(60 * 24)
|
||||
# how much is probably dependent on lifestyle
|
||||
|
|
@ -401,7 +403,9 @@ async def start_backfill(
|
|||
|
||||
# based on the sample step size, maybe load a certain amount history
|
||||
update_start_on_prepend: bool = False
|
||||
if backfill_until_dt is None:
|
||||
if (
|
||||
_until_was_none := (backfill_until_dt is None)
|
||||
):
|
||||
|
||||
# TODO: per-provider default history-durations?
|
||||
# -[ ] inside the `open_history_client()` config allow
|
||||
|
|
@ -435,6 +439,8 @@ async def start_backfill(
|
|||
last_start_dt: datetime = backfill_from_dt
|
||||
next_prepend_index: int = backfill_from_shm_index
|
||||
|
||||
est = timezone('EST')
|
||||
|
||||
while last_start_dt > backfill_until_dt:
|
||||
log.info(
|
||||
f'Requesting {timeframe}s frame:\n'
|
||||
|
|
@ -448,9 +454,10 @@ async def start_backfill(
|
|||
next_end_dt,
|
||||
) = await get_hist(
|
||||
timeframe,
|
||||
end_dt=last_start_dt,
|
||||
end_dt=(end_dt_param := last_start_dt),
|
||||
)
|
||||
except NoData as _daterr:
|
||||
except NoData as nodata:
|
||||
_nodata = nodata
|
||||
orig_last_start_dt: datetime = last_start_dt
|
||||
gap_report: str = (
|
||||
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
|
||||
|
|
@ -518,8 +525,32 @@ async def start_backfill(
|
|||
==
|
||||
next_start_dt.timestamp()
|
||||
)
|
||||
assert (
|
||||
(last_time := time[-1])
|
||||
==
|
||||
next_end_dt.timestamp()
|
||||
)
|
||||
|
||||
assert time[-1] == next_end_dt.timestamp()
|
||||
frame_last_dt = from_timestamp(last_time)
|
||||
if (
|
||||
frame_last_dt.add(seconds=timeframe)
|
||||
<
|
||||
end_dt_param
|
||||
):
|
||||
est_frame_last_dt = est.convert(frame_last_dt)
|
||||
est_end_dt_param = est.convert(end_dt_param)
|
||||
log.warning(
|
||||
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
|
||||
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
|
||||
f'end_dt_param (EST): {est_end_dt_param!r}\n'
|
||||
f'\n'
|
||||
f'Likely contains,\n'
|
||||
f'- a venue closure.\n'
|
||||
f'- (maybe?) missing data ?\n'
|
||||
)
|
||||
# ?TODO, check against venue closure hours
|
||||
# if/when provided by backend?
|
||||
await tractor.pause()
|
||||
|
||||
expected_dur: Interval = (
|
||||
last_start_dt.subtract(
|
||||
|
|
@ -581,10 +612,11 @@ async def start_backfill(
|
|||
'0 BARS TO PUSH after diff!?\n'
|
||||
f'{next_start_dt} -> {last_start_dt}'
|
||||
)
|
||||
await tractor.pause()
|
||||
|
||||
# Check if we're about to exceed buffer capacity BEFORE
|
||||
# attempting the push
|
||||
if next_prepend_index - ln < 0:
|
||||
if (next_prepend_index - ln) < 0:
|
||||
log.warning(
|
||||
f'Backfill would exceed buffer capacity!\n'
|
||||
f'next_prepend_index: {next_prepend_index}\n'
|
||||
|
|
@ -655,7 +687,7 @@ async def start_backfill(
|
|||
},
|
||||
})
|
||||
|
||||
# can't push the entire frame? so
|
||||
# XXX, can't push the entire frame? so
|
||||
# push only the amount that can fit..
|
||||
break
|
||||
|
||||
|
|
@ -715,8 +747,8 @@ async def start_backfill(
|
|||
) = dedupe(df)
|
||||
if diff:
|
||||
log.warning(
|
||||
f'Found {diff} duplicates in tsdb, '
|
||||
f'overwriting with deduped data\n'
|
||||
f'Found {diff!r} duplicates in tsdb! '
|
||||
f'=> Overwriting with `deduped` data !! <=\n'
|
||||
)
|
||||
await storage.write_ohlcv(
|
||||
col_sym_key,
|
||||
|
|
|
|||
|
|
@ -1031,12 +1031,83 @@ class Viz(Struct):
|
|||
# track downsampled state
|
||||
self._in_ds = r._in_ds
|
||||
|
||||
# XXX: reposition annotations after graphics update
|
||||
# to ensure alignment with (potentially changed) data coords
|
||||
if should_redraw or force_redraw:
|
||||
n = self._reposition_annotations()
|
||||
if n:
|
||||
profiler(f'repositioned {n} annotations')
|
||||
|
||||
return (
|
||||
True,
|
||||
(ivl, ivr),
|
||||
graphics,
|
||||
)
|
||||
|
||||
# class-level cache for tracking last repositioned index range
|
||||
# to avoid redundant repositioning when shm hasn't changed
|
||||
_annot_index_cache: dict[str, tuple[int, int]] = {}
|
||||
|
||||
def _reposition_annotations(
|
||||
self,
|
||||
force: bool = False,
|
||||
) -> int:
|
||||
'''
|
||||
Reposition all annotations (arrows, text, rects) that have
|
||||
stored absolute coordinates to ensure they stay aligned
|
||||
with viz data after updates/redraws.
|
||||
|
||||
Only repositions if shm index range has changed since last
|
||||
reposition, unless `force=True`.
|
||||
|
||||
'''
|
||||
# check if shm index range changed
|
||||
arr = self.shm.array
|
||||
if not arr.size:
|
||||
return 0
|
||||
|
||||
ifirst = arr[0]['index']
|
||||
ilast = arr[-1]['index']
|
||||
index_range = (ifirst, ilast)
|
||||
|
||||
# skip if range unchanged (unless forced)
|
||||
cache_key: str = self.name
|
||||
last_range = self._annot_index_cache.get(cache_key)
|
||||
if (
|
||||
not force
|
||||
and last_range is not None
|
||||
and last_range == index_range
|
||||
):
|
||||
return 0
|
||||
|
||||
# cache current range
|
||||
self._annot_index_cache[cache_key] = index_range
|
||||
|
||||
n_repositioned: int = 0
|
||||
for item in self.plot.items:
|
||||
# arrows and text items use abs x,y coords
|
||||
if (
|
||||
hasattr(item, '_abs_x')
|
||||
and
|
||||
hasattr(item, '_abs_y')
|
||||
):
|
||||
item.setPos(
|
||||
item._abs_x,
|
||||
item._abs_y,
|
||||
)
|
||||
n_repositioned += 1
|
||||
|
||||
# rects use method + kwargs
|
||||
elif (
|
||||
hasattr(item, '_meth')
|
||||
and
|
||||
hasattr(item, '_kwargs')
|
||||
):
|
||||
getattr(item, item._meth)(**item._kwargs)
|
||||
n_repositioned += 1
|
||||
|
||||
return n_repositioned
|
||||
|
||||
def reset_graphics(
|
||||
self,
|
||||
|
||||
|
|
@ -1070,6 +1141,14 @@ class Viz(Struct):
|
|||
self.update_graphics(force_redraw=True)
|
||||
self._mxmn_cache_enabled = True
|
||||
|
||||
# reposition annotations to stay aligned after reset
|
||||
# (force=True since reset always changes coordinate system)
|
||||
n = self._reposition_annotations(force=True)
|
||||
if n:
|
||||
log.info(
|
||||
f'Repositioned {n} annotation(s) after reset'
|
||||
)
|
||||
|
||||
def draw_last(
|
||||
self,
|
||||
array_key: str | None = None,
|
||||
|
|
|
|||
|
|
@ -211,9 +211,9 @@ async def increment_history_view(
|
|||
):
|
||||
hist_chart: ChartPlotWidget = ds.hist_chart
|
||||
hist_viz: Viz = ds.hist_viz
|
||||
# viz: Viz = ds.viz
|
||||
viz: Viz = ds.viz
|
||||
assert 'hist' in hist_viz.shm.token['shm_name']
|
||||
# name: str = hist_viz.name
|
||||
name: str = hist_viz.name
|
||||
|
||||
# TODO: seems this is more reliable at keeping the slow
|
||||
# chart incremented in view more correctly?
|
||||
|
|
@ -250,27 +250,27 @@ async def increment_history_view(
|
|||
# - samplerd could emit the actual update range via
|
||||
# tuple and then we only enter the below block if that
|
||||
# range is detected as in-view?
|
||||
# match msg:
|
||||
# case {
|
||||
# 'backfilling': (viz_name, timeframe),
|
||||
# } if (
|
||||
# viz_name == name
|
||||
# ):
|
||||
# log.warning(
|
||||
# f'Forcing HARD REDRAW:\n'
|
||||
# f'name: {name}\n'
|
||||
# f'timeframe: {timeframe}\n'
|
||||
# )
|
||||
# # TODO: only allow this when the data is IN VIEW!
|
||||
# # also, we probably can do this more efficiently
|
||||
# # / smarter by only redrawing the portion of the
|
||||
# # path necessary?
|
||||
# {
|
||||
# 60: hist_viz,
|
||||
# 1: viz,
|
||||
# }[timeframe].update_graphics(
|
||||
# force_redraw=True
|
||||
# )
|
||||
match msg:
|
||||
case {
|
||||
'backfilling': (viz_name, timeframe),
|
||||
} if (
|
||||
viz_name == name
|
||||
):
|
||||
log.warning(
|
||||
f'Forcing HARD REDRAW:\n'
|
||||
f'name: {name}\n'
|
||||
f'timeframe: {timeframe}\n'
|
||||
)
|
||||
# TODO: only allow this when the data is IN VIEW!
|
||||
# also, we probably can do this more efficiently
|
||||
# / smarter by only redrawing the portion of the
|
||||
# path necessary?
|
||||
{
|
||||
60: hist_viz,
|
||||
1: viz,
|
||||
}[timeframe].update_graphics(
|
||||
force_redraw=True
|
||||
)
|
||||
|
||||
# check if slow chart needs an x-domain shift and/or
|
||||
# y-range resize.
|
||||
|
|
|
|||
|
|
@ -169,7 +169,10 @@ class ArrowEditor(Struct):
|
|||
f'{arrow!r}\n'
|
||||
)
|
||||
for linked in self.godw.iter_linked():
|
||||
linked.chart.plotItem.removeItem(arrow)
|
||||
if not (chart := linked.chart):
|
||||
continue
|
||||
|
||||
chart.plotItem.removeItem(arrow)
|
||||
try:
|
||||
arrows.remove(arrow)
|
||||
except ValueError:
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ log = get_logger(__name__)
|
|||
def update_fsp_chart(
|
||||
viz,
|
||||
graphics_name: str,
|
||||
array_key: str | None,
|
||||
array_key: str|None,
|
||||
**kwargs,
|
||||
|
||||
) -> None:
|
||||
|
|
@ -87,7 +87,11 @@ def update_fsp_chart(
|
|||
|
||||
# guard against unreadable case
|
||||
if not last_row:
|
||||
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
|
||||
log.warning(
|
||||
f'Read-race on shm array,\n'
|
||||
f'graphics_name: {graphics_name!r}\n'
|
||||
f'shm.token: {shm.token}\n'
|
||||
)
|
||||
return
|
||||
|
||||
# update graphics
|
||||
|
|
@ -203,7 +207,6 @@ async def open_fsp_actor_cluster(
|
|||
|
||||
|
||||
async def run_fsp_ui(
|
||||
|
||||
linkedsplits: LinkedSplits,
|
||||
flume: Flume,
|
||||
started: trio.Event,
|
||||
|
|
@ -471,7 +474,7 @@ class FspAdmin:
|
|||
target: Fsp,
|
||||
conf: dict[str, dict[str, Any]],
|
||||
|
||||
worker_name: str | None = None,
|
||||
worker_name: str|None = None,
|
||||
loglevel: str = 'info',
|
||||
|
||||
) -> (Flume, trio.Event):
|
||||
|
|
@ -623,8 +626,10 @@ async def open_fsp_admin(
|
|||
event.set()
|
||||
|
||||
|
||||
# TODO, passing in `pikerd` related settings here!
|
||||
# [ ] read in the `tractor` setting for `enable_transports: list`
|
||||
# from the root `conf.toml`!
|
||||
async def open_vlm_displays(
|
||||
|
||||
linked: LinkedSplits,
|
||||
flume: Flume,
|
||||
dvlm: bool = True,
|
||||
|
|
@ -634,12 +639,12 @@ async def open_vlm_displays(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Volume subchart displays.
|
||||
Vlm (volume) subchart displays.
|
||||
|
||||
Since "volume" is often included directly alongside OHLCV price
|
||||
data, we don't really need a separate FSP-actor + shm array for it
|
||||
since it's likely already directly adjacent to OHLC samples from the
|
||||
data provider.
|
||||
data, we don't really need a separate FSP-actor + shm array for
|
||||
it since it's likely already directly adjacent to OHLC samples
|
||||
from the data provider.
|
||||
|
||||
Further only if volume data is detected (it sometimes isn't provided
|
||||
eg. forex, certain commodities markets) will volume dependent FSPs
|
||||
|
|
|
|||
Loading…
Reference in New Issue