Compare commits

..

2 Commits

12 changed files with 100 additions and 211 deletions

View File

@ -275,15 +275,9 @@ async def open_history_client(
f'{times}'
)
# XXX, debug any case where the latest 1m bar we get is
# already another "sample's-step-old"..
if end_dt is None:
inow: int = round(time.time())
if (
_time_step := (inow - times[-1])
>
timeframe * 2
):
if (inow - times[-1]) > 60:
await tractor.pause()
start_dt = from_timestamp(times[0])

View File

@ -1115,7 +1115,6 @@ async def stream_quotes(
con: Contract = details.contract
first_ticker: Ticker|None = None
first_quote: dict[str, Any] = {}
timeout: float = 1.6
with trio.move_on_after(timeout) as quote_cs:
@ -1168,14 +1167,15 @@ async def stream_quotes(
first_quote,
))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to
# terminate this task.
await trio.sleep_forever()
#
# ^^XXX^^TODO! INSTEAD impl a `trio.sleep()` for the
# duration until the venue opens!!
# ?TODO, we could instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this
@ -1199,9 +1199,6 @@ async def stream_quotes(
'Rxed init quote:\n'
f'{pformat(first_quote)}'
)
# signal `.data.feed` layer that mkt quotes are LIVE
feed_is_live.set()
cs: trio.CancelScope|None = None
startup: bool = True
iter_quotes: trio.abc.Channel
@ -1255,6 +1252,7 @@ async def stream_quotes(
# tick.
ticker = await iter_quotes.receive()
quote = normalize(ticker)
feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})

View File

