Compare commits
2 Commits
f417e8c170
...
fbb0fc6517
| Author | SHA1 | Date |
|---|---|---|
|
|
fbb0fc6517 | |
|
|
2ebb1e1019 |
|
|
@ -275,15 +275,9 @@ async def open_history_client(
|
|||
f'{times}'
|
||||
)
|
||||
|
||||
# XXX, debug any case where the latest 1m bar we get is
|
||||
# already another "sample's-step-old"..
|
||||
if end_dt is None:
|
||||
inow: int = round(time.time())
|
||||
if (
|
||||
_time_step := (inow - times[-1])
|
||||
>
|
||||
timeframe * 2
|
||||
):
|
||||
if (inow - times[-1]) > 60:
|
||||
await tractor.pause()
|
||||
|
||||
start_dt = from_timestamp(times[0])
|
||||
|
|
|
|||
|
|
@ -1115,7 +1115,6 @@ async def stream_quotes(
|
|||
|
||||
con: Contract = details.contract
|
||||
first_ticker: Ticker|None = None
|
||||
first_quote: dict[str, Any] = {}
|
||||
|
||||
timeout: float = 1.6
|
||||
with trio.move_on_after(timeout) as quote_cs:
|
||||
|
|
@ -1168,14 +1167,15 @@ async def stream_quotes(
|
|||
first_quote,
|
||||
))
|
||||
|
||||
# it's not really live but this will unblock
|
||||
# the brokerd feed task to tell the ui to update?
|
||||
feed_is_live.set()
|
||||
|
||||
# block and let data history backfill code run.
|
||||
# XXX obvi given the venue is closed, we never expect feed
|
||||
# to come up; a taskc should be the only way to
|
||||
# terminate this task.
|
||||
await trio.sleep_forever()
|
||||
#
|
||||
# ^^XXX^^TODO! INSTEAD impl a `trio.sleep()` for the
|
||||
# duration until the venue opens!!
|
||||
|
||||
# ?TODO, we could instead spawn a task that waits on a feed
|
||||
# to start and let it wait indefinitely..instead of this
|
||||
|
|
@ -1199,9 +1199,6 @@ async def stream_quotes(
|
|||
'Rxed init quote:\n'
|
||||
f'{pformat(first_quote)}'
|
||||
)
|
||||
# signal `.data.feed` layer that mkt quotes are LIVE
|
||||
feed_is_live.set()
|
||||
|
||||
cs: trio.CancelScope|None = None
|
||||
startup: bool = True
|
||||
iter_quotes: trio.abc.Channel
|
||||
|
|
@ -1255,6 +1252,7 @@ async def stream_quotes(
|
|||
# tick.
|
||||
ticker = await iter_quotes.receive()
|
||||
quote = normalize(ticker)
|
||||
feed_is_live.set()
|
||||
fqme: str = quote['fqme']
|
||||
await send_chan.send({fqme: quote})
|
||||
|
||||
|
|
|
|||
|
|
@ -80,14 +80,14 @@ class Sampler:
|
|||
This non-instantiated type is meant to be a singleton within
|
||||
a `samplerd` actor-service spawned once by the user wishing to
|
||||
time-step-sample (real-time) quote feeds, see
|
||||
`.service.maybe_open_samplerd()` and the below
|
||||
`register_with_sampler()`.
|
||||
``.service.maybe_open_samplerd()`` and the below
|
||||
``register_with_sampler()``.
|
||||
|
||||
'''
|
||||
service_nursery: None | trio.Nursery = None
|
||||
|
||||
# TODO: we could stick these in a composed type to avoid angering
|
||||
# the "i hate module scoped variables crowd" (yawn).
|
||||
# TODO: we could stick these in a composed type to avoid
|
||||
# angering the "i hate module scoped variables crowd" (yawn).
|
||||
ohlcv_shms: dict[float, list[ShmArray]] = {}
|
||||
|
||||
# holds one-task-per-sample-period tasks which are spawned as-needed by
|
||||
|
|
@ -335,7 +335,7 @@ async def register_with_sampler(
|
|||
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
|
||||
sub_for_broadcasts: bool = True, # sampler side to send step updates?
|
||||
|
||||
) -> set[int]:
|
||||
) -> None:
|
||||
|
||||
get_console_log(tractor.current_actor().loglevel)
|
||||
incr_was_started: bool = False
|
||||
|
|
@ -362,12 +362,7 @@ async def register_with_sampler(
|
|||
|
||||
# insert the base 1s period (for OHLC style sampling) into
|
||||
# the increment buffer set to update and shift every second.
|
||||
if (
|
||||
shms_by_period is not None
|
||||
# and
|
||||
# feed_is_live.is_set()
|
||||
# ^TODO? pass it in instead?
|
||||
):
|
||||
if shms_by_period is not None:
|
||||
from ._sharedmem import (
|
||||
attach_shm_array,
|
||||
_Token,
|
||||
|
|
@ -381,17 +376,12 @@ async def register_with_sampler(
|
|||
readonly=False,
|
||||
)
|
||||
shms_by_period[period] = shm
|
||||
Sampler.ohlcv_shms.setdefault(
|
||||
period,
|
||||
[],
|
||||
).append(shm)
|
||||
Sampler.ohlcv_shms.setdefault(period, []).append(shm)
|
||||
|
||||
assert Sampler.ohlcv_shms
|
||||
|
||||
# unblock caller
|
||||
await ctx.started(
|
||||
set(Sampler.ohlcv_shms.keys())
|
||||
)
|
||||
await ctx.started(set(Sampler.ohlcv_shms.keys()))
|
||||
|
||||
if open_index_stream:
|
||||
try:
|
||||
|
|
@ -543,8 +533,6 @@ async def open_sample_stream(
|
|||
# yield bistream
|
||||
# else:
|
||||
|
||||
ctx: tractor.Context
|
||||
shm_periods: set[int] # in `int`-seconds
|
||||
async with (
|
||||
# XXX: this should be singleton on a host,
|
||||
# a lone broker-daemon per provider should be
|
||||
|
|
@ -559,10 +547,10 @@ async def open_sample_stream(
|
|||
'open_index_stream': open_index_stream,
|
||||
'sub_for_broadcasts': sub_for_broadcasts,
|
||||
},
|
||||
) as (ctx, shm_periods)
|
||||
) as (ctx, first)
|
||||
):
|
||||
if ensure_is_active:
|
||||
assert len(shm_periods) > 1
|
||||
assert len(first) > 1
|
||||
|
||||
async with (
|
||||
ctx.open_stream(
|
||||
|
|
|
|||
|
|
@ -447,13 +447,7 @@ def ldshm(
|
|||
)
|
||||
# last chance manual overwrites in REPL
|
||||
# await tractor.pause()
|
||||
if not aids:
|
||||
log.warning(
|
||||
f'No gaps were found !?\n'
|
||||
f'fqme: {fqme!r}\n'
|
||||
f'timeframe: {period_s!r}\n'
|
||||
f"WELL THAT'S GOOD NOOZ!\n"
|
||||
)
|
||||
assert aids
|
||||
tf2aids[period_s] = aids
|
||||
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -49,7 +49,6 @@ from pendulum import (
|
|||
Duration,
|
||||
duration as mk_duration,
|
||||
from_timestamp,
|
||||
timezone,
|
||||
)
|
||||
import numpy as np
|
||||
import polars as pl
|
||||
|
|
@ -58,7 +57,9 @@ from piker.brokers import NoData
|
|||
from piker.accounting import (
|
||||
MktPair,
|
||||
)
|
||||
from piker.log import get_logger
|
||||
from piker.data._util import (
|
||||
log,
|
||||
)
|
||||
from ..data._sharedmem import (
|
||||
maybe_open_shm_array,
|
||||
ShmArray,
|
||||
|
|
@ -96,9 +97,6 @@ if TYPE_CHECKING:
|
|||
# from .feed import _FeedsBus
|
||||
|
||||
|
||||
log = get_logger()
|
||||
|
||||
|
||||
# `ShmArray` buffer sizing configuration:
|
||||
_mins_in_day = int(60 * 24)
|
||||
# how much is probably dependent on lifestyle
|
||||
|
|
@ -249,11 +247,6 @@ async def maybe_fill_null_segments(
|
|||
from_timestamp(array['time'][0])
|
||||
) < backfill_until_dt
|
||||
):
|
||||
log.error(
|
||||
f'Invalid frame_start !?\n'
|
||||
f'frame_start_dt: {frame_start_dt!r}\n'
|
||||
f'backfill_until_dt: {backfill_until_dt!r}\n'
|
||||
)
|
||||
await tractor.pause()
|
||||
|
||||
# XXX TODO: pretty sure if i plot tsla, btcusdt.binance
|
||||
|
|
@ -403,9 +396,7 @@ async def start_backfill(
|
|||
|
||||
# based on the sample step size, maybe load a certain amount history
|
||||
update_start_on_prepend: bool = False
|
||||
if (
|
||||
_until_was_none := (backfill_until_dt is None)
|
||||
):
|
||||
if backfill_until_dt is None:
|
||||
|
||||
# TODO: per-provider default history-durations?
|
||||
# -[ ] inside the `open_history_client()` config allow
|
||||
|
|
@ -439,8 +430,6 @@ async def start_backfill(
|
|||
last_start_dt: datetime = backfill_from_dt
|
||||
next_prepend_index: int = backfill_from_shm_index
|
||||
|
||||
est = timezone('EST')
|
||||
|
||||
while last_start_dt > backfill_until_dt:
|
||||
log.info(
|
||||
f'Requesting {timeframe}s frame:\n'
|
||||
|
|
@ -454,10 +443,9 @@ async def start_backfill(
|
|||
next_end_dt,
|
||||
) = await get_hist(
|
||||
timeframe,
|
||||
end_dt=(end_dt_param := last_start_dt),
|
||||
end_dt=last_start_dt,
|
||||
)
|
||||
except NoData as nodata:
|
||||
_nodata = nodata
|
||||
except NoData as _daterr:
|
||||
orig_last_start_dt: datetime = last_start_dt
|
||||
gap_report: str = (
|
||||
f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
|
||||
|
|
@ -525,32 +513,8 @@ async def start_backfill(
|
|||
==
|
||||
next_start_dt.timestamp()
|
||||
)
|
||||
assert (
|
||||
(last_time := time[-1])
|
||||
==
|
||||
next_end_dt.timestamp()
|
||||
)
|
||||
|
||||
frame_last_dt = from_timestamp(last_time)
|
||||
if (
|
||||
frame_last_dt.add(seconds=timeframe)
|
||||
<
|
||||
end_dt_param
|
||||
):
|
||||
est_frame_last_dt = est.convert(frame_last_dt)
|
||||
est_end_dt_param = est.convert(end_dt_param)
|
||||
log.warning(
|
||||
f'Provider frame ending BEFORE requested end_dt={end_dt_param} ??\n'
|
||||
f'frame_last_dt (EST): {est_frame_last_dt!r}\n'
|
||||
f'end_dt_param (EST): {est_end_dt_param!r}\n'
|
||||
f'\n'
|
||||
f'Likely contains,\n'
|
||||
f'- a venue closure.\n'
|
||||
f'- (maybe?) missing data ?\n'
|
||||
)
|
||||
# ?TODO, check against venue closure hours
|
||||
# if/when provided by backend?
|
||||
await tractor.pause()
|
||||
assert time[-1] == next_end_dt.timestamp()
|
||||
|
||||
expected_dur: Interval = (
|
||||
last_start_dt.subtract(
|
||||
|
|
@ -612,11 +576,10 @@ async def start_backfill(
|
|||
'0 BARS TO PUSH after diff!?\n'
|
||||
f'{next_start_dt} -> {last_start_dt}'
|
||||
)
|
||||
await tractor.pause()
|
||||
|
||||
# Check if we're about to exceed buffer capacity BEFORE
|
||||
# attempting the push
|
||||
if (next_prepend_index - ln) < 0:
|
||||
if next_prepend_index - ln < 0:
|
||||
log.warning(
|
||||
f'Backfill would exceed buffer capacity!\n'
|
||||
f'next_prepend_index: {next_prepend_index}\n'
|
||||
|
|
@ -687,7 +650,7 @@ async def start_backfill(
|
|||
},
|
||||
})
|
||||
|
||||
# XXX, can't push the entire frame? so
|
||||
# can't push the entire frame? so
|
||||
# push only the amount that can fit..
|
||||
break
|
||||
|
||||
|
|
@ -747,8 +710,8 @@ async def start_backfill(
|
|||
) = dedupe(df)
|
||||
if diff:
|
||||
log.warning(
|
||||
f'Found {diff!r} duplicates in tsdb! '
|
||||
f'=> Overwriting with `deduped` data !! <=\n'
|
||||
f'Found {diff} duplicates in tsdb, '
|
||||
f'overwriting with deduped data\n'
|
||||
)
|
||||
await storage.write_ohlcv(
|
||||
col_sym_key,
|
||||
|
|
@ -1321,7 +1284,6 @@ async def manage_history(
|
|||
some_data_ready: trio.Event,
|
||||
feed_is_live: trio.Event,
|
||||
timeframe: float = 60, # in seconds
|
||||
wait_for_live_timeout: float = 0.5,
|
||||
|
||||
task_status: TaskStatus[
|
||||
tuple[ShmArray, ShmArray]
|
||||
|
|
@ -1470,26 +1432,12 @@ async def manage_history(
|
|||
1: rt_shm,
|
||||
60: hist_shm,
|
||||
}
|
||||
|
||||
shms_by_period: dict|None = None
|
||||
with trio.move_on_after(wait_for_live_timeout) as cs:
|
||||
await feed_is_live.wait()
|
||||
|
||||
if cs.cancelled_caught:
|
||||
log.warning(
|
||||
f'No live feed within {wait_for_live_timeout!r}s\n'
|
||||
f'fqme: {mkt.fqme!r}\n'
|
||||
f'NOT activating shm-buffer-sampler!!\n'
|
||||
)
|
||||
|
||||
if feed_is_live.is_set():
|
||||
shms_by_period: dict[int, dict] = {
|
||||
1.: rt_shm.token,
|
||||
60.: hist_shm.token,
|
||||
}
|
||||
async with open_sample_stream(
|
||||
period_s=1.,
|
||||
shms_by_period=shms_by_period,
|
||||
shms_by_period={
|
||||
1.: rt_shm.token,
|
||||
60.: hist_shm.token,
|
||||
},
|
||||
|
||||
# NOTE: we want to only open a stream for doing
|
||||
# broadcasts on backfill operations, not receive the
|
||||
|
|
|
|||
|
|
@ -27,15 +27,15 @@ import trio
|
|||
from piker.ui.qt import (
|
||||
QEvent,
|
||||
)
|
||||
from . import _chart
|
||||
from . import _event
|
||||
from . import _search
|
||||
from ..accounting import unpack_fqme
|
||||
from ..data._symcache import open_symcache
|
||||
from ..data.feed import install_brokerd_search
|
||||
from ..log import get_logger
|
||||
from ..service import maybe_spawn_brokerd
|
||||
from . import _event
|
||||
from ._exec import run_qtractor
|
||||
from ..data.feed import install_brokerd_search
|
||||
from ..data._symcache import open_symcache
|
||||
from ..accounting import unpack_fqme
|
||||
from . import _search
|
||||
from ._chart import GodWidget
|
||||
from ..log import get_logger
|
||||
|
||||
log = get_logger(__name__)
|
||||
|
||||
|
|
@ -73,8 +73,8 @@ async def load_provider_search(
|
|||
|
||||
async def _async_main(
|
||||
|
||||
# implicit required argument provided by `qtractor_run()`
|
||||
main_widget: _chart.GodWidget,
|
||||
# implicit required argument provided by ``qtractor_run()``
|
||||
main_widget: GodWidget,
|
||||
|
||||
syms: list[str],
|
||||
brokers: dict[str, ModuleType],
|
||||
|
|
@ -87,9 +87,6 @@ async def _async_main(
|
|||
Provision the "main" widget with initial symbol data and root nursery.
|
||||
|
||||
"""
|
||||
# set as singleton
|
||||
_chart._godw = main_widget
|
||||
|
||||
from . import _display
|
||||
from ._pg_overrides import _do_overrides
|
||||
_do_overrides()
|
||||
|
|
@ -204,6 +201,6 @@ def _main(
|
|||
brokermods,
|
||||
piker_loglevel,
|
||||
),
|
||||
main_widget_type=_chart.GodWidget,
|
||||
main_widget_type=GodWidget,
|
||||
tractor_kwargs=tractor_kwargs,
|
||||
)
|
||||
|
|
|
|||
|
|
@ -82,25 +82,6 @@ if TYPE_CHECKING:
|
|||
log = get_logger(__name__)
|
||||
|
||||
|
||||
_godw: GodWidget|None = None
|
||||
|
||||
def get_godw() -> GodWidget:
|
||||
'''
|
||||
Get the top level "god widget", the root/central-most Qt
|
||||
widget-object set as `QMainWindow.setCentralWidget(_godw)`.
|
||||
|
||||
See `piker.ui._exec` for the runtime init details and all the
|
||||
machinery for running `trio` on the Qt event loop in guest mode.
|
||||
|
||||
'''
|
||||
if _godw is None:
|
||||
raise RuntimeError(
|
||||
'No god-widget initialized ??\n'
|
||||
'Have you called `run_qtractor()` yet?\n'
|
||||
)
|
||||
return _godw
|
||||
|
||||
|
||||
class GodWidget(QWidget):
|
||||
'''
|
||||
"Our lord and savior, the holy child of window-shua, there is no
|
||||
|
|
@ -586,8 +567,8 @@ class LinkedSplits(QWidget):
|
|||
|
||||
# style?
|
||||
self.chart.setFrameStyle(
|
||||
QFrame.Shape.StyledPanel
|
||||
|QFrame.Shadow.Plain
|
||||
QFrame.Shape.StyledPanel |
|
||||
QFrame.Shadow.Plain
|
||||
)
|
||||
|
||||
return self.chart
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ import pyqtgraph as pg
|
|||
|
||||
from piker.ui.qt import (
|
||||
QtWidgets,
|
||||
QGraphicsItem,
|
||||
Qt,
|
||||
QLineF,
|
||||
QRectF,
|
||||
|
|
|
|||
|
|
@ -169,10 +169,7 @@ class ArrowEditor(Struct):
|
|||
f'{arrow!r}\n'
|
||||
)
|
||||
for linked in self.godw.iter_linked():
|
||||
if not (chart := linked.chart):
|
||||
continue
|
||||
|
||||
chart.plotItem.removeItem(arrow)
|
||||
linked.chart.plotItem.removeItem(arrow)
|
||||
try:
|
||||
arrows.remove(arrow)
|
||||
except ValueError:
|
||||
|
|
|
|||
|
|
@ -91,10 +91,6 @@ def run_qtractor(
|
|||
window_type: QMainWindow = None,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Run the Qt event loop and embed `trio` via guest mode on it.
|
||||
|
||||
'''
|
||||
# avoids annoying message when entering debugger from qt loop
|
||||
pyqtRemoveInputHook()
|
||||
|
||||
|
|
@ -174,7 +170,7 @@ def run_qtractor(
|
|||
# hook into app focus change events
|
||||
app.focusChanged.connect(window.on_focus_change)
|
||||
|
||||
instance: GodWidget = main_widget_type()
|
||||
instance = main_widget_type()
|
||||
instance.window = window
|
||||
|
||||
# override tractor's defaults
|
||||
|
|
|
|||
|
|
@ -87,11 +87,7 @@ def update_fsp_chart(
|
|||
|
||||
# guard against unreadable case
|
||||
if not last_row:
|
||||
log.warning(
|
||||
f'Read-race on shm array,\n'
|
||||
f'graphics_name: {graphics_name!r}\n'
|
||||
f'shm.token: {shm.token}\n'
|
||||
)
|
||||
log.warning(f'Read-race on shm array: {graphics_name}@{shm.token}')
|
||||
return
|
||||
|
||||
# update graphics
|
||||
|
|
@ -207,6 +203,7 @@ async def open_fsp_actor_cluster(
|
|||
|
||||
|
||||
async def run_fsp_ui(
|
||||
|
||||
linkedsplits: LinkedSplits,
|
||||
flume: Flume,
|
||||
started: trio.Event,
|
||||
|
|
@ -626,10 +623,8 @@ async def open_fsp_admin(
|
|||
event.set()
|
||||
|
||||
|
||||
# TODO, passing in `pikerd` related settings here!
|
||||
# [ ] read in the `tractor` setting for `enable_transports: list`
|
||||
# from the root `conf.toml`!
|
||||
async def open_vlm_displays(
|
||||
|
||||
linked: LinkedSplits,
|
||||
flume: Flume,
|
||||
dvlm: bool = True,
|
||||
|
|
@ -639,12 +634,12 @@ async def open_vlm_displays(
|
|||
|
||||
) -> None:
|
||||
'''
|
||||
Vlm (volume) subchart displays.
|
||||
Volume subchart displays.
|
||||
|
||||
Since "volume" is often included directly alongside OHLCV price
|
||||
data, we don't really need a separate FSP-actor + shm array for
|
||||
it since it's likely already directly adjacent to OHLC samples
|
||||
from the data provider.
|
||||
data, we don't really need a separate FSP-actor + shm array for it
|
||||
since it's likely already directly adjacent to OHLC samples from the
|
||||
data provider.
|
||||
|
||||
Further only if volume data is detected (it sometimes isn't provided
|
||||
eg. forex, certain commodities markets) will volume dependent FSPs
|
||||
|
|
|
|||
Loading…
Reference in New Issue