Compare commits

..

2 Commits

12 changed files with 100 additions and 211 deletions

View File

@ -275,15 +275,9 @@ async def open_history_client(
f'{times}' f'{times}'
) )
# XXX, debug any case where the latest 1m bar we get is
# already another "sample's-step-old"..
if end_dt is None: if end_dt is None:
inow: int = round(time.time()) inow: int = round(time.time())
if ( if (inow - times[-1]) > 60:
_time_step := (inow - times[-1])
>
timeframe * 2
):
await tractor.pause() await tractor.pause()
start_dt = from_timestamp(times[0]) start_dt = from_timestamp(times[0])

View File

@ -1115,7 +1115,6 @@ async def stream_quotes(
con: Contract = details.contract con: Contract = details.contract
first_ticker: Ticker|None = None first_ticker: Ticker|None = None
first_quote: dict[str, Any] = {}
timeout: float = 1.6 timeout: float = 1.6
with trio.move_on_after(timeout) as quote_cs: with trio.move_on_after(timeout) as quote_cs:
@ -1168,14 +1167,15 @@ async def stream_quotes(
first_quote, first_quote,
)) ))
# it's not really live but this will unblock
# the brokerd feed task to tell the ui to update?
feed_is_live.set()
# block and let data history backfill code run. # block and let data history backfill code run.
# XXX obvi given the venue is closed, we never expect feed # XXX obvi given the venue is closed, we never expect feed
# to come up; a taskc should be the only way to # to come up; a taskc should be the only way to
# terminate this task. # terminate this task.
await trio.sleep_forever() await trio.sleep_forever()
#
# ^^XXX^^TODO! INSTEAD impl a `trio.sleep()` for the
# duration until the venue opens!!
# ?TODO, we could instead spawn a task that waits on a feed # ?TODO, we could instead spawn a task that waits on a feed
# to start and let it wait indefinitely..instead of this # to start and let it wait indefinitely..instead of this
@ -1199,9 +1199,6 @@ async def stream_quotes(
'Rxed init quote:\n' 'Rxed init quote:\n'
f'{pformat(first_quote)}' f'{pformat(first_quote)}'
) )
# signal `.data.feed` layer that mkt quotes are LIVE
feed_is_live.set()
cs: trio.CancelScope|None = None cs: trio.CancelScope|None = None
startup: bool = True startup: bool = True
iter_quotes: trio.abc.Channel iter_quotes: trio.abc.Channel
@ -1255,6 +1252,7 @@ async def stream_quotes(
# tick. # tick.
ticker = await iter_quotes.receive() ticker = await iter_quotes.receive()
quote = normalize(ticker) quote = normalize(ticker)
feed_is_live.set()
fqme: str = quote['fqme'] fqme: str = quote['fqme']
await send_chan.send({fqme: quote}) await send_chan.send({fqme: quote})

View File

@ -80,14 +80,14 @@ class Sampler:
This non-instantiated type is meant to be a singleton within This non-instantiated type is meant to be a singleton within
a `samplerd` actor-service spawned once by the user wishing to a `samplerd` actor-service spawned once by the user wishing to
time-step-sample (real-time) quote feeds, see time-step-sample (real-time) quote feeds, see
`.service.maybe_open_samplerd()` and the below ``.service.maybe_open_samplerd()`` and the below
`register_with_sampler()`. ``register_with_sampler()``.
''' '''
service_nursery: None | trio.Nursery = None service_nursery: None | trio.Nursery = None
# TODO: we could stick these in a composed type to avoid angering # TODO: we could stick these in a composed type to avoid
# the "i hate module scoped variables crowd" (yawn). # angering the "i hate module scoped variables crowd" (yawn).
ohlcv_shms: dict[float, list[ShmArray]] = {} ohlcv_shms: dict[float, list[ShmArray]] = {}
# holds one-task-per-sample-period tasks which are spawned as-needed by # holds one-task-per-sample-period tasks which are spawned as-needed by
@ -335,7 +335,7 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs? open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates? sub_for_broadcasts: bool = True, # sampler side to send step updates?
) -> set[int]: ) -> None:
get_console_log(tractor.current_actor().loglevel) get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False incr_was_started: bool = False
@ -362,12 +362,7 @@ async def register_with_sampler(
# insert the base 1s period (for OHLC style sampling) into # insert the base 1s period (for OHLC style sampling) into
# the increment buffer set to update and shift every second. # the increment buffer set to update and shift every second.
if ( if shms_by_period is not None:
shms_by_period is not None
# and
# feed_is_live.is_set()
# ^TODO? pass it in instead?
):
from ._sharedmem import ( from ._sharedmem import (
attach_shm_array, attach_shm_array,
_Token, _Token,
@ -381,17 +376,12 @@ async def register_with_sampler(
readonly=False, readonly=False,
) )
shms_by_period[period] = shm shms_by_period[period] = shm
Sampler.ohlcv_shms.setdefault( Sampler.ohlcv_shms.setdefault(period, []).append(shm)
period,
[],
).append(shm)
assert Sampler.ohlcv_shms assert Sampler.ohlcv_shms
# unblock caller # unblock caller
await ctx.started( await ctx.started(set(Sampler.ohlcv_shms.keys()))
set(Sampler.ohlcv_shms.keys())
)
if open_index_stream: if open_index_stream:
try: try:
@ -543,8 +533,6 @@ async def open_sample_stream(
# yield bistream # yield bistream
# else: # else:
ctx: tractor.Context
shm_periods: set[int] # in `int`-seconds
async with ( async with (
# XXX: this should be singleton on a host, # XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be # a lone broker-daemon per provider should be
@ -559,10 +547,10 @@ async def open_sample_stream(
'open_index_stream': open_index_stream, 'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts, 'sub_for_broadcasts': sub_for_broadcasts,
}, },
) as (ctx, shm_periods) ) as (ctx, first)
): ):
if ensure_is_active: if ensure_is_active:
assert len(shm_periods) > 1 assert len(first) > 1
async with ( async with (
ctx.open_stream( ctx.open_stream(

View File

@ -447,13 +447,7 @@ def ldshm(
) )
# last chance manual overwrites in REPL # last chance manual overwrites in REPL
# await tractor.pause() # await tractor.pause()
if not aids: assert aids
log.warning(
f'No gaps were found !?\n'
f'fqme: {fqme!r}\n'
f'timeframe: {period_s!r}\n'
f"WELL THAT'S GOOD NOOZ!\n"
)
tf2aids[period_s] = aids tf2aids[period_s] = aids
else: else:

View File

@ -49,7 +49,6 @@ from pendulum import (
Duration, Duration,
duration as mk_duration, duration as mk_duration,
from_timestamp, from_timestamp,
timezone,
) )
import numpy as np import numpy as np
import polars as pl import polars as pl
@ -58,7 +57,9 @@ from piker.brokers import NoData
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
) )
from piker.log import get_logger from piker.data._util import (
log,
)
from ..data._sharedmem import ( from ..data._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
@ -96,9 +97,6 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus # from .feed import _FeedsBus
log = get_logger()
# `ShmArray` buffer sizing configuration: # `ShmArray` buffer sizing configuration:
_mins_in_day = int(60 * 24) _mins_in_day = int(60 * 24)
# how much is probably dependent on lifestyle # how much is probably dependent on lifestyle
@ -249,11 +247,6 @@ async def maybe_fill_null_segments(
from_timestamp(array['time'][0]) from_timestamp(array['time'][0])
) < backfill_until_dt ) < backfill_until_dt
): ):
log.error(
f'Invalid frame_start !?\n'
f'frame_start_dt: {frame_start_dt!r}\n'
f'backfill_until_dt: {backfill_until_dt!r}\n'
)
await tractor.pause() await tractor.pause()
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance # XXX TODO: pretty sure if i plot tsla, btcusdt.binance
@ -403,9 +396,7 @@ async def start_backfill(
# based on the sample step size, maybe load a certain amount history # based on the sample step size, maybe load a certain amount history
update_start_on_prepend: bool = False update_start_on_prepend: bool = False
if ( if backfill_until_dt is None:
_until_was_none := (backfill_until_dt is None)
):
# TODO: per-provider default history-durations? # TODO: per-provider default history-durations?
# -[ ] inside the `open_history_client()` config allow # -[ ] inside the `open_history_client()` config allow
@ -439,8 +430,6 @@ async def start_backfill(
last_start_dt: datetime = backfill_from_dt last_start_dt: datetime = backfill_from_dt
next_prepend_index: int = backfill_from_shm_index next_prepend_index: int = backfill_from_shm_index
est = timezone('EST')
while last_start_dt > backfill_until_dt: while last_start_dt > backfill_until_dt:
log.info( log.info(
f'Requesting {timeframe}s frame:\n' f'Requesting {timeframe}s frame:\n'
@ -454,10 +443,9 @@ async def start_backfill(
next_end_dt, next_end_dt,
) = await get_hist( ) = await get_hist(
timeframe, timeframe,
end_dt=(end_dt_param := last_start_dt), end_dt=last_start_dt,
) )
except NoData as nodata: except NoData as _daterr:
_nodata = nodata
orig_last_start_dt: datetime = last_start_dt orig_last_start_dt: datetime = last_start_dt
gap_report: str = ( gap_report: str = (
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n' f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
@ -525,32 +513,8 @@ async def start_backfill(
== ==
next_start_dt.timestamp() next_start_dt.timestamp()
) )
assert (
(last_time := time[-1])
==
next_end_dt.timestamp()
)
frame_last_dt = from_timestamp(last_time) assert time[-1] == next_end_dt.timestamp()
if (
frame_last_dt.add(seconds=timeframe)
<
end_dt_param
):
est_frame_last_dt = est.convert(frame_last_dt)
est_end_dt_param = est.convert(end_dt_param)
log.warning(
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
f'end_dt_param (EST): {est_end_dt_param!r}\n'
f'\n'
f'Likely contains,\n'
f'- a venue closure.\n'
f'- (maybe?) missing data ?\n'
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
await tractor.pause()
expected_dur: Interval = ( expected_dur: Interval = (
last_start_dt.subtract( last_start_dt.subtract(
@ -612,11 +576,10 @@ async def start_backfill(
'0 BARS TO PUSH after diff!?\n' '0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
) )
await tractor.pause()
# Check if we're about to exceed buffer capacity BEFORE # Check if we're about to exceed buffer capacity BEFORE
# attempting the push # attempting the push
if (next_prepend_index - ln) < 0: if next_prepend_index - ln < 0:
log.warning( log.warning(
f'Backfill would exceed buffer capacity!\n' f'Backfill would exceed buffer capacity!\n'
f'next_prepend_index: {next_prepend_index}\n' f'next_prepend_index: {next_prepend_index}\n'
@ -687,7 +650,7 @@ async def start_backfill(
}, },
}) })
# XXX, can't push the entire frame? so # can't push the entire frame? so
# push only the amount that can fit.. # push only the amount that can fit..
break break
@ -747,8 +710,8 @@ async def start_backfill(
) = dedupe(df) ) = dedupe(df)
if diff: if diff:
log.warning( log.warning(
f'Found {diff!r} duplicates in tsdb! ' f'Found {diff} duplicates in tsdb, '
f'=> Overwriting with `deduped` data !! <=\n' f'overwriting with deduped data\n'
) )
await storage.write_ohlcv( await storage.write_ohlcv(
col_sym_key, col_sym_key,
@ -1321,7 +1284,6 @@ async def manage_history(
some_data_ready: trio.Event, some_data_ready: trio.Event,
feed_is_live: trio.Event, feed_is_live: trio.Event,
timeframe: float = 60, # in seconds timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
task_status: TaskStatus[ task_status: TaskStatus[
tuple[ShmArray, ShmArray] tuple[ShmArray, ShmArray]
@ -1470,26 +1432,12 @@ async def manage_history(
1: rt_shm, 1: rt_shm,
60: hist_shm, 60: hist_shm,
} }
shms_by_period: dict|None = None
with trio.move_on_after(wait_for_live_timeout) as cs:
await feed_is_live.wait()
if cs.cancelled_caught:
log.warning(
f'No live feed within {wait_for_live_timeout!r}s\n'
f'fqme: {mkt.fqme!r}\n'
f'NOT activating shm-buffer-sampler!!\n'
)
if feed_is_live.is_set():
shms_by_period: dict[int, dict] = {
1.: rt_shm.token,
60.: hist_shm.token,
}
async with open_sample_stream( async with open_sample_stream(
period_s=1., period_s=1.,
shms_by_period=shms_by_period, shms_by_period={
1.: rt_shm.token,
60.: hist_shm.token,
},
# NOTE: we want to only open a stream for doing # NOTE: we want to only open a stream for doing
# broadcasts on backfill operations, not receive the # broadcasts on backfill operations, not receive the

View File

@ -27,15 +27,15 @@ import trio
from piker.ui.qt import ( from piker.ui.qt import (
QEvent, QEvent,
) )
from . import _chart
from . import _event
from . import _search
from ..accounting import unpack_fqme
from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search
from ..log import get_logger
from ..service import maybe_spawn_brokerd from ..service import maybe_spawn_brokerd
from . import _event
from ._exec import run_qtractor from ._exec import run_qtractor
from ..data.feed import install_brokerd_search
from ..data._symcache import open_symcache
from ..accounting import unpack_fqme
from . import _search
from ._chart import GodWidget
from ..log import get_logger
log = get_logger(__name__) log = get_logger(__name__)
@ -73,8 +73,8 @@ async def load_provider_search(
async def _async_main( async def _async_main(
# implicit required argument provided by `qtractor_run()` # implicit required argument provided by ``qtractor_run()``
main_widget: _chart.GodWidget, main_widget: GodWidget,
syms: list[str], syms: list[str],
brokers: dict[str, ModuleType], brokers: dict[str, ModuleType],
@ -87,9 +87,6 @@ async def _async_main(
Provision the "main" widget with initial symbol data and root nursery. Provision the "main" widget with initial symbol data and root nursery.
""" """
# set as singleton
_chart._godw = main_widget
from . import _display from . import _display
from ._pg_overrides import _do_overrides from ._pg_overrides import _do_overrides
_do_overrides() _do_overrides()
@ -204,6 +201,6 @@ def _main(
brokermods, brokermods,
piker_loglevel, piker_loglevel,
), ),
main_widget_type=_chart.GodWidget, main_widget_type=GodWidget,
tractor_kwargs=tractor_kwargs, tractor_kwargs=tractor_kwargs,
) )

View File

@ -82,25 +82,6 @@ if TYPE_CHECKING:
log = get_logger(__name__) log = get_logger(__name__)
_godw: GodWidget|None = None
def get_godw() -> GodWidget:
'''
Get the top level "god widget", the root/central-most Qt
widget-object set as `QMainWindow.setCentralWidget(_godw)`.
See `piker.ui._exec` for the runtime init details and all the
machinery for running `trio` on the Qt event loop in guest mode.
'''
if _godw is None:
raise RuntimeError(
'No god-widget initialized ??\n'
'Have you called `run_qtractor()` yet?\n'
)
return _godw
class GodWidget(QWidget): class GodWidget(QWidget):
''' '''
"Our lord and savior, the holy child of window-shua, there is no "Our lord and savior, the holy child of window-shua, there is no
@ -586,8 +567,8 @@ class LinkedSplits(QWidget):
# style? # style?
self.chart.setFrameStyle( self.chart.setFrameStyle(
QFrame.Shape.StyledPanel QFrame.Shape.StyledPanel |
|QFrame.Shadow.Plain QFrame.Shadow.Plain
) )
return self.chart return self.chart

View File

@ -27,6 +27,7 @@ import pyqtgraph as pg
from piker.ui.qt import ( from piker.ui.qt import (
QtWidgets, QtWidgets,
QGraphicsItem,
Qt, Qt,
QLineF, QLineF,
QRectF, QRectF,

View File

@ -169,10 +169,7 @@ class ArrowEditor(Struct):
f'{arrow!r}\n' f'{arrow!r}\n'
) )
for linked in self.godw.iter_linked(): for linked in self.godw.iter_linked():
if not (chart := linked.chart): linked.chart.plotItem.removeItem(arrow)
continue
chart.plotItem.removeItem(arrow)
try: try:
arrows.remove(arrow) arrows.remove(arrow)
except ValueError: except ValueError:

View File

@ -91,10 +91,6 @@ def run_qtractor(
window_type: QMainWindow = None, window_type: QMainWindow = None,
) -> None: ) -> None:
'''
Run the Qt event loop and embed `trio` via guest mode on it.
'''
# avoids annoying message when entering debugger from qt loop # avoids annoying message when entering debugger from qt loop
pyqtRemoveInputHook() pyqtRemoveInputHook()
@ -174,7 +170,7 @@ def run_qtractor(
# hook into app focus change events # hook into app focus change events
app.focusChanged.connect(window.on_focus_change) app.focusChanged.connect(window.on_focus_change)
instance: GodWidget = main_widget_type() instance = main_widget_type()
instance.window = window instance.window = window
# override tractor's defaults # override tractor's defaults

View File

@ -87,11 +87,7 @@ def update_fsp_chart(
# guard against unreadable case # guard against unreadable case
if not last_row: if not last_row:
log.warning( log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
f'Read-race on shm array,\n'
f'graphics_name: {graphics_name!r}\n'
f'shm.token: {shm.token}\n'
)
return return
# update graphics # update graphics
@ -207,6 +203,7 @@ async def open_fsp_actor_cluster(
async def run_fsp_ui( async def run_fsp_ui(
linkedsplits: LinkedSplits, linkedsplits: LinkedSplits,
flume: Flume, flume: Flume,
started: trio.Event, started: trio.Event,
@ -626,10 +623,8 @@ async def open_fsp_admin(
event.set() event.set()
# TODO, passing in `pikerd` related settings here!
# [ ] read in the `tractor` setting for `enable_transports: list`
# from the root `conf.toml`!
async def open_vlm_displays( async def open_vlm_displays(
linked: LinkedSplits, linked: LinkedSplits,
flume: Flume, flume: Flume,
dvlm: bool = True, dvlm: bool = True,
@ -639,12 +634,12 @@ async def open_vlm_displays(
) -> None: ) -> None:
''' '''
Vlm (volume) subchart displays. Volume subchart displays.
Since "volume" is often included directly alongside OHLCV price Since "volume" is often included directly alongside OHLCV price
data, we don't really need a separate FSP-actor + shm array for data, we don't really need a separate FSP-actor + shm array for it
it since it's likely already directly adjacent to OHLC samples since it's likely already directly adjacent to OHLC samples from the
from the data provider. data provider.
Further only if volume data is detected (it sometimes isn't provided Further only if volume data is detected (it sometimes isn't provided
eg. forex, certain commodities markets) will volume dependent FSPs eg. forex, certain commodities markets) will volume dependent FSPs