@ -80,20 +80,20 @@ class Sampler:
This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below
`register_with_sampler()`.
``.service.maybe_open_samplerd()`` and the below
``register_with_sampler()``.
'''
service_nursery: None|trio.Nursery = None
service_nursery: None | trio.Nursery = None
# TODO: we could stick these in a composed type to avoid angering
# the "i hate module scoped variables crowd" (yawn).
# TODO: we could stick these in a composed type to avoid
# angering the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by
# data feed requests with a given detected time step usually from
# history loading.
incr_task_cs: trio.CancelScope|None = None
incr_task_cs: trio.CancelScope | None = None
bcast_errors: tuple[Exception] = (
trio.BrokenResourceError,
@ -248,8 +248,8 @@ class Sampler:
async def broadcast(
self,
period_s: float,
time_stamp: float|None = None,
info: dict|None = None,
time_stamp: float | None = None,
info: dict | None = None,
) -> None:
'''
@ -313,7 +313,7 @@ class Sampler:
@classmethod
async def broadcast_all(
self,
info: dict|None = None,
info: dict | None = None,
) -> None:
# NOTE: take a copy of subs since removals can happen
@ -330,12 +330,12 @@ class Sampler:
async def register_with_sampler(
ctx: Context,
period_s: float,
shms_by_period: dict[float, dict]|None = None,
shms_by_period: dict[float, dict] | None = None,
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> set[int]:
) -> None:
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
@ -362,12 +362,7 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second.
if (
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
if shms_by_period is not None:
from ._sharedmem import (
attach_shm_array,
_Token,
@ -381,17 +376,12 @@ async def register_with_sampler(
readonly=False,
)
shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault(
period,
[],
).append(shm)
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
assert Sampler.ohlcv_shms
# unblock caller
await ctx.started(
set(Sampler.ohlcv_shms.keys())
)
await ctx.started(set(Sampler.ohlcv_shms.keys()))
if open_index_stream:
try:
@ -437,7 +427,7 @@ async def register_with_sampler(
async def spawn_samplerd(
loglevel: str|None = None,
loglevel: str | None = None,
**extra_tractor_kwargs
) -> bool:
@ -483,7 +473,7 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str|None = None,
loglevel: str | None = None,
**pikerd_kwargs,
) -> tractor.Portal: # noqa
@ -508,11 +498,11 @@ async def maybe_open_samplerd(
@acm
async def open_sample_stream(
period_s: float,
shms_by_period: dict[float, dict]|None = None,
shms_by_period: dict[float, dict] | None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
cache_key: str|None = None,
cache_key: str | None = None,
allow_new_sampler: bool = True,
ensure_is_active: bool = False,
@ -543,8 +533,6 @@ async def open_sample_stream(
# yield bistream
# else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with (
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
@ -559,10 +547,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
},
) as (ctx, shm_periods)
) as (ctx, first)
):
if ensure_is_active:
assert len(shm_periods) > 1
assert len(first) > 1
async with (
ctx.open_stream(

View File

@ -447,13 +447,7 @@ def ldshm(
)
# last chance manual overwrites in REPL
# await tractor.pause()
if not aids:
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
assert aids
tf2aids[period_s] = aids
else:

View File

@ -49,7 +49,6 @@ from pendulum import (
Duration,
duration as mk_duration,
from_timestamp,
timezone,
)
import numpy as np
import polars as pl
@ -58,7 +57,9 @@ from piker.brokers import NoData
from piker.accounting import (
MktPair,
)
from piker.log import get_logger
from piker.data._util import (
log,
)
from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
@ -96,9 +97,6 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus
log = get_logger()
# `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle
@ -249,11 +247,6 @@ async def maybe_fill_null_segments(
from_timestamp(array['time'][0])
) < backfill_until_dt
):
log.error(
f'Invalid frame_start !?\n'
f'frame_start_dt: {frame_start_dt!r}\n'
f'backfill_until_dt: {backfill_until_dt!r}\n'
)
await tractor.pause()
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
@ -403,9 +396,7 @@ async def start_backfill(
# based on the sample step size, maybe load a certain amount history
update_start_on_prepend: bool = False
if (
_until_was_none := (backfill_until_dt is None)
):
if backfill_until_dt is None:
# TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow
@ -439,8 +430,6 @@ async def start_backfill(
last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index
est = timezone('EST')
while last_start_dt > backfill_until_dt:
log.info(
f'Requesting {timeframe}s frame:\n'
@ -454,10 +443,9 @@ async def start_backfill(
next_end_dt,
) = await get_hist(
timeframe,
end_dt=(end_dt_param := last_start_dt),
end_dt=last_start_dt,
)
except NoData as nodata:
_nodata = nodata
except NoData as _daterr:
orig_last_start_dt: datetime = last_start_dt
gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@ -525,32 +513,8 @@ async def start_backfill(
==
next_start_dt.timestamp()
)
assert (
(last_time := time[-1])
==
next_end_dt.timestamp()
)
frame_last_dt = from_timestamp(last_time)
if (
frame_last_dt.add(seconds=timeframe)
<
end_dt_param
):
est_frame_last_dt = est.convert(frame_last_dt)
est_end_dt_param = est.convert(end_dt_param)
log.warning(
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
f'end_dt_param (EST): {est_end_dt_param!r}\n'
f'\n'
f'Likely contains,\n'
f'- a venue closure.\n'
f'- (maybe?) missing data ?\n'
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
await tractor.pause()
assert time[-1] == next_end_dt.timestamp()
expected_dur: Interval = (
last_start_dt.subtract(
@ -612,11 +576,10 @@ async def start_backfill(
'0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}'
)
await tractor.pause()
# Check if we're about to exceed buffer capacity BEFORE
# attempting the push
if (next_prepend_index - ln) < 0:
if next_prepend_index - ln < 0:
log.warning(
f'Backfill would exceed buffer capacity!\n'
f'next_prepend_index: {next_prepend_index}\n'
@ -687,7 +650,7 @@ async def start_backfill(
},
})
# XXX, can't push the entire frame? so
# can't push the entire frame? so
# push only the amount that can fit..
break
@ -747,8 +710,8 @@ async def start_backfill(
) = dedupe(df)
if diff:
log.warning(
f'Found {diff!r} duplicates in tsdb! '
f'=> Overwriting with `deduped` data !! <=\n'
f'Found {diff} duplicates in tsdb, '
f'overwriting with deduped data\n'
)
await storage.write_ohlcv(
col_sym_key,
@ -1321,7 +1284,6 @@ async def manage_history(
some_data_ready: trio.Event,
feed_is_live: trio.Event,
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
task_status: TaskStatus[
tuple[ShmArray, ShmArray]
@ -1470,26 +1432,12 @@ async def manage_history(
1: rt_shm,
60: hist_shm,
}
shms_by_period: dict|None = None
with trio.move_on_after(wait_for_live_timeout) as cs:
await feed_is_live.wait()
if cs.cancelled_caught:
log.warning(
f'No live feed within {wait_for_live_timeout!r}s\n'
f'fqme: {mkt.fqme!r}\n'
f'NOT activating shm-buffer-sampler!!\n'
)
if feed_is_live.is_set():
shms_by_period: dict[int, dict] = {
1.: rt_shm.token,
60.: hist_shm.token,
}
async with open_sample_stream(
period_s=1.,
shms_by_period=shms_by_period,
shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
# NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the

View File

@ -27,15 +27,15 @@ import trio
from piker.ui.qt import (
QEvent,
)
from . import _chart
from . import _event
from . import _search
from ..accounting import unpack_fqme
from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search
from ..log import get_logger
from ..service import maybe_spawn_brokerd
from . import _event
from ._exec import run_qtractor
from ..data.feed import install_brokerd_search
from ..data._symcache import open_symcache
from ..accounting import unpack_fqme
from . import _search
from ._chart import GodWidget
from ..log import get_logger
log = get_logger(__name__)
@ -73,8 +73,8 @@ async def load_provider_search(
async def _async_main(
# implicit required argument provided by `qtractor_run()`
main_widget: _chart.GodWidget,
# implicit required argument provided by ``qtractor_run()``
main_widget: GodWidget,
syms: list[str],
brokers: dict[str, ModuleType],
@ -87,9 +87,6 @@ async def _async_main(
Provision the "main" widget with initial symbol data and root nursery.
"""
# set as singleton
_chart._godw = main_widget
from . import _display
from ._pg_overrides import _do_overrides
_do_overrides()
@ -204,6 +201,6 @@ def _main(
brokermods,
piker_loglevel,
),
main_widget_type=_chart.GodWidget,
main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs,
)

View File

@ -82,25 +82,6 @@ if TYPE_CHECKING:
log = get_logger(__name__)
_godw: GodWidget|None = None
def get_godw() -> GodWidget:
'''
Get the top level "god widget", the root/central-most Qt
widget-object set as `QMainWindow.setCentralWidget(_godw)`.
See `piker.ui._exec` for the runtime init details and all the
machinery for running `trio` on the Qt event loop in guest mode.
'''
if _godw is None:
raise RuntimeError(
'No god-widget initialized ??\n'
'Have you called `run_qtractor()` yet?\n'
)
return _godw
class GodWidget(QWidget):
'''
"Our lord and savior, the holy child of window-shua, there is no
@ -123,7 +104,7 @@ class GodWidget(QWidget):
super().__init__(parent)
self.search: SearchWidget|None = None
self.search: SearchWidget | None = None
self.hbox = QHBoxLayout(self)
self.hbox.setContentsMargins(0, 0, 0, 0)
@ -142,9 +123,9 @@ class GodWidget(QWidget):
tuple[LinkedSplits, LinkedSplits],
] = {}
self.hist_linked: LinkedSplits|None = None
self.rt_linked: LinkedSplits|None = None
self._active_cursor: Cursor|None = None
self.hist_linked: LinkedSplits | None = None
self.rt_linked: LinkedSplits | None = None
self._active_cursor: Cursor | None = None
# assigned in the startup func `_async_main()`
self._root_n: trio.Nursery = None
@ -388,9 +369,9 @@ class ChartnPane(QFrame):
https://doc.qt.io/qt-5/qwidget.html#composite-widgets
'''
sidepane: FieldsForm|SearchWidget
sidepane: FieldsForm | SearchWidget
hbox: QHBoxLayout
chart: ChartPlotWidget|None = None
chart: ChartPlotWidget | None = None
def __init__(
self,
@ -406,13 +387,13 @@ class ChartnPane(QFrame):
self.chart = None
hbox = self.hbox = QHBoxLayout(self)
hbox.setAlignment(Qt.AlignTop|Qt.AlignLeft)
hbox.setAlignment(Qt.AlignTop | Qt.AlignLeft)
hbox.setContentsMargins(0, 0, 0, 0)
hbox.setSpacing(3)
def set_sidepane(
self,
sidepane: FieldsForm|SearchWidget,
sidepane: FieldsForm | SearchWidget,
) -> None:
# add sidepane **after** chart; place it on axis side
@ -423,7 +404,7 @@ class ChartnPane(QFrame):
self._sidepane = sidepane
@property
def sidepane(self) -> FieldsForm|SearchWidget:
def sidepane(self) -> FieldsForm | SearchWidget:
return self._sidepane
@ -469,7 +450,7 @@ class LinkedSplits(QWidget):
# chart-local graphics state that can be passed to
# a ``graphic_update_cycle()`` call by any task wishing to
# update the UI for a given "chart instance".
self.display_state: DisplayState|None = None
self.display_state: DisplayState | None = None
self._mkt: MktPair = None
@ -505,7 +486,7 @@ class LinkedSplits(QWidget):
def set_split_sizes(
self,
prop: float|None = None,
prop: float | None = None,
) -> None:
'''
@ -586,8 +567,8 @@ class LinkedSplits(QWidget):
# style?
self.chart.setFrameStyle(
QFrame.Shape.StyledPanel
|QFrame.Shadow.Plain
QFrame.Shape.StyledPanel |
QFrame.Shadow.Plain
)
return self.chart
@ -599,11 +580,11 @@ class LinkedSplits(QWidget):
shm: ShmArray,
flume: Flume,
array_key: str|None = None,
array_key: str | None = None,
style: str = 'line',
_is_main: bool = False,
sidepane: QWidget|None = None,
sidepane: QWidget | None = None,
draw_kwargs: dict = {},
**cpw_kwargs,
@ -706,7 +687,7 @@ class LinkedSplits(QWidget):
cpw.plotItem.vb.linked = self
cpw.setFrameStyle(
QFrame.Shape.StyledPanel
# |QFrame.Shadow.Plain
# | QFrame.Shadow.Plain
)
# don't show the little "autoscale" A label.
@ -819,7 +800,7 @@ class LinkedSplits(QWidget):
def resize_sidepanes(
self,
from_linked: LinkedSplits|None = None,
from_linked: LinkedSplits | None = None,
) -> None:
'''
@ -893,7 +874,7 @@ class ChartPlotWidget(pg.PlotWidget):
# TODO: load from config
use_open_gl: bool = False,
static_yrange: tuple[float, float]|None = None,
static_yrange: tuple[float, float] | None = None,
parent=None,
**kwargs,
@ -908,7 +889,7 @@ class ChartPlotWidget(pg.PlotWidget):
# NOTE: must be set bfore calling ``.mk_vb()``
self.linked = linkedsplits
self.sidepane: FieldsForm|None = None
self.sidepane: FieldsForm | None = None
# source of our custom interactions
self.cv = self.mk_vb(name)
@ -942,7 +923,7 @@ class ChartPlotWidget(pg.PlotWidget):
self.useOpenGL(use_open_gl)
self.name = name
self.data_key = data_key or name
self.qframe: ChartnPane|None = None
self.qframe: ChartnPane | None = None
# scene-local placeholder for book graphics
# sizing to avoid overlap with data contents
@ -953,7 +934,7 @@ class ChartPlotWidget(pg.PlotWidget):
# registry of overlay curve names
self._vizs: dict[str, Viz] = {}
self.feed: Feed|None = None
self.feed: Feed | None = None
self._labels = {} # registry of underlying graphics
self._ysticks = {} # registry of underlying graphics
@ -1046,7 +1027,7 @@ class ChartPlotWidget(pg.PlotWidget):
def increment_view(
self,
datums: int = 1,
vb: ChartView|None = None,
vb: ChartView | None = None,
) -> None:
'''
@ -1077,8 +1058,8 @@ class ChartPlotWidget(pg.PlotWidget):
def overlay_plotitem(
self,
name: str,
index: int|None = None,
axis_title: str|None = None,
index: int | None = None,
axis_title: str | None = None,
axis_side: str = 'right',
axis_kwargs: dict = {},
@ -1166,14 +1147,14 @@ class ChartPlotWidget(pg.PlotWidget):
shm: ShmArray,
flume: Flume,
array_key: str|None = None,
array_key: str | None = None,
overlay: bool = False,
color: str|None = None,
color: str | None = None,
add_label: bool = True,
pi: pg.PlotItem|None = None,
pi: pg.PlotItem | None = None,
step_mode: bool = False,
is_ohlc: bool = False,
add_sticky: None|str = 'right',
add_sticky: None | str = 'right',
**graphics_kwargs,
@ -1271,7 +1252,7 @@ class ChartPlotWidget(pg.PlotWidget):
# use the tick size precision for display
name = name or pi.name
mkt: MktPair = self.linked.mkt
digits: int|None = None
digits: int | None = None
if name in mkt.fqme:
digits = mkt.price_tick_digits
@ -1305,7 +1286,7 @@ class ChartPlotWidget(pg.PlotWidget):
shm: ShmArray,
flume: Flume,
array_key: str|None = None,
array_key: str | None = None,
**draw_curve_kwargs,
) -> Viz:

View File

@ -27,6 +27,7 @@ import pyqtgraph as pg
from piker.ui.qt import (
QtWidgets,
QGraphicsItem,
Qt,
QLineF,
QRectF,

View File

@ -169,10 +169,7 @@ class ArrowEditor(Struct):
f'{arrow!r}\n'
)
for linked in self.godw.iter_linked():
if not (chart := linked.chart):
continue
chart.plotItem.removeItem(arrow)
linked.chart.plotItem.removeItem(arrow)
try:
arrows.remove(arrow)
except ValueError:

View File

@ -91,10 +91,6 @@ def run_qtractor(
window_type: QMainWindow = None,
) -> None:
'''
Run the Qt event loop and embed `trio` via guest mode on it.
'''
# avoids annoying message when entering debugger from qt loop
pyqtRemoveInputHook()
@ -174,7 +170,7 @@ def run_qtractor(
# hook into app focus change events
app.focusChanged.connect(window.on_focus_change)
instance: GodWidget = main_widget_type()
instance = main_widget_type()
instance.window = window
# override tractor's defaults

View File

@ -73,7 +73,7 @@ log = get_logger(__name__)
def update_fsp_chart(
viz,
graphics_name: str,
array_key: str|None,
array_key: str | None,
**kwargs,
) -> None:
@ -87,11 +87,7 @@ def update_fsp_chart(
# guard against unreadable case
if not last_row:
log.warning(
f'Read-race on shm array,\n'
f'graphics_name: {graphics_name!r}\n'
f'shm.token: {shm.token}\n'
)
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
return
# update graphics
@ -207,6 +203,7 @@ async def open_fsp_actor_cluster(
async def run_fsp_ui(
linkedsplits: LinkedSplits,
flume: Flume,
started: trio.Event,
@ -474,7 +471,7 @@ class FspAdmin:
target: Fsp,
conf: dict[str, dict[str, Any]],
worker_name: str|None = None,
worker_name: str | None = None,
loglevel: str = 'info',
) -> (Flume, trio.Event):
@ -626,10 +623,8 @@ async def open_fsp_admin(
event.set()
# TODO, passing in `pikerd` related settings here!
# [ ] read in the `tractor` setting for `enable_transports: list`
# from the root `conf.toml`!
async def open_vlm_displays(
linked: LinkedSplits,
flume: Flume,
dvlm: bool = True,
@ -639,12 +634,12 @@ async def open_vlm_displays(
) -> None:
'''
Vlm (volume) subchart displays.
Volume subchart displays.
Since "volume" is often included directly alongside OHLCV price
data, we don't really need a separate FSP-actor + shm array for
it since it's likely already directly adjacent to OHLC samples
from the data provider.
data, we don't really need a separate FSP-actor + shm array for it
since it's likely already directly adjacent to OHLC samples from the
data provider.
Further only if volume data is detected (it sometimes isn't provided
eg. forex, certain commodities markets) will volume dependent FSPs

View File

@ -61,9 +61,9 @@ class MultiStatus:
self,
msg: str,
final_msg: str|None = None,
final_msg: str | None = None,
clear_on_next: bool = False,
group_key: Union[bool, str]|None = False,
group_key: Union[bool, str] | None = False,
) -> Union[Callable[..., None], str]:
'''
@ -175,11 +175,11 @@ class MainWindow(QMainWindow):
self.setWindowTitle(self.title)
# set by runtime after `trio` is engaged.
self.godwidget: GodWidget|None = None
self.godwidget: GodWidget | None = None
self._status_bar: QStatusBar = None
self._status_label: QLabel = None
self._size: tuple[int, int]|None = None
self._size: tuple[int, int] | None = None
@property
def mode_label(self) -> QLabel:
@ -202,7 +202,7 @@ class MainWindow(QMainWindow):
label.setMargin(2)
label.setAlignment(
QtCore.Qt.AlignVCenter
|QtCore.Qt.AlignRight
| QtCore.Qt.AlignRight
)
self.statusBar().addPermanentWidget(label)
label.show()
@ -288,7 +288,7 @@ class MainWindow(QMainWindow):
def configure_to_desktop(
self,
size: tuple[int, int]|None = None,
size: tuple[int, int] | None = None,
) -> None:
'''