Compare commits

..

55 Commits

Author SHA1 Message Date
Gud Boi 341a584cea Add holiday-gap detection via `exchange_calendars`
Integrate `exchange_calendars` lib to detect market holidays in
gap-checking logic via new `.ib.venues.has_holiday()` helper!

The `.ib.venues` impl deats,
- add  a new `has_holiday()` using `xcals.get_calendar()` and friends
  for sanity checking a venue's holiday closure-gaps.
  * final holiday detection-check is basically,
   `(cash_gap := (next_open - prev_close)) > period`
- include `time_step_s` param to `is_venue_closure()` for boundary
  tolerance checks.
  * let's us expand closure-time checks to include `+/-time_step_s`
    "off-by-one-`timeframe`-sample" edge case ranges.
- add real docstring to `has_weekend()`.

In `.ib.api` refine usage for ^ changes,
- move `is_venue_open()` call + tz-convert outside gap check
- use a walrus to capture `has_closure_gap` from `is_venue_closure()`
- add a `not has_closure_gap` condition to the
  mismatched-duration/short-frame warning block to avoid needless warns.
- keep duration-based "short-frame" log as `.error()` but toss in a bp
  so (somone can) umask to figure out wtf is going on..
  * we should **never** really hit this path unless there's a valid bug
    or data issue with IB/GFIS!
  * keep recursion path masked-out just leave a `breakpoint()` for now.

Also some logger updates,
- import `get_logger()` from top-level `piker.log` vs `.ib._util` which
  was always kinda wrong..
- change `NonShittyIB._logger` to use `__name__` vs literal.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 18:21:17 -05:00
Gud Boi eb516c4c33 Add `exchange_calendar` dep for venue-closure gap checkin 2026-02-22 18:21:17 -05:00
Gud Boi c65d481916 Adjust type annots in binance and IB symbol mods
Namely, again switching `|`-union syntax to rm adjacent white space.

Also, flip to multiline style for threshold comparison in
`.binance.feed` and change gap-check threshold to `timeframe` (vs
a hardcoded `60`s) in the `get_ohlc()` closure.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 18:21:17 -05:00
Gud Boi 2f4ae3be4d Use `.ib.venues.is_venue_open()` in `.feed`
In `.ib.feed.stream_quotes()` specifically that is since time-range
checking code was moved to the new sub-mod.

Deats,
- drop import of old `is_current_time_in_range()` from `._util`
- change `get_bars()` sig: `end_dt`/`start_dt` to `datetime|None`
- comment-out `breakpoint()` in `open_history_client()`

Styling,
- add multiline style to conditionals and tuple unpacks
- fix type annotation: `Contract|None` vs `Contract | None`
- fix backticks in comment: `ib_insync` vs `ib_async`

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 18:21:17 -05:00
Gud Boi 8874ff1294 Add venue-closure gap-detection in `.ib.api.Client.bars()`
With all detection logic coming from our new `.ib.venues` helpers
allowing use to verify IB's OHLC bars frames don't contain unexpected
time-gaps.

`Client.bars()` new checking deats,
- add `is_venue_open()`, `has_weekend()`, `sesh_times()`, and
  `is_venue_closure()` checks when `last_dt < end_dt`
- always calc gap-period in local tz via `ContractDetails.timeZoneId`.
- log warnings on invalid non-closure gaps, debug on closures for now.
- change recursion case to just `log.error()` + `breakpoint()`; we might end
  up tossing it since i don't think i could ever get it to be reliable..
  * mask-out recursive `.bars()` call (likely unnecessary).
- flip `start_dt`/`end_dt` param defaults to `None` vs epoch `str`.
- update docstring to clarify no `start_dt` support by IB
- add mod level `_iso8601_epoch_in_est` const to keep track of orig
  param default value.
- add multiline style to return type-annot, type all `pendulum` objects.

Also,
- uppercase `Crypto.symbol` for PAXOS contracts in `.find_contracts()`,
  tho now we're getting a weird new API error i left in a todo-comment..

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 18:21:17 -05:00
Gud Boi a009bdf2bb Mv `parse_trading_hours()` from `._util` to `.venues`
It was an AI-model draft that we can prolly toss but figured might as
well org it appropriately.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 18:21:17 -05:00
Gud Boi 39cf1edd36 Add `.ib.venues` for mkt-venue-closure checkers
Introduce set of helper-fns for detecting venue open/close status,
session start/end times, and related time-gap detection using
`pendulum`.

Deats,
- add `iter_sessions()` to yield `pendulum.Interval`s from
  a `ContractDetails` instance.
- add `is_venue_open()` to check if active at a given time.
- add `is_venue_closure()` to detect valid closure gaps.
- add `sesh_times()` to extract weekday-agnostic open/close times.
- add `has_weekend()` to check for Sat/Sun in interval.
- move in lowlevel `is_current_time_in_range()` for checking a
  datetime within a `sesh: pendulum.Interval`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-22 18:21:17 -05:00
Gud Boi e937b60ed6 Flip `.tsp._history` logger to explicit mod-name (again) 2026-02-22 18:16:58 -05:00
Gud Boi ed9c211b96 Adjust binance stale-bar detection to 2x tolerance
Change the stale-bar check in `.binance.feed` from `timeframe` to
`timeframe * 2` tolerance to avoid false-positive pauses when bars
are slightly delayed but still within acceptable bounds.

Styling,
- add walrus operator to capture `_time_step` for debugger
  inspection.
- add comment explaining the debug purpose of this check.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 16:36:44 -05:00
Gud Boi f1b27e9696 Replace assert with warn for no-gaps in `.storage.cli`
Change `assert aids` to a warning log when no history gaps are found
during `ldshm` gap detection; it is the **ideal case** OBVI. This avoids
crashing the CLI when gap detection finds no issues, which is actually
good news!

Bp

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 16:33:53 -05:00
Gud Boi 959d04024b .tsp._history: add gap detection in backfill loop
Add frame-gap detection when `frame_last_dt < end_dt_param` to
warn about potential venue closures or missing data during the
backfill loop in `start_backfill()`.

Deats,
- add `frame_last_dt < end_dt_param` check after frame recv
- log warnings with EST-converted timestamps for clarity
- add `await tractor.pause()` for REPL-investigation on gaps
- add TODO comment about venue closure hour checking
- capture `_until_was_none` walrus var for null-check clarity
- add `last_time` assertion for `time[-1] == next_end_dt`
- rename `_daterr` to `nodata` with `_nodata` capture

Also,
- import `pendulum.timezone` and create `est` tz instance
- change `get_logger()` import from `.data._util` to `.log`
- add parens around `(next_prepend_index - ln) < 0` check

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-20 16:32:34 -05:00
Gud Boi 6f8a361e80 Cleanups and doc tweaks to `.ui._fsp`
Expand read-race warning log for clarity, add TODO for reading
`tractor` transport config from `conf.toml`, and reflow docstring
in `open_vlm_displays()`.

Also,
- whitespace cleanup: `Type | None` -> `Type|None`
- clarify "Volume" -> "Vlm (volume)" in docstr

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-02-01 19:28:14 -05:00
Gud Boi 2d678e1582 Guard against `None` chart in `ArrowEditor.remove()`
Add null check for `linked.chart` before calling
`.plotItem.removeItem()` to prevent `AttributeError` when chart
is `None`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 19:21:28 -05:00
Gud Boi 48493e50b0 .ib.feed: only set `feed_is_live` after first quote
Move `feed_is_live.set()` to after receiving the first valid
quote instead of setting early on venue-closed path. Prevents
sampler registration when no live data expected.

Also,
- drop redundant `.set()` call in quote iteration loop
- add TODO note about sleeping until venue opens vs forever
- init `first_quote: dict` early for consistency

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 18:50:26 -05:00
Gud Boi f73b981173 Only register shms w sampler when `feed_is_live`
Add timeout-gated wait for `feed_is_live: trio.Event` before passing shm
tokens to `open_sample_stream()`; skip registering shm-buffers with the
sampler if the feed doesn't "go live" within a new timeout.

The main motivation here is to avoid the sampler incrementing shm-array
bufs when the mkt-venue is closed so that a trailing "same price"
line/bars isn't updated/rendered in the chart's view when unnecessary.

Deats,
- add `wait_for_live_timeout: float = 0.5` param to `manage_history()`
- warn-log the fqme when timeout triggers
- add error log for invalid `frame_start_dt` comparisons to
  `maybe_fill_null_segments()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 18:40:48 -05:00
Gud Boi d5edd3484f Clarify `register_with_sampler()` started type and vars
Markup `ctx.started()` type-sig as `set[int]`, rename binding var
`first` to `shm_periods` and add type hints for clarity on context mgr
unpacking.

Also,
- whitespace cleanup: `Type | None` -> `Type|None` throughout
- format long lines: `.setdefault()`, `await ctx.started()`
- fix backtick style in docstrings for consistency
- add placeholder TODO comment for `feed_is_live` check; it might be
  more rigorous to pass the syncing state down thru all this?

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 18:38:45 -05:00
Gud Boi bac8317a4a Add `get_godw()` singleton getter for `GodWidget`
Expose `get_godw()` helper to retrieve the central `GodWidget`
instance from anywhere in the UI code. Set the singleton in
`_async_main()` on startup.

Also,
- add docstring to `run_qtractor()` explaining trio guest mode
- type annotate `instance: GodWidget` in `run_qtractor()`
- import reorg in `._app` for cleaner grouping
- whitespace cleanup: `Type | None` -> `Type|None` throughout
- fix bitwise-or alignment: `Flag | Other` -> `Flag|Other`

(this commit-msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 15:39:25 -05:00
Gud Boi eb78437994 Rm unused import in `.ui._curve` 2026-01-30 14:58:41 -05:00
Gud Boi 88353ffef8 Ignore single-zero-sample trace on no runtime.. 2026-01-30 14:53:00 -05:00
Gud Boi ec4e6ec742 ib.feed: drop legacy "quote-with-vlm" polling
Since now we explicitly check each mkt's venue hours now we don't need
this mega hacky "waiting on a quote with real vlm" stuff to determing
whether historical data should be loaded immediately. This approach also
had the added complexity that we needed to handle edge cases for tickers
(like xauusd.cmdty) which never have vlm.. so it's nice to be rid of it
all ;p
2026-01-30 14:47:11 -05:00
Gud Boi 205058de21 Always overwrite tsdb duplicates found during backfill
Enable the previously commented-out dedupe-and-write logic in
`start_backfill()` to ensure tsdb stays clean of duplicate
entries.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-30 14:46:23 -05:00
Gud Boi f11ab5f0aa For claude, ignore no runtime for offline shm reading 2026-01-29 02:49:25 -05:00
Gud Boi 8718ad4874 .ib._util: ignore attr err on click-hack twm wakeups? 2026-01-29 02:48:41 -05:00
Gud Boi 3a515afccd Use `get_fonts()`, add `show_txt` flag to gap annots
Switch `.tsp._annotate.markup_gaps()` to use new
`.ui._style.get_fonts()` API for font size calc on client side and add
optional `show_txt: bool` flag to toggle gap duration labels (with
default `False`).

Also,
- replace `sgn` checks with named bools: `up_gap`, `down_gap`
- use `small_font.px_size - 1` for gap label font sizing
- wrap text creation in `if show_txt:` block
- update IPC handler to use `get_fonts()` vs direct `_font` import

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-28 16:30:41 -05:00
Gud Boi 88732a67d5 Add `get_fonts()` API and fix `.px_size` for non-Qt ctxs
Add a public `.ui._style.get_fonts()` helper to retrieve the
`_font[_small]: DpiAwareFont` singleton pair. Adjust
`DpiAwareFont.px_size` to return `conf.toml` value when Qt returns `-1`
(no active Qt app).

Also,
- raise `ValueError` with detailed msg if both Qt and a conf-lookup fail
- add some more type union whitespace cleanups: `int | None` -> `int|None`

(this commit-msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-28 15:34:57 -05:00
Gud Boi 858cfce958 Relay annot creation failures with err-dict resps
Change annot-ctl APIs to return `None` on failure instead of invalid
`aid`s. Server now sends `{'error': msg}` dict on failures, client
match-blocks handle gracefully.

Also,
- update return types: `.add_rect()`, `.add_arrow()`, `.add_text()`
  now return `int|None`
- match on `{'error': str(msg)}` in client IPC receive blocks
- send error dicts from server on timestamp lookup failures
- add failure handling in `markup_gaps()` to skip bad rects

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-28 14:43:52 -05:00
Gud Boi 51d109f7e7 Do time-based shm-index lookup for annots on server
Fix annotation misalignment during backfill by switching from
client-computed indices to server-side timestamp lookups against
current shm state. Store absolute coords on annotations and
reposition on viz redraws.

Lowlevel impl deats,
- add `time` param to `.add_arrow()`, `.add_text()`, `.add_rect()`
- lookup indices from shm via timestamp matching in IPC handlers
- force chart redraw before `markup_gaps()` annotation creation
- wrap IPC send/receive in `trio.fail_after(3)` for timeout when
  server fails to respond, likely hangs on no-case-match/error.
- cache `_meth`/`_kwargs` on rects, `_abs_x`/`_abs_y` on arrows
- auto-reposition all annotations after viz reset in redraw cmd

Also,
- handle `KeyError` for missing timeframes in chart lookup
- return `-1` aid on annotation creation failures (lol oh `claude`..)
- reconstruct rect positions from timestamps + BGM offset logic
- log repositioned annotation counts on viz redraw

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-28 12:48:26 -05:00
Gud Boi 76f199df3b Add buffer capacity checks to backfill loop
Prevent `ValueError` from negative prepend index in
`start_backfill()` by checking buffer space before push
attempts. Truncate incoming frame if needed and stop gracefully
when buffer full.

Also,
- add pre-push capacity check with frame truncation logic
- stop backfill when `next_prepend_index <= 0`
- log warnings for capacity exceeded and buffer-full conditions

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-27 23:52:00 -05:00
Gud Boi 4e3cd7f986 Drop decimal points for whole-number durations
Adjust `humanize_duration()` to show "3h" instead of "3.0h" when the
duration value is a whole number, making labels cleaner.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-27 21:09:49 -05:00
Gud Boi 1fb0fe3a04 Add `font_size` param to `AnnotCtl.add_text()` API
Expose font sizing control for `pg.TextItem` annotations thru the
annot-ctl API. Default to `_font.font.pixelSize() - 3` when no
size provided.

Also,
- thread `font_size` param thru IPC handler in `serve_rc_annots()`
- apply font via `QFont.setPixelSize()` on text item creation
- add `?TODO` note in `markup_gaps()` re using `conf.toml` value
- update `add_text()` docstring with font_size param desc

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-27 20:53:10 -05:00
Gud Boi de5b1737b4 Add humanized duration labels to gap annotations
Introduce `humanize_duration()` helper in `.tsp._annotate` to
convert seconds to short human-readable format (d/h/m/s). Extend
annot-ctl API with `add_text()` method for placing `pg.TextItem`
labels on charts.

Also,
- add duration labels on RHS of gap arrows in `markup_gaps()`
- handle text item removal in `rm_annot()` match block
- expose `TextItem` cmd in `serve_rc_annots()` IPC handler
- use `hcolor()` for named-to-hex color conversion
- set anchor positioning for up vs down gaps

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-27 20:51:21 -05:00
Gud Boi 1776242413 .ib.feed: trim bars frame to `start_dt` 2026-01-27 17:37:25 -05:00
Gud Boi 848c8ae533 ib._util: ignore timeout-errs when crash-handling `pyvnc` connects 2026-01-27 17:36:33 -05:00
Gud Boi fdea8556d7 Lul, woops compare against first-dt in `.ib.feed` bars frame.. 2026-01-27 16:21:19 -05:00
Gud Boi be28d083e4 Expose more `pg.ArrowItem` params thru annot-ctl API 2026-01-27 16:20:23 -05:00
Gud Boi 8701b517e7 Add `pexpect`, `xonsh`@github:main to deps
The former bc `claude` needs it for its new "offline" REPL simulator
script `snippets/claude_debug_helper.py` and pin to `xonsh` git mainline
to get the fancy new next cmd/suggestion prompt feats (which @goodboy is
using from `modden` already). Bump lock file to match.

Ah right, and for now while hackin pin to a local `tractor` Bp
2026-01-27 14:16:51 -05:00
Gud Boi f39b362bc4 Add a couple cooler "cooler"/"muted" red and greens 2026-01-27 13:34:52 -05:00
Gud Boi d2e1d6ce91 Add break for single bar null segments 2026-01-27 13:33:46 -05:00
Gud Boi d0966e0363 Space gap rect-annots "between" start-end bars 2026-01-27 13:33:13 -05:00
Gud Boi 4081336bd3 Catch too-early ib hist frames
For now by REPLing them and raising an RTE inside `.ib.feed` as well as
tracing any such cases that make it (from other providers) up to the
`.tsp._history` layer during null-segment backfilling.
2026-01-27 13:17:28 -05:00
Gud Boi ff502b62bf .ui.order_mode: multiline import styling 2026-01-25 22:19:39 -05:00
Gud Boi e77bec203d Add arrow indicators to time gaps
Such that they're easier to spot when zoomed out, a similar color to the
`RectItem`s and also remote-controlled via the `AnnotCtl` api.

Deats,
- request an arrow per gap from `markup_gaps()` using a new
  `.add_arrow()` meth, set the color, direction and alpha with
  position always as the `iend`/close of the last valid bar.
- extend the `.ui._remote_ctl` subys to support the above,
  * add a new `AnnotCtl.add_arrow()`.
  * add the service-side IPC endpoint for a 'cmd': 'ArrowEditor'.
- add a new `rm_annot()` helper to ensure the right graphics removal
  API is used by annotation type:
  * `pg.ArrowItem` looks up the `ArrowEditor` and calls `.remove(annot).
  * `pg.SelectRect` keeps with calling `.delete()`.
- global-ize an `_editors` table to enable the prior.
- add an explicit RTE for races on the chart-actor's `_dss` init.
2026-01-25 22:06:59 -05:00
Gud Boi 809ec6accb Arrow editor refinements in prep for gap checker
Namely exposing `ArrowEditor.add()` params to provide access to
coloring/transparency settings over the remote-ctl annotation API and
also adding a new `.remove_all()` to easily clear all arrows from
a single call. Also add `.remove()` compat methods to the other editors
(i.e. for lines, rects).
2026-01-25 14:14:42 -05:00
Gud Boi ad299789db Mv `markup_gaps()` to new `.tsp._annotate` mod 2026-01-21 23:52:12 -05:00
Gud Boi cd6bc105de Enable tracing back insert backfills
Namely insertion writes which over-fill the shm buffer past the latest
tsdb sample via `.tsp._history.shm_push_in_between()`.

Deats,
- check earliest `to_push` timestamp and enter pause point if it's
  earlier then the tsdb's `backfill_until_dt` stamp.
- requires actually passing the `backfill_until_dt: datetime` thru,
  * `get_null_segs()`
  * `maybe_fill_null_segments()`
  * `shm_push_in_between()` (obvi XD)
2026-01-21 22:38:42 -05:00
Gud Boi a8e4e1b2c5 Tolerate various "bad data" cases in `markup_gaps()`
Namely such that when the previous-df-row by our shm-abs-'index' doesn't
exist we ignore certain cases which are likely due to borked-but-benign
samples written to the tsdb or rt shm buffers prior.

Particularly we now ignore,
- any `dt`/`prev_dt` values which are UNIX-epoch timestamped (val of 0).
- any row-is-first-row in the df; there is no previous.
- any missing previous datum by 'index', in which case we lookup the
  `wdts` prior row and use that instead.
  * this would indicate a missing sample for the time-step but we can
    still detect a "gap" by looking at the prior row, by df-abs-index
    `i`, and use its timestamp to determine the period/size of missing
    samples (which need to likely still be retrieved).
  * in this case i'm leaving in a pause-point for introspecting these
    rarer cases when `--pdb` is passed via CLI.

Relatedly in the `piker store` CLI ep,
- add `--pdb` flag to `piker store`, pass it verbatim as `debug_mode`.
- when `times` has only a single row, don't calc a `period_s` median.
- only trace `null_segs` when in debug mode.
- always markup/dedupe gaps for `period_s==60`
2026-01-21 22:20:43 -05:00
Gud Boi caf2cc5a5b ib: up API timeout default for remote host conns 2026-01-21 22:20:43 -05:00
Gud Boi d4b46e0eda Fix `Qt6` types for new sub-namespaces 2026-01-21 22:20:43 -05:00
Gud Boi a1048c847b Add vlm-based "smart" OHLCV de-duping & bar validation
Using `claude`, add a `.tsp._dedupe_smart` module that attemps "smarter"
duplicate bars by attempting to distinguish between erroneous bars
partially written during concurrent backfill race conditions vs.
**actual** data quality issues from historical providers.

Problem:
--------
Concurrent writes (live updates vs. backfilling) can result in create
duplicate timestamped ohlcv vars with different values. Some
potential scenarios include,

- a market live feed is cancelled during live update resulting in the
  "last" datum being partially updated with all the ticks for the
  time step.
- when the feed is rebooted during charting, the backfiller will not
  finalize this bar since rn it presumes it should only fill data for
  time steps not already in the tsdb storage.

Our current naive  `.unique()` approach obvi keeps the incomplete bar
and a "smarter" approach is to compare the provider's final vlm
amount vs. the maybe-cancelled tsdb's bar; a higher vlm value from
the provider likely indicates the cancelled-during-live-write and
**not** a datum discrepancy from said data provider.

Analysis (with `claude`) of `zecusdt` data revealed:
- 1000 duplicate timestamps
- 999 identical bars (pure duplicates from 2022 backfill overlap)
- 1 volume-monotonic conflict (live partial vs backfill complete)

A soln from `claude` -> `tsp._dedupe_smart.dedupe_ohlcv_smart()`
which:
- sorts by vlm **before** deduplication and keep the most complete
  bar based on vlm monotonicity as well as the following OHLCV
  validation assumptions:
  * volume should always increase
  * high should be non-decreasing,
  * low should be non-increasing
  * open should be identical
- Separates valid race conditions from provider data quality issues
  and reports and returns both dfs.

Change summary by `claude`:
- `.tsp._dedupe_smart`: new module with validation logic
- `.tsp.__init__`: expose `dedupe_ohlcv_smart()`
- `.storage.cli`: integrate smart dedupe, add logging for:
  * duplicate counts (identical vs monotonic races)
  * data quality violations (non-monotonic, invalid OHLC ranges)
  * warnings for provider data issues
- Remove `assert not diff` (duplicates are valid now)

Verified on `zecusdt`: correctly keeps index 3143645
(volume=287.777) over 3143644 (volume=140.299) for
conflicting 2026-01-16 18:54 UTC bar.

`claude`'s Summary of reasoning
-------------------------------
- volume monotonicity is critical: a bar's volume only increases
  during its time window.
- a backfilled bar should always have volume >= live updated.
- violations indicate any of:
  * Provider data corruption
  * Non-OHLCV aggregation semantics
  * Timestamp misalignment

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-21 22:20:43 -05:00
Gud Boi 192fe0dc73 Add `pexpect`-based `pdbp`-REPL offline helper
Add a new `snippets/claude_debug_helper.py` to
provide a programmatic interface to `tractor.pause()` debugger
sessions for incremental data inspection matching the interactive UX
but able to be run by `claude` "offline" since it can't seem to feed
stdin (so it claims) to the `pdb` instance due to lack of ability to
allocate a tty internally.

The script-wrapper is based on `tractor`'s `tests/devx/` suite's use of
`pexpect` patterns for driving `pdbp` prompts and thus enables
automated-offline execution of REPL-inspection commands **without**
using incremental-realtime output capture (like a human would use it).

Features:
- `run_pdb_commands()`: batch command execution
- `InteractivePdbSession`: context manager for step-by-step REPL interaction
- `expect()` wrapper: timeout handling with buffer display
- Proper stdin/stdout handling via `pexpect.spawn()`

Example usage:
```python
from debug_helper import InteractivePdbSession

with InteractivePdbSession(
    cmd='piker store ldshm zecusdt.usdtm.perp.binance'
) as session:
    session.run('deduped.shape')
    session.run('step_gaps.shape')
```

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-21 22:20:43 -05:00
Gud Boi 4bfdd388bb Fix polars 1.36.0 duration API
Polars tightened type safety for `.dt` accessor methods requiring
`total_*` methods for duration types vs datetime component accessors
like `day()` which now only work on datetime dtypes.

`detect_time_gaps()` in `.tsp._anal` was calling `.dt.day()`
on `dt_diff` column (a duration from `.diff()`) which throws
`InvalidOperationError` on modern polars.

Changes:
- use f-string to add pluralization to map time unit strings to
  `total_<unit>s` form for the new duration API.
- Handle singular/plural forms: 'day' -> 'days' -> 'total_days'
- Ensure trailing 's' before applying 'total_' prefix

Also updates inline comments explaining the polars type distinction
between datetime components vs duration totals.

Fixes `piker store ldshm` crashes on datasets with time gaps.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-01-21 22:20:43 -05:00
Tyler Goodlet 534b13f755 `.storage.__init__`: code styling updates 2026-01-21 22:20:43 -05:00
Tyler Goodlet 108646fdfb `.tsp._history`: drop `feed_is_live` syncing, another seg flag
The `await feed_is_live.wait()` is more or less pointless and would only
cause slower startup afaig (as-far-as-i-grok) so i'm masking it here.
This also removes the final `strict_exception_groups=False` use from the
non-tests code base, flipping to the `tractor.trionics` collapser once
and for all!
2026-01-21 22:20:43 -05:00
Tyler Goodlet d6d4fec666 Woops, keep `np2pl` exposed from `.tsp` 2026-01-21 22:20:43 -05:00
Tyler Goodlet 14ac351a65 Factor to a new `.tsp._history` sub-mod
Cleaning out the `piker.tsp` pkg-mod to be only the (re)exports needed
for `._anal`/`._history` refs-use elsewhere!
2026-01-21 22:20:43 -05:00
56 changed files with 861 additions and 1169 deletions

View File

@ -19,10 +19,8 @@
for tendiez.
'''
from piker.log import (
get_console_log,
get_logger,
)
from ..log import get_logger
from .calc import (
iter_by_dt,
)
@ -53,17 +51,7 @@ from ._allocate import (
log = get_logger(__name__)
# ?TODO, enable console on import
# [ ] necessary? or `open_brokerd_dialog()` doing it is sufficient?
#
# bc might as well enable whenev imported by
# other sub-sys code (namely `.clearing`).
get_console_log(
level='warning',
name=__name__,
)
# TODO, the `as <samename>` style?
__all__ = [
'Account',
'Allocator',

View File

@ -60,16 +60,12 @@ from ..clearing._messages import (
BrokerdPosition,
)
from piker.types import Struct
from piker.log import (
get_logger,
)
from piker.log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(
name=__name__,
)
log = get_logger(__name__)
class Position(Struct):

View File

@ -21,6 +21,7 @@ CLI front end for trades ledger and position tracking management.
from __future__ import annotations
from pprint import pformat
from rich.console import Console
from rich.markdown import Markdown
import polars as pl
@ -28,10 +29,7 @@ import tractor
import trio
import typer
from piker.log import (
get_console_log,
get_logger,
)
from ..log import get_logger
from ..service import (
open_piker_runtime,
)
@ -47,7 +45,6 @@ from .calc import (
open_ledger_dfs,
)
log = get_logger(name=__name__)
ledger = typer.Typer()
@ -82,10 +79,7 @@ def sync(
"-l",
),
):
log = get_console_log(
level=loglevel,
name=__name__,
)
log = get_logger(loglevel)
console = Console()
pair: tuple[str, str]

View File

@ -25,16 +25,15 @@ from types import ModuleType
from tractor.trionics import maybe_open_context
from piker.log import (
get_logger,
)
from ._util import (
log,
BrokerError,
SymbolNotFound,
NoData,
DataUnavailable,
DataThrottle,
resproc,
get_logger,
)
__all__: list[str] = [
@ -44,6 +43,7 @@ __all__: list[str] = [
'DataUnavailable',
'DataThrottle',
'resproc',
'get_logger',
]
__brokers__: list[str] = [
@ -65,10 +65,6 @@ __brokers__: list[str] = [
# bitso
]
log = get_logger(
name=__name__,
)
def get_brokermod(brokername: str) -> ModuleType:
'''

View File

@ -33,18 +33,12 @@ import exceptiongroup as eg
import tractor
import trio
from piker.log import (
get_logger,
get_console_log,
)
from . import _util
from . import get_brokermod
if TYPE_CHECKING:
from ..data import _FeedsBus
log = get_logger(name=__name__)
# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
@ -78,14 +72,13 @@ async def _setup_persistent_brokerd(
# since all hosted daemon tasks will reference this same
# log instance's (actor local) state and thus don't require
# any further (level) configuration on their own B)
actor: tractor.Actor = tractor.current_actor()
tll: str = actor.loglevel
log = get_console_log(
level=loglevel or tll,
log = _util.get_console_log(
loglevel or tractor.current_actor().loglevel,
name=f'{_util.subsys}.{brokername}',
with_tractor_log=bool(tll),
)
assert log.name == _util.subsys
# set global for this actor to this new process-wide instance B)
_util.log = log
# further, set the log level on any broker broker specific
# logger instance.
@ -104,7 +97,7 @@ async def _setup_persistent_brokerd(
# NOTE: see ep invocation details inside `.data.feed`.
try:
async with (
# tractor.trionics.collapse_eg(),
tractor.trionics.collapse_eg(),
trio.open_nursery() as service_nursery
):
bus: _FeedsBus = feed.get_feed_bus(
@ -200,6 +193,7 @@ def broker_init(
async def spawn_brokerd(
brokername: str,
loglevel: str | None = None,
@ -207,10 +201,8 @@ async def spawn_brokerd(
) -> bool:
log.info(
f'Spawning broker-daemon,\n'
f'backend: {brokername!r}'
)
from piker.service._util import log # use service mngr log
log.info(f'Spawning {brokername} broker daemon')
(
brokermode,
@ -272,11 +264,6 @@ async def maybe_spawn_brokerd(
'''
from piker.service import maybe_spawn_daemon
# if (
# loglevel != 'info'
# ):
# await tractor.pause()
async with maybe_spawn_daemon(
f'brokerd.{brokername}',

View File

@ -19,13 +19,15 @@ Handy cross-broker utils.
"""
from __future__ import annotations
# from functools import partial
from functools import partial
import json
import httpx
import logging
from piker.log import (
from ..log import (
get_logger,
get_console_log,
colorize_json,
)
subsys: str = 'piker.brokers'
@ -33,22 +35,12 @@ subsys: str = 'piker.brokers'
# NOTE: level should be reset by any actor that is spawned
# as well as given a (more) explicit name/key such
# as `piker.brokers.binance` matching the subpkg.
# log = get_logger(subsys)
log = get_logger(subsys)
# ?TODO?? we could use this approach, but we need to be able
# to pass multiple `name=` values so for example we can include the
# emissions in `.accounting._pos` and others!
# [ ] maybe we could do the `log = get_logger()` above,
# then cycle through the list of subsys mods we depend on
# and then get all their loggers and pass them to
# `get_console_log(logger=)`??
# [ ] OR just write THIS `get_console_log()` as a hook which does
# that based on who calls it?.. i dunno
#
# get_console_log = partial(
# get_console_log,
# name=subsys,
# )
get_console_log = partial(
get_console_log,
name=subsys,
)
class BrokerError(Exception):

View File

@ -37,9 +37,8 @@ import trio
from piker.accounting import (
Asset,
)
from piker.log import (
from piker.brokers._util import (
get_logger,
get_console_log,
)
from piker.data._web_bs import (
open_autorecon_ws,
@ -70,9 +69,7 @@ from .venues import (
)
from .api import Client
log = get_logger(
name=__name__,
)
log = get_logger('piker.brokers.binance')
# Fee schedule template, mostly for paper engine fees modelling.
@ -248,16 +245,9 @@ async def handle_order_requests(
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
loglevel: str = 'warning',
) -> AsyncIterator[dict[str, Any]]:
# enable piker.clearing console log for *this* `brokerd` subactor
get_console_log(
level=loglevel,
name=__name__,
)
# TODO: how do we set this from the EMS such that
# positions are loaded from the correct venue on the user
# stream at startup? (that is in an attempt to support both

View File

@ -64,9 +64,9 @@ from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
from piker.log import get_logger
from piker.brokers._util import (
DataUnavailable,
get_logger,
)
from .api import (
@ -78,7 +78,7 @@ from .venues import (
get_api_eps,
)
log = get_logger(name=__name__)
log = get_logger('piker.brokers.binance')
class L1(Struct):

View File

@ -27,12 +27,14 @@ import click
import trio
import tractor
from piker.cli import cli
from piker import watchlists as wl
from piker.log import (
from ..cli import cli
from .. import watchlists as wl
from ..log import (
colorize_json,
)
from ._util import (
log,
get_console_log,
get_logger,
)
from ..service import (
maybe_spawn_brokerd,
@ -43,15 +45,12 @@ from ..brokers import (
get_brokermod,
data,
)
log = get_logger(
name=__name__,
)
DEFAULT_BROKER = 'binance'
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
OK = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
@ -346,10 +345,7 @@ def contracts(ctx, loglevel, broker, symbol, ids):
'''
brokermod = get_brokermod(broker)
get_console_log(
level=loglevel,
name=__name__,
)
get_console_log(loglevel)
contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids:
@ -481,12 +477,11 @@ def search(
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
loglevel: str = config['loglevel']
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=loglevel,
loglevel=config['loglevel'],
debug_mode=pdb,
):
return await func()
@ -499,7 +494,6 @@ def search(
core.symbol_search,
brokermods,
pattern,
loglevel=loglevel,
),
)

View File

@ -28,14 +28,12 @@ from typing import (
import trio
from piker.log import get_logger
from ._util import log
from . import get_brokermod
from ..service import maybe_spawn_brokerd
from . import open_cached_client
from ..accounting import MktPair
log = get_logger(name=__name__)
async def api(brokername: str, methname: str, **kwargs) -> dict:
'''
@ -149,7 +147,6 @@ async def search_w_brokerd(
async def symbol_search(
brokermods: list[ModuleType],
pattern: str,
loglevel: str = 'warning',
**kwargs,
) -> dict[str, dict[str, dict[str, Any]]]:
@ -179,7 +176,6 @@ async def symbol_search(
'_infect_asyncio',
False,
),
loglevel=loglevel
) as portal:
results.append((

View File

@ -41,15 +41,12 @@ import tractor
from tractor.experimental import msgpub
from async_generator import asynccontextmanager
from piker.log import(
get_logger,
from ._util import (
log,
get_console_log,
)
from . import get_brokermod
log = get_logger(
name='piker.brokers.binance',
)
async def wait_for_network(
net_func: Callable,
@ -246,10 +243,7 @@ async def start_quote_stream(
'''
# XXX: why do we need this again?
get_console_log(
level=tractor.current_actor().loglevel,
name=__name__,
)
get_console_log(tractor.current_actor().loglevel)
# pull global vars from local actor
symbols = list(symbols)

View File

@ -34,13 +34,13 @@ import subprocess
import tractor
from piker.log import get_logger
from piker.brokers._util import get_logger
if TYPE_CHECKING:
from .api import Client
import i3ipc
log = get_logger(name=__name__)
log = get_logger('piker.brokers.ib')
_reset_tech: Literal[
'vnc',

View File

@ -50,10 +50,6 @@ from ib_insync.objects import (
)
from piker import config
from piker.log import (
get_logger,
get_console_log,
)
from piker.types import Struct
from piker.accounting import (
Position,
@ -81,6 +77,7 @@ from piker.clearing._messages import (
BrokerdFill,
BrokerdError,
)
from ._util import log
from .api import (
_accounts2clients,
get_config,
@ -98,10 +95,6 @@ from .ledger import (
update_ledger_from_api_trades,
)
log = get_logger(
name=__name__,
)
def pack_position(
pos: IbPosition,
@ -543,15 +536,9 @@ class IbAcnt(Struct):
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
loglevel: str = 'warning',
) -> AsyncIterator[dict[str, Any]]:
get_console_log(
level=loglevel,
name=__name__,
)
# task local msg dialog tracking
flows = OrderDialogs()
accounts_def = config.load_accounts(['ib'])

View File

@ -56,11 +56,11 @@ from piker.brokers._util import (
NoData,
DataUnavailable,
)
from piker.log import get_logger
from .api import (
# _adhoc_futes_set,
Client,
con2fqme,
log,
load_aio_clients,
MethodProxy,
open_client_proxies,
@ -78,9 +78,6 @@ from .symbols import get_mkt_info
if TYPE_CHECKING:
from trio._core._run import Task
log = get_logger(
name=__name__,
)
# XXX NOTE: See available types table docs:
# https://interactivebrokers.github.io/tws-api/tick_types.html

View File

@ -44,7 +44,6 @@ from ib_insync import (
CommissionReport,
)
from piker.log import get_logger
from piker.types import Struct
from piker.data import (
SymbologyCache,
@ -58,6 +57,7 @@ from piker.accounting import (
iter_by_dt,
)
from ._flex_reports import parse_flex_dt
from ._util import log
if TYPE_CHECKING:
from .api import (
@ -65,9 +65,6 @@ if TYPE_CHECKING:
MethodProxy,
)
log = get_logger(
name=__name__,
)
tx_sort: Callable = partial(
iter_by_dt,

View File

@ -42,7 +42,10 @@ from piker.accounting import (
from piker._cacheables import (
async_lifo_cache,
)
from piker.log import get_logger
from ._util import (
log,
)
if TYPE_CHECKING:
from .api import (
@ -50,10 +53,6 @@ if TYPE_CHECKING:
Client,
)
log = get_logger(
name=__name__,
)
_futes_venues = (
'GLOBEX',
'NYMEX',

View File

@ -62,12 +62,9 @@ from piker.clearing._messages import (
from piker.brokers import (
open_cached_client,
)
from piker.log import (
get_console_log,
get_logger,
)
from piker.data import open_symcache
from .api import (
log,
Client,
BrokerError,
)
@ -81,8 +78,6 @@ from .ledger import (
verify_balances,
)
log = get_logger(name=__name__)
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
@ -436,15 +431,9 @@ def trades2pps(
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
loglevel: str = 'warning',
) -> AsyncIterator[dict[str, Any]]:
get_console_log(
level=loglevel,
name=__name__,
)
async with (
# TODO: maybe bind these together and deliver
# a tuple from `.open_cached_client()`?

View File

@ -50,19 +50,13 @@ from . import open_cached_client
from piker._cacheables import async_lifo_cache
from .. import config
from ._util import resproc, BrokerError, SymbolNotFound
from piker.log import (
from ..log import (
colorize_json,
)
from ._util import (
log,
get_console_log,
)
from piker.log import (
get_logger,
)
log = get_logger(
name=__name__,
)
_use_practice_account = False
_refresh_token_ep = 'https://{}login.questrade.com/oauth2/'
@ -1211,10 +1205,7 @@ async def stream_quotes(
# feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(
level=loglevel,
name=__name__,
)
get_console_log(loglevel)
async with open_cached_client('questrade') as client:
if feed_type == 'stock':

View File

@ -30,16 +30,9 @@ import asks
from ._util import (
resproc,
BrokerError,
log,
)
from piker.calc import percent_change
from piker.log import (
get_logger,
)
log = get_logger(
name=__name__,
)
from ..calc import percent_change
_service_ep = 'https://api.robinhood.com'

View File

@ -215,7 +215,7 @@ async def relay_orders_from_sync_code(
async def open_ems(
fqme: str,
mode: str = 'live',
loglevel: str = 'warning',
loglevel: str = 'error',
) -> tuple[
OrderClient, # client

View File

@ -47,7 +47,6 @@ from tractor import trionics
from ._util import (
log, # sub-sys logger
get_console_log,
subsys,
)
from ..accounting._mktinfo import (
unpack_fqme,
@ -352,21 +351,9 @@ async def open_brokerd_dialog(
broker backend, configuration, or client code usage.
'''
get_console_log(
level=loglevel,
name='clearing',
)
# enable `.accounting` console since normally used by
# each `brokerd`.
get_console_log(
level=loglevel,
name='piker.accounting',
)
broker: str = brokermod.name
def mk_paper_ep(
loglevel: str,
):
def mk_paper_ep():
from . import _paper_engine as paper_mod
nonlocal brokermod, exec_mode
@ -418,21 +405,17 @@ async def open_brokerd_dialog(
if (
trades_endpoint is not None
or
exec_mode != 'paper'
or exec_mode != 'paper'
):
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
@acm
async def maybe_open_paper_ep():
if exec_mode == 'paper':
async with mk_paper_ep(
loglevel=loglevel,
) as msg:
async with mk_paper_ep() as msg:
yield msg
return
@ -443,9 +426,7 @@ async def open_brokerd_dialog(
# runtime indication that the backend can't support live
# order ctrl yet, so boot the paperboi B0
if first == 'paper':
async with mk_paper_ep(
loglevel=loglevel,
) as msg:
async with mk_paper_ep() as msg:
yield msg
return
else:
@ -748,7 +729,6 @@ class Router(Struct):
except (
trio.ClosedResourceError,
trio.BrokenResourceError,
tractor.TransportClosed,
):
to_remove.add(client_stream)
log.warning(
@ -785,11 +765,7 @@ async def _setup_persistent_emsd(
) -> None:
if loglevel:
_log = get_console_log(
level=loglevel,
name=subsys,
)
assert _log.name == 'piker.clearing'
get_console_log(loglevel)
global _router
@ -1723,5 +1699,5 @@ async def _emsd_main(
if not client_streams:
log.warning(
f'Order dialog is not being monitored:\n'
f'{oid!r} <-> {client_stream.chan.aid.reprol()}\n'
f'{oid} ->\n{client_stream._ctx.chan.uid}'
)

View File

@ -59,9 +59,9 @@ from piker.data import (
open_symcache,
)
from piker.types import Struct
from piker.log import (
from ._util import (
log, # sub-sys logger
get_console_log,
get_logger,
)
from ._messages import (
BrokerdCancel,
@ -73,8 +73,6 @@ from ._messages import (
BrokerdError,
)
log = get_logger(name=__name__)
class PaperBoi(Struct):
'''
@ -552,6 +550,7 @@ _sells: defaultdict[
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
broker: str,
fqme: str | None = None, # if empty, we only boot broker mode
@ -559,11 +558,8 @@ async def open_trade_dialog(
) -> None:
# enable piker.clearing console log for *this* `brokerd` subactor
get_console_log(
level=loglevel,
name=__name__,
)
# enable piker.clearing console log for *this* subactor
get_console_log(loglevel)
symcache: SymbologyCache
async with open_symcache(get_brokermod(broker)) as symcache:

View File

@ -28,14 +28,12 @@ from ..log import (
from piker.types import Struct
subsys: str = 'piker.clearing'
log = get_logger(
name='piker.clearing',
)
log = get_logger(subsys)
# TODO, oof doesn't this ignore the `loglevel` then???
get_console_log = partial(
get_console_log,
name='clearing',
name=subsys,
)

View File

@ -61,8 +61,7 @@ def load_trans_eps(
if (
network
and
not maddrs
and not maddrs
):
# load network section and (attempt to) connect all endpoints
# which are reachable B)
@ -113,27 +112,31 @@ def load_trans_eps(
default=None,
help='Multiaddrs to bind or contact',
)
# @click.option(
# '--tsdb',
# is_flag=True,
# help='Enable local ``marketstore`` instance'
# )
# @click.option(
# '--es',
# is_flag=True,
# help='Enable local ``elasticsearch`` instance'
# )
def pikerd(
maddr: list[str] | None,
loglevel: str,
tl: bool,
pdb: bool,
# tsdb: bool,
# es: bool,
):
'''
Start the "root service actor", `pikerd`, run it until
cancellation.
This "root daemon" operates as the top most service-mngr and
subsys-as-subactor supervisor, think of it as the "init proc" of
any of any `piker` application or daemon-process tree.
Spawn the piker broker-daemon.
'''
# from tractor.devx import maybe_open_crash_handler
# with maybe_open_crash_handler(pdb=False):
log = get_console_log(
level=loglevel,
with_tractor_log=tl,
)
log = get_console_log(loglevel, name='cli')
if pdb:
log.warning((
@ -234,14 +237,6 @@ def cli(
regaddr: str,
) -> None:
'''
The "root" `piker`-cmd CLI endpoint.
NOTE, this def generally relies on and requires a sub-cmd to be
provided by the user, OW only a `--help` msg (listing said
subcmds) will be dumped to console.
'''
if configdir is not None:
assert os.path.isdir(configdir), f"`{configdir}` is not a valid path"
config._override_config_dir(configdir)
@ -300,50 +295,17 @@ def cli(
@click.option('--tl', is_flag=True, help='Enable tractor logging')
@click.argument('ports', nargs=-1, required=False)
@click.pass_obj
def services(
config,
tl: bool,
ports: list[int],
):
'''
List all `piker` "service deamons" to the console in
a `json`-table which maps each actor's UID in the form,
def services(config, tl, ports):
`{service_name}.{subservice_name}.{UUID}`
to its (primary) IPC server address.
(^TODO, should be its multiaddr form once we support it)
Note that by convention actors which operate as "headless"
processes (those without GUIs/graphics, and which generally
parent some noteworthy subsystem) are normally suffixed by
a "d" such as,
- pikerd: the root runtime supervisor
- brokerd: a broker-backend order ctl daemon
- emsd: the internal dark-clearing and order routing daemon
- datad: a data-provider-backend data feed daemon
- samplerd: the real-time data sampling and clock-syncing daemon
"Headed units" are normally just given an obvious app-like name
with subactors indexed by `.` such as,
- chart: the primary modal charting iface, a Qt app
- chart.fsp_0: a financial-sig-proc cascade instance which
delivers graphics to a parent `chart` app.
- polars_boi: some (presumably) `polars` using console app.
'''
from piker.service import (
from ..service import (
open_piker_runtime,
_default_registry_port,
_default_registry_host,
)
# !TODO, mk this to work with UDS!
host: str = _default_registry_host
host = _default_registry_host
if not ports:
ports: list[int] = [_default_registry_port]
ports = [_default_registry_port]
addr = tractor._addr.wrap_address(
addr=(host, ports[0])
@ -354,11 +316,7 @@ def services(
async with (
open_piker_runtime(
name='service_query',
loglevel=(
config['loglevel']
if tl
else None
),
loglevel=config['loglevel'] if tl else None,
),
tractor.get_registry(
addr=addr,
@ -378,15 +336,7 @@ def services(
def _load_clis() -> None:
'''
Dynamically load and register all subsys CLI endpoints (at call
time).
NOTE, obviously this is normally expected to be called at
`import` time and implicitly relies on our use of various
`click`/`typer` decorator APIs.
'''
# from ..service import elastic # noqa
from ..brokers import cli # noqa
from ..ui import cli # noqa
from ..watchlists import cli # noqa
@ -396,5 +346,5 @@ def _load_clis() -> None:
from ..accounting import cli # noqa
# load all subsytem cli eps
# load downstream cli modules
_load_clis()

View File

@ -99,7 +99,6 @@ class Sampler:
trio.BrokenResourceError,
trio.ClosedResourceError,
trio.EndOfChannel,
tractor.TransportClosed,
)
# holds all the ``tractor.Context`` remote subscriptions for
@ -292,10 +291,9 @@ class Sampler:
except self.bcast_errors as err:
log.error(
f'Connection dropped for IPC ctx due to,\n'
f'{type(err)!r}\n'
f'\n'
f'{stream._ctx}'
f'Connection dropped for IPC ctx\n'
f'{stream._ctx}\n\n'
f'Due to {type(err)}'
)
borked.add(stream)
else:
@ -336,18 +334,10 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
loglevel: str|None = None,
) -> set[int]:
get_console_log(
level=(
loglevel
or
tractor.current_actor().loglevel
),
name=__name__,
)
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
try:
@ -484,7 +474,6 @@ async def spawn_samplerd(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
loglevel=loglevel,
)
return True
@ -493,6 +482,7 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str|None = None,
**pikerd_kwargs,
@ -521,10 +511,10 @@ async def open_sample_stream(
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
loglevel: str|None = None,
# cache_key: str|None = None,
# allow_new_sampler: bool = True,
cache_key: str|None = None,
allow_new_sampler: bool = True,
ensure_is_active: bool = False,
) -> AsyncIterator[dict[str, float]]:
@ -559,9 +549,7 @@ async def open_sample_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
maybe_open_samplerd(
loglevel=loglevel,
) as portal,
maybe_open_samplerd() as portal,
portal.open_context(
register_with_sampler,
@ -570,7 +558,6 @@ async def open_sample_stream(
'shms_by_period': shms_by_period,
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
'loglevel': loglevel,
},
) as (ctx, shm_periods)
):
@ -766,7 +753,7 @@ async def sample_and_broadcast(
log.warning(
f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n'
f'feed @ {chan.aid.reprol()}\n'
f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz'
)

View File

@ -26,9 +26,7 @@ from ..log import (
)
subsys: str = 'piker.data'
log = get_logger(
name=subsys,
)
log = get_logger(subsys)
get_console_log = partial(
get_console_log,

View File

@ -31,7 +31,6 @@ from typing import (
AsyncContextManager,
AsyncGenerator,
Iterable,
Type,
)
import json
@ -68,7 +67,7 @@ class NoBsWs:
'''
# apparently we can QoS for all sorts of reasons..so catch em.
recon_errors: tuple[Type[Exception]] = (
recon_errors = (
ConnectionClosed,
DisconnectionTimeout,
ConnectionRejected,
@ -106,10 +105,7 @@ class NoBsWs:
def connected(self) -> bool:
return self._connected.is_set()
async def reset(
self,
timeout: float,
) -> bool:
async def reset(self) -> None:
'''
Reset the underlying ws connection by cancelling
the bg relay task and waiting for it to signal
@ -118,31 +114,18 @@ class NoBsWs:
'''
self._connected = trio.Event()
self._cs.cancel()
with trio.move_on_after(timeout) as cs:
await self._connected.wait()
return True
assert cs.cancelled_caught
return False
async def send_msg(
self,
data: Any,
timeout: float = 3,
) -> None:
while True:
try:
msg: Any = self._dumps(data)
return await self._ws.send_message(msg)
except self.recon_errors:
with trio.CancelScope(shield=True):
reconnected: bool = await self.reset(
timeout=timeout,
)
if not reconnected:
log.warning(
'Failed to reconnect after {timeout!r}s ??'
)
await self.reset()
async def recv_msg(self) -> Any:
msg: Any = await self._rx.receive()
@ -208,9 +191,7 @@ async def _reconnect_forever(
f'{src_mod}\n'
f'{url} connection bail with:'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.5)
rent_cs.cancel()
# go back to reonnect loop in parent task
@ -310,7 +291,6 @@ async def _reconnect_forever(
log.exception(
'Reconnect-attempt failed ??\n'
)
with trio.CancelScope(shield=True):
await trio.sleep(0.2) # throttle
raise berr
@ -371,7 +351,6 @@ async def open_autorecon_ws(
rcv: trio.MemoryReceiveChannel
snd, rcv = trio.open_memory_channel(616)
try:
async with (
tractor.trionics.collapse_eg(),
trio.open_nursery() as tn
@ -399,12 +378,6 @@ async def open_autorecon_ws(
finally:
tn.cancel_scope.cancel()
except NoBsWs.recon_errors as con_err:
log.warning(
f'Entire ws-channel disconnect due to,\n'
f'con_err: {con_err!r}\n'
)
'''
JSONRPC response-request style machinery for transparent multiplexing

View File

@ -62,6 +62,7 @@ from ._util import (
log,
get_console_log,
)
from .flows import Flume
from .validate import (
FeedInit,
validate_backend,
@ -76,7 +77,6 @@ from ._sampling import (
)
if TYPE_CHECKING:
from .flows import Flume
from tractor._addr import Address
from tractor.msg.types import Aid
@ -239,6 +239,7 @@ async def allocate_persistent_feed(
brokername: str,
symstr: str,
loglevel: str,
start_stream: bool = True,
init_timeout: float = 616,
@ -347,14 +348,11 @@ async def allocate_persistent_feed(
izero_rt,
rt_shm,
) = await bus.nursery.start(
partial(
manage_history,
mod=mod,
mkt=mkt,
some_data_ready=some_data_ready,
feed_is_live=feed_is_live,
loglevel=loglevel,
)
mod,
mkt,
some_data_ready,
feed_is_live,
)
# yield back control to starting nursery once we receive either
@ -364,8 +362,6 @@ async def allocate_persistent_feed(
)
await some_data_ready.wait()
# XXX, avoid cycle; it imports this mod.
from .flows import Flume
flume = Flume(
# TODO: we have to use this for now since currently the
@ -462,6 +458,7 @@ async def allocate_persistent_feed(
@tractor.context
async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbols: list[str], # normally expected to the broker-specific fqme
@ -482,16 +479,13 @@ async def open_feed_bus(
'''
if loglevel is None:
loglevel: str = tractor.current_actor().loglevel
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker
# logging
get_console_log(
level=(loglevel
or
tractor.current_actor().loglevel
),
name=__name__,
loglevel
or tractor.current_actor().loglevel
)
# local state sanity checks
@ -506,6 +500,7 @@ async def open_feed_bus(
sub_registered = trio.Event()
flumes: dict[str, Flume] = {}
for symbol in symbols:
# if no cached feed for this symbol has been created for this
@ -689,7 +684,6 @@ class Feed(Struct):
'''
mods: dict[str, ModuleType] = {}
portals: dict[ModuleType, tractor.Portal] = {}
flumes: dict[
str, # FQME
Flume,
@ -819,11 +813,6 @@ async def maybe_open_feed(
'''
fqme = fqmes[0]
# if (
# loglevel != 'info'
# ):
# await tractor.pause()
async with trionics.maybe_open_context(
acm_func=open_feed,
kwargs={
@ -890,12 +879,9 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod
if (
loglevel != 'info'
):
await tractor.pause()
# one actor per brokerd for now
brokerd_ctxs = []
for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn
@ -965,8 +951,6 @@ async def open_feed(
assert len(feed.mods) == len(feed.portals)
# XXX, avoid cycle; it imports this mod.
from .flows import Flume
async with (
trionics.gather_contexts(bus_ctxs) as ctxs,
):

View File

@ -24,7 +24,6 @@ from functools import partial
from typing import (
AsyncIterator,
Callable,
TYPE_CHECKING,
)
import numpy as np
@ -34,12 +33,12 @@ import tractor
from tractor.msg import NamespacePath
from piker.types import Struct
from ..log import (
get_logger,
get_console_log,
)
from ..log import get_logger, get_console_log
from .. import data
from ..data.flows import Flume
from ..data.feed import (
Flume,
Feed,
)
from ..data._sharedmem import ShmArray
from ..data._sampling import (
_default_delay_s,
@ -53,9 +52,6 @@ from ._api import (
)
from ..toolz import Profiler
if TYPE_CHECKING:
from ..data.feed import Feed
log = get_logger(__name__)
@ -173,10 +169,8 @@ class Cascade(Struct):
if not synced:
fsp: Fsp = self.fsp
log.warning(
f'***DESYNCED fsp***\n'
f'------------------\n'
f'ns-path: {fsp.ns_path!r}\n'
f'shm-token: {src_shm.token}\n'
'***DESYNCED FSP***\n'
f'{fsp.ns_path}@{src_shm.token}\n'
f'step_diff: {step_diff}\n'
f'len_diff: {len_diff}\n'
)
@ -404,6 +398,7 @@ async def connect_streams(
@tractor.context
async def cascade(
ctx: tractor.Context,
# data feed key
@ -431,17 +426,7 @@ async def cascade(
)
if loglevel:
log = get_console_log(
loglevel,
name=__name__,
)
# XXX TODO!
# figure out why this writes a dict to,
# `tractor._state._runtime_vars['_root_mailbox']`
# XD .. wtf
# TODO, solve this as reported in,
# https://www.pikers.dev/pikers/piker/issues/70
# await tractor.pause()
get_console_log(loglevel)
src: Flume = Flume.from_msg(src_flume_addr)
dst: Flume = Flume.from_msg(
@ -484,8 +469,7 @@ async def cascade(
# open a data feed stream with requested broker
feed: Feed
async with data.feed.maybe_open_feed(
fqmes=[fqme],
loglevel=loglevel,
[fqme],
# TODO throttle tick outputs from *this* daemon since
# it'll emit tons of ticks due to the throttle only
@ -583,8 +567,7 @@ async def cascade(
# on every step msg received from the global `samplerd`
# service.
async with open_sample_stream(
period_s=float(delay_s),
loglevel=loglevel,
float(delay_s)
) as istream:
profiler(f'{func_name}: sample stream up')

View File

@ -37,84 +37,35 @@ _proj_name: str = 'piker'
def get_logger(
name: str|None = None,
**tractor_log_kwargs,
name: str = None,
) -> logging.Logger:
'''
Return the package log or a sub-logger if a `name=` is provided,
which defaults to the calling module's pkg-namespace path.
See `tractor.log.get_logger()` for details.
Return the package log or a sub-log for `name` if provided.
'''
pkg_name: str = _proj_name
if (
name
and
pkg_name in name
):
name: str = name.lstrip(f'{_proj_name}.')
return tractor.log.get_logger(
name=name,
pkg_name=pkg_name,
**tractor_log_kwargs,
_root_name=_proj_name,
)
def get_console_log(
level: str | None = None,
name: str | None = None,
pkg_name: str|None = None,
with_tractor_log: bool = False,
# ?TODO, support a "log-spec" style `str|dict[str, str]` which
# dictates both the sublogger-key and a level?
# -> see similar idea in `modden`'s usage.
**tractor_log_kwargs,
) -> logging.Logger:
'''
Get the package logger and enable a handler which writes to
stderr.
Get the package logger and enable a handler which writes to stderr.
Yeah yeah, i know we can use `DictConfig`.
You do it.. Bp
Yeah yeah, i know we can use ``DictConfig``. You do it...
'''
pkg_name: str = _proj_name
if (
name
and
pkg_name in name
):
name: str = name.lstrip(f'{_proj_name}.')
tll: str|None = None
if (
with_tractor_log is not False
):
tll = level
elif maybe_actor := tractor.current_actor(
err_on_no_runtime=False,
):
tll = maybe_actor.loglevel
if tll:
t_log = tractor.log.get_console_log(
level=tll,
name='tractor', # <- XXX, force root tractor log!
**tractor_log_kwargs,
)
# TODO/ allow only enabling certain tractor sub-logs?
assert t_log.name == 'tractor'
return tractor.log.get_console_log(
level=level,
level,
name=name,
pkg_name=pkg_name,
**tractor_log_kwargs,
)
_root_name=_proj_name,
) # our root logger
def colorize_json(

View File

@ -21,6 +21,7 @@
from __future__ import annotations
import os
from typing import (
Optional,
Any,
ClassVar,
)
@ -31,11 +32,8 @@ from contextlib import (
import tractor
import trio
from piker.log import (
get_console_log,
)
from ._util import (
subsys,
get_console_log,
)
from ._mngr import (
Services,
@ -61,7 +59,7 @@ async def open_piker_runtime(
registry_addrs: list[tuple[str, int]] = [],
enable_modules: list[str] = [],
loglevel: str|None = None,
loglevel: Optional[str] = None,
# XXX NOTE XXX: you should pretty much never want debug mode
# for data daemons when running in production.
@ -99,8 +97,7 @@ async def open_piker_runtime(
# setting it as the root actor on localhost.
registry_addrs = (
registry_addrs
or
[_default_reg_addr]
or [_default_reg_addr]
)
if ems := tractor_kwargs.pop('enable_modules', None):
@ -166,6 +163,7 @@ _root_modules: list[str] = [
@acm
async def open_pikerd(
registry_addrs: list[tuple[str, int]],
loglevel: str | None = None,
# XXX: you should pretty much never want debug mode
@ -194,6 +192,7 @@ async def open_pikerd(
async with (
open_piker_runtime(
name=_root_dname,
loglevel=loglevel,
debug_mode=debug_mode,
@ -274,10 +273,7 @@ async def maybe_open_pikerd(
'''
if loglevel:
get_console_log(
name=subsys,
level=loglevel
)
get_console_log(loglevel)
# subtle, we must have the runtime up here or portal lookup will fail
query_name = kwargs.pop(

View File

@ -49,15 +49,13 @@ from requests.exceptions import (
ReadTimeout,
)
from piker.log import (
get_console_log,
get_logger,
)
from ._mngr import Services
from ._util import (
log, # sub-sys logger
get_console_log,
)
from .. import config
log = get_logger(name=__name__)
class DockerNotStarted(Exception):
'Prolly you dint start da daemon bruh'
@ -338,16 +336,13 @@ class Container:
async def open_ahabd(
ctx: tractor.Context,
endpoint: str, # ns-pointer str-msg-type
loglevel: str = 'cancel',
loglevel: str | None = None,
**ep_kwargs,
) -> None:
log = get_console_log(
level=loglevel,
name='piker.service',
)
log = get_console_log(loglevel or 'cancel')
async with open_docker() as client:

View File

@ -30,9 +30,8 @@ from contextlib import (
import tractor
from trio.lowlevel import current_task
from piker.log import (
get_console_log,
get_logger,
from ._util import (
log, # sub-sys logger
)
from ._mngr import (
Services,
@ -40,11 +39,10 @@ from ._mngr import (
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
log = get_logger(name=__name__)
@acm
async def maybe_spawn_daemon(
service_name: str,
service_task_target: Callable,
@ -68,12 +66,6 @@ async def maybe_spawn_daemon(
clients.
'''
log = get_console_log(
level=loglevel,
name=__name__,
)
assert log.name == 'piker.service'
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]
@ -160,6 +152,7 @@ async def maybe_spawn_daemon(
async def spawn_emsd(
loglevel: str | None = None,
**extra_tractor_kwargs
@ -197,6 +190,7 @@ async def spawn_emsd(
@acm
async def maybe_open_emsd(
brokername: str,
loglevel: str | None = None,

View File

@ -34,9 +34,9 @@ from tractor import (
Portal,
)
from piker.log import get_logger
log = get_logger(name=__name__)
from ._util import (
log, # sub-sys logger
)
# TODO: we need remote wrapping and a general soln:

View File

@ -27,29 +27,15 @@ from typing import (
)
import tractor
from tractor import (
msg,
Actor,
Portal,
from tractor import Portal
from ._util import (
log, # sub-sys logger
)
from piker.log import get_logger
log = get_logger(name=__name__)
# TODO? default path-space for UDS registry?
# [ ] needs to be Xplatform tho!
# _default_registry_path: Path = (
# Path(os.environ['XDG_RUNTIME_DIR'])
# /'piker'
# )
_default_registry_host: str = '127.0.0.1'
_default_registry_port: int = 6116
_default_reg_addr: tuple[
str,
int, # |str TODO, once we support UDS, see above.
] = (
_default_reg_addr: tuple[str, int] = (
_default_registry_host,
_default_registry_port,
)
@ -89,22 +75,16 @@ async def open_registry(
'''
global _tractor_kwargs
actor: Actor = tractor.current_actor()
aid: msg.Aid = actor.aid
uid: tuple[str, str] = aid.uid
preset_reg_addrs: list[
tuple[str, int]
] = Registry.addrs
actor = tractor.current_actor()
uid = actor.uid
preset_reg_addrs: list[tuple[str, int]] = Registry.addrs
if (
preset_reg_addrs
and
addrs
and addrs
):
if preset_reg_addrs != addrs:
# if any(addr in preset_reg_addrs for addr in addrs):
diff: set[
tuple[str, int]
] = set(preset_reg_addrs) - set(addrs)
diff: set[tuple[str, int]] = set(preset_reg_addrs) - set(addrs)
if diff:
log.warning(
f'`{uid}` requested only subset of registrars: {addrs}\n'
@ -118,6 +98,7 @@ async def open_registry(
)
was_set: bool = False
if (
not tractor.is_root_process()
and
@ -134,23 +115,16 @@ async def open_registry(
f"`{uid}` registry should already exist but doesn't?"
)
if not Registry.addrs:
if (
not Registry.addrs
):
was_set = True
Registry.addrs = (
addrs
or
[_default_reg_addr]
)
Registry.addrs = addrs or [_default_reg_addr]
# NOTE: only spot this seems currently used is inside
# `.ui._exec` which is the (eventual qtloops) bootstrapping
# with guest mode.
reg_addrs: list[tuple[str, str|int]] = Registry.addrs
# !TODO, a struct-API to stringently allow this only in special
# cases?
# -> better would be to have some way to (atomically) rewrite
# and entire `RuntimeVars`?? ideas welcome obvi..
_tractor_kwargs['registry_addrs'] = reg_addrs
_tractor_kwargs['registry_addrs'] = Registry.addrs
try:
yield Registry.addrs
@ -175,7 +149,7 @@ async def find_service(
| None
):
# try:
reg_addrs: list[tuple[str, int|str]]
reg_addrs: list[tuple[str, int]]
async with open_registry(
addrs=(
registry_addrs
@ -198,13 +172,15 @@ async def find_service(
only_first=first_only, # if set only returns single ref
) as maybe_portals:
if not maybe_portals:
log.info(
# log.info(
print(
f'Could NOT find service {service_name!r} -> {maybe_portals!r}'
)
yield None
return
log.info(
# log.info(
print(
f'Found service {service_name!r} -> {maybe_portals}'
)
yield maybe_portals
@ -219,6 +195,7 @@ async def find_service(
async def check_for_service(
service_name: str,
) -> None | tuple[str, int]:
'''
Service daemon "liveness" predicate.

View File

@ -14,12 +14,20 @@
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
"""
Sub-sys module commons (if any ?? Bp).
Sub-sys module commons.
"""
from functools import partial
from ..log import (
get_logger,
get_console_log,
)
subsys: str = 'piker.service'
# ?TODO, if we were going to keep a `get_console_log()` in here to be
# invoked at `import`-time, how do we dynamically hand in the
# `level=` value? seems too early in the runtime to be injected
# right?
log = get_logger(subsys)
get_console_log = partial(
get_console_log,
name=subsys,
)

View File

@ -16,7 +16,6 @@
from __future__ import annotations
from contextlib import asynccontextmanager as acm
from pprint import pformat
from typing import (
Any,
TYPE_CHECKING,
@ -27,17 +26,12 @@ import asks
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
from . import (
Services,
)
from piker.log import (
from ._util import log # sub-sys logger
from ._util import (
get_console_log,
get_logger,
)
log = get_logger(name=__name__)
# container level config
_config = {
@ -73,10 +67,7 @@ def start_elasticsearch(
elastic
'''
get_console_log(
level='info',
name=__name__,
)
get_console_log('info', name=__name__)
dcntr: DockerContainer = client.containers.run(
'piker:elastic',

View File

@ -52,18 +52,17 @@ import pendulum
# TODO: import this for specific error set expected by mkts client
# import purerpc
from piker.data.feed import maybe_open_feed
from ..data.feed import maybe_open_feed
from . import Services
from piker.log import (
from ._util import (
log, # sub-sys logger
get_console_log,
get_logger,
)
if TYPE_CHECKING:
import docker
from ._ahab import DockerContainer
log = get_logger(name=__name__)
# ahabd-supervisor and container level config

View File

@ -54,10 +54,10 @@ from ..log import (
# for "time series processing"
subsys: str = 'piker.tsp'
log = get_logger(name=__name__)
log = get_logger(subsys)
get_console_log = partial(
get_console_log,
name=subsys, # activate for subsys-pkg "downward"
name=subsys,
)
# NOTE: union type-defs to handle generic `numpy` and `polars` types

View File

@ -63,10 +63,8 @@ from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
)
from piker.data._source import (
def_iohlcv_fields,
)
from piker.data._sampling import (
from ..data._source import def_iohlcv_fields
from ..data._sampling import (
open_sample_stream,
)
@ -98,9 +96,7 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus
log = get_logger(
name=__name__,
)
log = get_logger(__name__)
# `ShmArray` buffer sizing configuration:
@ -554,7 +550,7 @@ async def start_backfill(
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
# await tractor.pause()
await tractor.pause()
expected_dur: Interval = (
last_start_dt.subtract(
@ -1324,7 +1320,6 @@ async def manage_history(
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
loglevel: str = 'warning',
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
@ -1502,7 +1497,6 @@ async def manage_history(
# data feed layer that needs to consume it).
open_index_stream=True,
sub_for_broadcasts=False,
loglevel=loglevel,
) as sample_stream:
# register 1s and 1m buffers with the global

View File

@ -21,6 +21,230 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
types.
'''
from tractor.msg.pretty_struct import (
Struct as Struct,
from __future__ import annotations
from collections import UserList
from pprint import (
saferepr,
)
from typing import Any
from msgspec import (
msgpack,
Struct as _Struct,
structs,
)
class DiffDump(UserList):
'''
Very simple list delegator that repr() dumps (presumed) tuple
elements of the form `tuple[str, Any, Any]` in a nice
multi-line readable form for analyzing `Struct` diffs.
'''
def __repr__(self) -> str:
if not len(self):
return super().__repr__()
# format by displaying item pair's ``repr()`` on multiple,
# indented lines such that they are more easily visually
# comparable when printed to console when printed to
# console.
repstr: str = '[\n'
for k, left, right in self:
repstr += (
f'({k},\n'
f'\t{repr(left)},\n'
f'\t{repr(right)},\n'
')\n'
)
repstr += ']\n'
return repstr
class Struct(
_Struct,
# https://jcristharif.com/msgspec/structs.html#tagged-unions
# tag='pikerstruct',
# tag=True,
):
'''
A "human friendlier" (aka repl buddy) struct subtype.
'''
def _sin_props(self) -> Iterator[
tuple[
structs.FieldIinfo,
str,
Any,
]
]:
'''
Iterate over all non-@property fields of this struct.
'''
fi: structs.FieldInfo
for fi in structs.fields(self):
key: str = fi.name
val: Any = getattr(self, key)
yield fi, key, val
def to_dict(
self,
include_non_members: bool = True,
) -> dict:
'''
Like it sounds.. direct delegation to:
https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
BUT, by default we pop all non-member (aka not defined as
struct fields) fields by default.
'''
asdict: dict = structs.asdict(self)
if include_non_members:
return asdict
# only return a dict of the struct members
# which were provided as input, NOT anything
# added as type-defined `@property` methods!
sin_props: dict = {}
fi: structs.FieldInfo
for fi, k, v in self._sin_props():
sin_props[k] = asdict[k]
return sin_props
def pformat(
self,
field_indent: int = 2,
indent: int = 0,
) -> str:
'''
Recursion-safe `pprint.pformat()` style formatting of
a `msgspec.Struct` for sane reading by a human using a REPL.
'''
# global whitespace indent
ws: str = ' '*indent
# field whitespace indent
field_ws: str = ' '*(field_indent + indent)
# qtn: str = ws + self.__class__.__qualname__
qtn: str = self.__class__.__qualname__
obj_str: str = '' # accumulator
fi: structs.FieldInfo
k: str
v: Any
for fi, k, v in self._sin_props():
# TODO: how can we prefer `Literal['option1', 'option2,
# ..]` over .__name__ == `Literal` but still get only the
# latter for simple types like `str | int | None` etc..?
ft: type = fi.type
typ_name: str = getattr(ft, '__name__', str(ft))
# recurse to get sub-struct's `.pformat()` output Bo
if isinstance(v, Struct):
val_str: str = v.pformat(
indent=field_indent + indent,
field_indent=indent + field_indent,
)
else: # the `pprint` recursion-safe format:
# https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
val_str: str = saferepr(v)
obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
return (
f'{qtn}(\n'
f'{obj_str}'
f'{ws})'
)
# TODO: use a pprint.PrettyPrinter instance around ONLY rendering
# inside a known tty?
# def __repr__(self) -> str:
# ...
# __str__ = __repr__ = pformat
__repr__ = pformat
def copy(
self,
update: dict | None = None,
) -> Struct:
'''
Validate-typecast all self defined fields, return a copy of
us with all such fields.
NOTE: This is kinda like the default behaviour in
`pydantic.BaseModel` except a copy of the object is
returned making it compat with `frozen=True`.
'''
if update:
for k, v in update.items():
setattr(self, k, v)
# NOTE: roundtrip serialize to validate
# - enode to msgpack binary format,
# - decode that back to a struct.
return msgpack.Decoder(type=type(self)).decode(
msgpack.Encoder().encode(self)
)
def typecast(
self,
# TODO: allow only casting a named subset?
# fields: set[str] | None = None,
) -> None:
'''
Cast all fields using their declared type annotations
(kinda like what `pydantic` does by default).
NOTE: this of course won't work on frozen types, use
``.copy()`` above in such cases.
'''
# https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
fi: structs.FieldInfo
for fi in structs.fields(self):
setattr(
self,
fi.name,
fi.type(getattr(self, fi.name)),
)
def __sub__(
self,
other: Struct,
) -> DiffDump[tuple[str, Any, Any]]:
'''
Compare fields/items key-wise and return a ``DiffDump``
for easy visual REPL comparison B)
'''
diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
for fi in structs.fields(self):
attr_name: str = fi.name
ours: Any = getattr(self, attr_name)
theirs: Any = getattr(other, attr_name)
if ours != theirs:
diffs.append((
attr_name,
ours,
theirs,
))
return diffs

View File

@ -33,10 +33,7 @@ from . import _search
from ..accounting import unpack_fqme
from ..data._symcache import open_symcache
from ..data.feed import install_brokerd_search
from ..log import (
get_logger,
get_console_log,
)
from ..log import get_logger
from ..service import maybe_spawn_brokerd
from ._exec import run_qtractor
@ -90,13 +87,6 @@ async def _async_main(
Provision the "main" widget with initial symbol data and root nursery.
"""
# enable chart's console logging
if loglevel:
get_console_log(
level=loglevel,
name=__name__,
)
# set as singleton
_chart._godw = main_widget

View File

@ -29,6 +29,7 @@ from typing import (
)
import pyqtgraph as pg
import trio
from piker.ui.qt import (
QtCore,
@ -40,7 +41,6 @@ from piker.ui.qt import (
QVBoxLayout,
QSplitter,
)
from ._widget import GodWidget
from ._axes import (
DynamicDateAxis,
PriceAxis,
@ -61,6 +61,10 @@ from ._style import (
_xaxis_at,
# _min_points_to_show,
)
from ..data.feed import (
Feed,
Flume,
)
from ..accounting import (
MktPair,
)
@ -74,12 +78,305 @@ from . import _pg_overrides as pgo
if TYPE_CHECKING:
from ._display import DisplayState
from ..data.flows import Flume
from ..data.feed import Feed
log = get_logger(__name__)
_godw: GodWidget|None = None
def get_godw() -> GodWidget:
'''
Get the top level "god widget", the root/central-most Qt
widget-object set as `QMainWindow.setCentralWidget(_godw)`.
See `piker.ui._exec` for the runtime init details and all the
machinery for running `trio` on the Qt event loop in guest mode.
'''
if _godw is None:
raise RuntimeError(
'No god-widget initialized ??\n'
'Have you called `run_qtractor()` yet?\n'
)
return _godw
class GodWidget(QWidget):
'''
"Our lord and savior, the holy child of window-shua, there is no
widget above thee." - 6|6
The highest level composed widget which contains layouts for
organizing charts as well as other sub-widgets used to control or
modify them.
'''
search: SearchWidget
mode_name: str = 'god'
def __init__(
self,
parent=None,
) -> None:
super().__init__(parent)
self.search: SearchWidget|None = None
self.hbox = QHBoxLayout(self)
self.hbox.setContentsMargins(0, 0, 0, 0)
self.hbox.setSpacing(6)
self.hbox.setAlignment(Qt.AlignTop)
self.vbox = QVBoxLayout()
self.vbox.setContentsMargins(0, 0, 0, 0)
self.vbox.setSpacing(2)
self.vbox.setAlignment(Qt.AlignTop)
self.hbox.addLayout(self.vbox)
self._chart_cache: dict[
str,
tuple[LinkedSplits, LinkedSplits],
] = {}
self.hist_linked: LinkedSplits|None = None
self.rt_linked: LinkedSplits|None = None
self._active_cursor: Cursor|None = None
# assigned in the startup func `_async_main()`
self._root_n: trio.Nursery = None
self._widgets: dict[str, QWidget] = {}
self._resizing: bool = False
# TODO: do we need this, when would god get resized
# and the window does not? Never right?!
# self.reg_for_resize(self)
# TODO: strat loader/saver that we don't need yet.
# def init_strategy_ui(self):
# self.toolbar_layout = QHBoxLayout()
# self.toolbar_layout.setContentsMargins(0, 0, 0, 0)
# self.vbox.addLayout(self.toolbar_layout)
# self.strategy_box = StrategyBoxWidget(self)
# self.toolbar_layout.addWidget(self.strategy_box)
@property
def linkedsplits(self) -> LinkedSplits:
return self.rt_linked
def set_chart_symbols(
self,
group_key: tuple[str], # of form <fqme>.<providername>
all_linked: tuple[LinkedSplits, LinkedSplits], # type: ignore
) -> None:
# re-sort org cache symbol list in LIFO order
cache = self._chart_cache
cache.pop(group_key, None)
cache[group_key] = all_linked
def get_chart_symbols(
self,
symbol_key: str,
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key)
async def load_symbols(
self,
fqmes: list[str],
loglevel: str,
reset: bool = False,
) -> trio.Event:
'''
Load a new contract into the charting app.
Expects a ``numpy`` structured array containing all the ohlcv fields.
'''
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key: tuple[str] = tuple(fqmes)
all_linked = self.get_chart_symbols(group_key)
order_mode_started = trio.Event()
if not self.vbox.isEmpty():
# XXX: seems to make switching slower?
# qframe = self.hist_linked.chart.qframe
# if qframe.sidepane is self.search:
# qframe.hbox.removeWidget(self.search)
for linked in [self.rt_linked, self.hist_linked]:
# XXX: this is CRITICAL especially with pixel buffer caching
linked.hide()
linked.unfocus()
# XXX: pretty sure we don't need this
# remove any existing plots?
# XXX: ahh we might want to support cache unloading..
# self.vbox.removeWidget(linked)
# switching to a new viewable chart
if all_linked is None or reset:
from ._display import display_symbol_data
# we must load a fresh linked charts set
self.rt_linked = rt_charts = LinkedSplits(self)
self.hist_linked = hist_charts = LinkedSplits(self)
# spawn new task to start up and update new sub-chart instances
self._root_n.start_soon(
display_symbol_data,
self,
fqmes,
loglevel,
order_mode_started,
)
# self.vbox.addWidget(hist_charts)
self.vbox.addWidget(rt_charts)
self.set_chart_symbols(
group_key,
(hist_charts, rt_charts),
)
for linked in [hist_charts, rt_charts]:
linked.show()
linked.focus()
await trio.sleep(0)
else:
# symbol is already loaded and ems ready
order_mode_started.set()
self.hist_linked, self.rt_linked = all_linked
for linked in all_linked:
# TODO:
# - we'll probably want per-instrument/provider state here?
# change the order config form over to the new chart
# chart is already in memory so just focus it
linked.show()
linked.focus()
linked.graphics_cycle()
await trio.sleep(0)
# resume feeds *after* rendering chart view asap
chart = linked.chart
if chart:
chart.resume_all_feeds()
# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
self.rt_linked.chart.main_viz.default_view(
do_min_bars=True,
)
# if a history chart instance is already up then
# set the search widget as its sidepane.
hist_chart = self.hist_linked.chart
if hist_chart:
hist_chart.qframe.set_sidepane(self.search)
# NOTE: this is really stupid/hard to follow.
# we have to reposition the active position nav
# **AFTER** applying the search bar as a sidepane
# to the newly switched to symbol.
await trio.sleep(0)
# TODO: probably stick this in some kinda `LooknFeel` API?
for tracker in self.rt_linked.mode.trackers.values():
pp_nav = tracker.nav
if tracker.live_pp.cumsize:
pp_nav.show()
pp_nav.hide_info()
else:
pp_nav.hide()
# set window titlebar info
symbol = self.rt_linked.mkt
if symbol is not None:
self.window.setWindowTitle(
f'{symbol.fqme} '
f'tick:{symbol.size_tick}'
)
return order_mode_started
def focus(self) -> None:
'''
Focus the top level widget which in turn focusses the chart
ala "view mode".
'''
# go back to view-mode focus (aka chart focus)
self.clearFocus()
chart = self.rt_linked.chart
if chart:
chart.setFocus()
def reg_for_resize(
self,
widget: QWidget,
) -> None:
getattr(widget, 'on_resize')
self._widgets[widget.mode_name] = widget
def on_win_resize(self, event: QtCore.QEvent) -> None:
'''
Top level god widget handler from window (the real yaweh) resize
events such that any registered widgets which wish to be
notified are invoked using our pythonic `.on_resize()` method
api.
Where we do UX magic to make things not suck B)
'''
if self._resizing:
return
self._resizing = True
log.info('God widget resize')
for name, widget in self._widgets.items():
widget.on_resize()
self._resizing = False
# on_resize = on_win_resize
def get_cursor(self) -> Cursor:
return self._active_cursor
def iter_linked(self) -> Iterator[LinkedSplits]:
for linked in [self.hist_linked, self.rt_linked]:
yield linked
def resize_all(self) -> None:
'''
Dynamic resize sequence: adjusts all sub-widgets/charts to
sensible default ratios of what space is detected as available
on the display / window.
'''
rt_linked = self.rt_linked
rt_linked.set_split_sizes()
self.rt_linked.resize_sidepanes()
self.hist_linked.resize_sidepanes(from_linked=rt_linked)
self.search.on_resize()
class ChartnPane(QFrame):
'''
One-off ``QFrame`` composite which pairs a chart
@ -141,6 +438,7 @@ class LinkedSplits(QWidget):
'''
def __init__(
self,
godwidget: GodWidget,
@ -752,7 +1050,7 @@ class ChartPlotWidget(pg.PlotWidget):
) -> None:
'''
Increment the data view `datums`` steps toward y-axis thus
Increment the data view ``datums``` steps toward y-axis thus
"following" the current time slot/step/bar.
'''
@ -762,7 +1060,7 @@ class ChartPlotWidget(pg.PlotWidget):
x_shift = viz.index_step() * datums
if datums >= 300:
log.warning('FUCKING FIX THE GLOBAL STEP BULLSHIT')
print("FUCKING FIX THE GLOBAL STEP BULLSHIT")
# breakpoint()
return

View File

@ -413,18 +413,9 @@ class Cursor(pg.GraphicsObject):
self,
item: pg.GraphicsObject,
) -> None:
assert getattr(
item,
'delete',
), f"{item} must define a ``.delete()``"
assert getattr(item, 'delete'), f"{item} must define a ``.delete()``"
self._hovered.add(item)
def is_hovered(
self,
item: pg.GraphicsObject,
) -> bool:
return item in self._hovered
def add_plot(
self,
plot: ChartPlotWidget, # noqa

View File

@ -45,7 +45,7 @@ from piker.ui.qt import QLineF
from ..data._sharedmem import (
ShmArray,
)
from ..data.flows import Flume
from ..data.feed import Flume
from ..data._formatters import (
IncrementalFormatter,
OHLCBarsFmtr, # Plain OHLC renderer

View File

@ -21,7 +21,6 @@ this module ties together quote and computational (fsp) streams with
graphics update methods via our custom ``pyqtgraph`` charting api.
'''
from functools import partial
import itertools
from math import floor
import time
@ -209,7 +208,6 @@ class DisplayState(Struct):
async def increment_history_view(
# min_istream: tractor.MsgStream,
ds: DisplayState,
loglevel: str = 'warning',
):
hist_chart: ChartPlotWidget = ds.hist_chart
hist_viz: Viz = ds.hist_viz
@ -231,10 +229,7 @@ async def increment_history_view(
hist_viz.reset_graphics()
# hist_viz.update_graphics(force_redraw=True)
async with open_sample_stream(
period_s=1.,
loglevel=loglevel,
) as min_istream:
async with open_sample_stream(1.) as min_istream:
async for msg in min_istream:
profiler = Profiler(
@ -315,6 +310,7 @@ async def increment_history_view(
async def graphics_update_loop(
dss: dict[str, DisplayState],
nurse: trio.Nursery,
godwidget: GodWidget,
@ -323,7 +319,6 @@ async def graphics_update_loop(
pis: dict[str, list[pgo.PlotItem, pgo.PlotItem]] = {},
vlm_charts: dict[str, ChartPlotWidget] = {},
loglevel: str = 'warning',
) -> None:
'''
@ -467,12 +462,9 @@ async def graphics_update_loop(
# })
nurse.start_soon(
partial(
increment_history_view,
# min_istream,
ds=ds,
loglevel=loglevel,
),
ds,
)
await trio.sleep(0)
@ -519,19 +511,14 @@ async def graphics_update_loop(
fast_chart.linked.isHidden()
or not rt_pi.isVisible()
):
log.debug(
f'{fqme} skipping update for HIDDEN CHART'
)
print(f'{fqme} skipping update for HIDDEN CHART')
fast_chart.pause_all_feeds()
continue
ic = fast_chart.view._in_interact
if ic:
fast_chart.pause_all_feeds()
log.debug(
f'Pausing chart updaates during interaction\n'
f'fqme: {fqme!r}'
)
print(f'{fqme} PAUSING DURING INTERACTION')
await ic.wait()
fast_chart.resume_all_feeds()
@ -1604,18 +1591,15 @@ async def display_symbol_data(
# start update loop task
dss: dict[str, DisplayState] = {}
ln.start_soon(
partial(
graphics_update_loop,
dss=dss,
nurse=ln,
godwidget=godwidget,
feed=feed,
dss,
ln,
godwidget,
feed,
# min_istream,
pis=pis,
vlm_charts=vlm_charts,
loglevel=loglevel,
)
pis,
vlm_charts,
)
# boot order-mode

View File

@ -55,11 +55,6 @@ from ._style import (
from ._lines import LevelLine
from ..log import get_logger
# TODO, rm the cycle here!
from ._widget import (
GodWidget,
)
if TYPE_CHECKING:
from ._chart import (
GodWidget,

View File

@ -56,7 +56,7 @@ from . import _style
if TYPE_CHECKING:
from ._widget import GodWidget
from ._chart import GodWidget
log = get_logger(__name__)

View File

@ -183,17 +183,13 @@ async def open_fsp_sidepane(
@acm
async def open_fsp_actor_cluster(
names: list[str] = [
'fsp_0',
'fsp_1',
],
names: list[str] = ['fsp_0', 'fsp_1'],
) -> AsyncGenerator[
int,
dict[str, tractor.Portal]
]:
# TODO! change to .experimental!
from tractor._clustering import open_actor_cluster
# profiler = Profiler(
@ -201,7 +197,7 @@ async def open_fsp_actor_cluster(
# disabled=False
# )
async with open_actor_cluster(
count=len(names),
count=2,
names=names,
modules=['piker.fsp._engine'],
@ -501,8 +497,7 @@ class FspAdmin:
portal: tractor.Portal = (
self.cluster.get(worker_name)
or
self.rr_next_portal()
or self.rr_next_portal()
)
# TODO: this should probably be turned into a

View File

@ -43,7 +43,6 @@ from pyqtgraph import (
functions as fn,
)
import numpy as np
import tractor
import trio
from piker.ui.qt import (
@ -73,10 +72,7 @@ if TYPE_CHECKING:
GodWidget,
)
from ._dataviz import Viz
from .order_mode import (
OrderMode,
Dialog,
)
from .order_mode import OrderMode
from ._display import DisplayState
@ -134,12 +130,7 @@ async def handle_viewmode_kb_inputs(
async for kbmsg in recv_chan:
event, etype, key, mods, text = kbmsg.to_tuple()
log.debug(
f'View-mode kb-msg received,\n'
f'mods: {mods!r}\n'
f'key: {key!r}\n'
f'text: {text!r}\n'
)
log.debug(f'key: {key}, mods: {mods}, text: {text}')
now = time.time()
period = now - last
@ -167,12 +158,8 @@ async def handle_viewmode_kb_inputs(
# have no previous keys or we do and the min_tap period is
# met
if (
not fast_key_seq
or (
period <= min_tap
and
fast_key_seq
)
not fast_key_seq or
period <= min_tap and fast_key_seq
):
fast_key_seq.append(text)
log.debug(f'fast keys seqs {fast_key_seq}')
@ -187,8 +174,7 @@ async def handle_viewmode_kb_inputs(
# UI REPL-shell, with ctrl-p (for "pause")
if (
ctrl
and
key in {
and key in {
Qt.Key_P,
}
):
@ -198,6 +184,7 @@ async def handle_viewmode_kb_inputs(
vlm_chart = chart.linked.subplots['volume'] # noqa
vlm_viz = vlm_chart.main_viz # noqa
dvlm_pi = vlm_chart._vizs['dolla_vlm'].plot # noqa
import tractor
await tractor.pause()
view.interact_graphics_cycle()
@ -205,8 +192,7 @@ async def handle_viewmode_kb_inputs(
# shown data `Viz`s for the current chart app.
if (
ctrl
and
key in {
and key in {
Qt.Key_R,
}
):
@ -245,8 +231,7 @@ async def handle_viewmode_kb_inputs(
key == Qt.Key_Escape
or (
ctrl
and
key == Qt.Key_C
and key == Qt.Key_C
)
):
# ctrl-c as cancel
@ -257,35 +242,17 @@ async def handle_viewmode_kb_inputs(
# cancel order or clear graphics
if (
key == Qt.Key_C
or
key == Qt.Key_Delete
or key == Qt.Key_Delete
):
# log.info('Handling <c> hotkey!')
try:
dialogs: list[Dialog] = order_mode.cancel_orders_under_cursor()
except BaseException:
log.exception('Failed to cancel orders !?\n')
await tractor.pause()
if not dialogs:
log.warning(
'No orders were cancelled?\n'
'Is there an order-line under the cursor?\n'
'If you think there IS your DE might be "hiding the mouse" before '
'we rx the keyboard input via Qt..\n'
'=> Check your DE and/or TWM settings to be sure! <=\n'
)
# ^TODO?, some way to detect if there's lines and
# the DE is cuckin with things?
# await tractor.pause()
order_mode.cancel_orders_under_cursor()
# View modes
if (
ctrl
and (
key == Qt.Key_Equal
or
key == Qt.Key_I
or key == Qt.Key_I
)
):
view.wheelEvent(
@ -297,8 +264,7 @@ async def handle_viewmode_kb_inputs(
ctrl
and (
key == Qt.Key_Minus
or
key == Qt.Key_O
or key == Qt.Key_O
)
):
view.wheelEvent(
@ -309,8 +275,7 @@ async def handle_viewmode_kb_inputs(
elif (
not ctrl
and
key == Qt.Key_R
and key == Qt.Key_R
):
# NOTE: seems that if we don't yield a Qt render
# cycle then the m4 downsampled curves will show here
@ -512,8 +477,7 @@ async def handle_viewmode_mouse(
# view.raiseContextMenu(event)
if (
view.order_mode.active
and
view.order_mode.active and
button == QtCore.Qt.LeftButton
):
# when in order mode, submit execution
@ -817,8 +781,7 @@ class ChartView(ViewBox):
# Scale or translate based on mouse button
if btn & (
QtCore.Qt.LeftButton
| QtCore.Qt.MidButton
QtCore.Qt.LeftButton | QtCore.Qt.MidButton
):
# zoom y-axis ONLY when click-n-drag on it
# if axis == 1:

View File

@ -52,13 +52,10 @@ from ._anchors import (
from ..calc import humanize
from ._label import Label
from ._style import hcolor, _font
from ..log import get_logger
if TYPE_CHECKING:
from ._cursor import Cursor
log = get_logger(__name__)
# TODO: probably worth investigating if we can
# make .boundingRect() faster:
@ -350,7 +347,7 @@ class LevelLine(pg.InfiniteLine):
) -> None:
# TODO: enter labels edit mode
log.debug(f'double click {ev}')
print(f'double click {ev}')
def paint(
self,
@ -464,19 +461,10 @@ class LevelLine(pg.InfiniteLine):
# hovered
if (
not ev.isExit()
and
ev.acceptDrags(QtCore.Qt.LeftButton)
and ev.acceptDrags(QtCore.Qt.LeftButton)
):
# if already hovered we don't need to run again
if (
self.mouseHovering is True
and
cur.is_hovered(self)
):
log.debug(
f'Already hovering ??\n'
f'cur._hovered: {cur._hovered!r}\n'
)
if self.mouseHovering is True:
return
if self.only_show_markers_on_hover:
@ -493,7 +481,6 @@ class LevelLine(pg.InfiniteLine):
cur._y_label_update = False
# add us to cursor state
log.debug(f'Adding line {self!r}\n')
cur.add_hovered(self)
if self._hide_xhair_on_hover:
@ -521,7 +508,6 @@ class LevelLine(pg.InfiniteLine):
self.currentPen = self.pen
log.debug(f'Removing line {self!r}\n')
cur._hovered.remove(self)
if self.only_show_markers_on_hover:

View File

@ -1,352 +0,0 @@
# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Affero General Public License for more details.
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see <https://www.gnu.org/licenses/>.
'''
Root-most (what they call a "central widget") of every Qt-UI-app's
window.
'''
from __future__ import annotations
from typing import (
Iterator,
TYPE_CHECKING,
)
import trio
from piker.ui.qt import (
QtCore,
Qt,
QWidget,
QHBoxLayout,
QVBoxLayout,
)
from ..log import get_logger
if TYPE_CHECKING:
from ._search import SearchWidget
from ._chart import (
LinkedSplits,
)
from ._cursor import (
Cursor,
)
log = get_logger(__name__)
_godw: GodWidget|None = None
def get_godw() -> GodWidget:
'''
Get the top level "god widget", the root/central-most Qt
widget-object set as `QMainWindow.setCentralWidget(_godw)`.
See `piker.ui._exec` for the runtime init details and all the
machinery for running `trio` on the Qt event loop in guest mode.
'''
if _godw is None:
raise RuntimeError(
'No god-widget initialized ??\n'
'Have you called `run_qtractor()` yet?\n'
)
return _godw
class GodWidget(QWidget):
'''
"Our lord and savior, the holy child of window-shua, there is no
widget above thee." - 6|6
The highest level composed widget which contains layouts for
organizing charts as well as other sub-widgets used to control or
modify them.
'''
search: SearchWidget
mode_name: str = 'god'
def __init__(
self,
parent=None,
) -> None:
super().__init__(parent)
self.search: SearchWidget|None = None
self.hbox = QHBoxLayout(self)
self.hbox.setContentsMargins(0, 0, 0, 0)
self.hbox.setSpacing(6)
self.hbox.setAlignment(Qt.AlignTop)
self.vbox = QVBoxLayout()
self.vbox.setContentsMargins(0, 0, 0, 0)
self.vbox.setSpacing(2)
self.vbox.setAlignment(Qt.AlignTop)
self.hbox.addLayout(self.vbox)
self._chart_cache: dict[
str,
tuple[LinkedSplits, LinkedSplits],
] = {}
self.hist_linked: LinkedSplits|None = None
self.rt_linked: LinkedSplits|None = None
self._active_cursor: Cursor|None = None
# assigned in the startup func `_async_main()`
self._root_n: trio.Nursery = None
self._widgets: dict[str, QWidget] = {}
self._resizing: bool = False
# TODO: do we need this, when would god get resized
# and the window does not? Never right?!
# self.reg_for_resize(self)
# TODO: strat loader/saver that we don't need yet.
# def init_strategy_ui(self):
# self.toolbar_layout = QHBoxLayout()
# self.toolbar_layout.setContentsMargins(0, 0, 0, 0)
# self.vbox.addLayout(self.toolbar_layout)
# self.strategy_box = StrategyBoxWidget(self)
# self.toolbar_layout.addWidget(self.strategy_box)
@property
def linkedsplits(self) -> LinkedSplits:
return self.rt_linked
def set_chart_symbols(
self,
group_key: tuple[str], # of form <fqme>.<providername>
all_linked: tuple[LinkedSplits, LinkedSplits], # type: ignore
) -> None:
# re-sort org cache symbol list in LIFO order
cache = self._chart_cache
cache.pop(group_key, None)
cache[group_key] = all_linked
def get_chart_symbols(
self,
symbol_key: str,
) -> tuple[LinkedSplits, LinkedSplits]: # type: ignore
return self._chart_cache.get(symbol_key)
async def load_symbols(
self,
fqmes: list[str],
loglevel: str,
reset: bool = False,
) -> trio.Event:
'''
Load a new contract into the charting app.
Expects a ``numpy`` structured array containing all the ohlcv fields.
'''
# NOTE: for now we use the first symbol in the set as the "key"
# for the overlay of feeds on the chart.
group_key: tuple[str] = tuple(fqmes)
all_linked = self.get_chart_symbols(group_key)
order_mode_started = trio.Event()
if not self.vbox.isEmpty():
# XXX: seems to make switching slower?
# qframe = self.hist_linked.chart.qframe
# if qframe.sidepane is self.search:
# qframe.hbox.removeWidget(self.search)
for linked in [self.rt_linked, self.hist_linked]:
# XXX: this is CRITICAL especially with pixel buffer caching
linked.hide()
linked.unfocus()
# XXX: pretty sure we don't need this
# remove any existing plots?
# XXX: ahh we might want to support cache unloading..
# self.vbox.removeWidget(linked)
# switching to a new viewable chart
if all_linked is None or reset:
from ._display import display_symbol_data
# we must load a fresh linked charts set
from ._chart import LinkedSplits
self.rt_linked = rt_charts = LinkedSplits(self)
self.hist_linked = hist_charts = LinkedSplits(self)
# spawn new task to start up and update new sub-chart instances
self._root_n.start_soon(
display_symbol_data,
self,
fqmes,
loglevel,
order_mode_started,
)
# self.vbox.addWidget(hist_charts)
self.vbox.addWidget(rt_charts)
self.set_chart_symbols(
group_key,
(hist_charts, rt_charts),
)
for linked in [hist_charts, rt_charts]:
linked.show()
linked.focus()
await trio.sleep(0)
else:
# symbol is already loaded and ems ready
order_mode_started.set()
self.hist_linked, self.rt_linked = all_linked
for linked in all_linked:
# TODO:
# - we'll probably want per-instrument/provider state here?
# change the order config form over to the new chart
# chart is already in memory so just focus it
linked.show()
linked.focus()
linked.graphics_cycle()
await trio.sleep(0)
# resume feeds *after* rendering chart view asap
chart = linked.chart
if chart:
chart.resume_all_feeds()
# TODO: we need a check to see if the chart
# last had the xlast in view, if so then shift so it's
# still in view, if the user was viewing history then
# do nothing yah?
self.rt_linked.chart.main_viz.default_view(
do_min_bars=True,
)
# if a history chart instance is already up then
# set the search widget as its sidepane.
hist_chart = self.hist_linked.chart
if hist_chart:
hist_chart.qframe.set_sidepane(self.search)
# NOTE: this is really stupid/hard to follow.
# we have to reposition the active position nav
# **AFTER** applying the search bar as a sidepane
# to the newly switched to symbol.
await trio.sleep(0)
# TODO: probably stick this in some kinda `LooknFeel` API?
for tracker in self.rt_linked.mode.trackers.values():
pp_nav = tracker.nav
if tracker.live_pp.cumsize:
pp_nav.show()
pp_nav.hide_info()
else:
pp_nav.hide()
# set window titlebar info
symbol = self.rt_linked.mkt
if symbol is not None:
self.window.setWindowTitle(
f'{symbol.fqme} '
f'tick:{symbol.size_tick}'
)
return order_mode_started
def focus(self) -> None:
'''
Focus the top level widget which in turn focusses the chart
ala "view mode".
'''
# go back to view-mode focus (aka chart focus)
self.clearFocus()
chart = self.rt_linked.chart
if chart:
chart.setFocus()
def reg_for_resize(
self,
widget: QWidget,
) -> None:
getattr(widget, 'on_resize')
self._widgets[widget.mode_name] = widget
def on_win_resize(
self,
event: QtCore.QEvent,
) -> None:
'''
Top level god widget handler from window (the real yaweh) resize
events such that any registered widgets which wish to be
notified are invoked using our pythonic `.on_resize()` method
api.
Where we do UX magic to make things not suck B)
'''
if self._resizing:
return
self._resizing = True
log.debug(
f'God widget resize\n'
f'{event}\n'
)
for name, widget in self._widgets.items():
widget.on_resize()
self._resizing = False
# on_resize = on_win_resize
def get_cursor(self) -> Cursor:
return self._active_cursor
def iter_linked(self) -> Iterator[LinkedSplits]:
for linked in [self.hist_linked, self.rt_linked]:
yield linked
def resize_all(self) -> None:
'''
Dynamic resize sequence: adjusts all sub-widgets/charts to
sensible default ratios of what space is detected as available
on the display / window.
'''
rt_linked = self.rt_linked
rt_linked.set_split_sizes()
self.rt_linked.resize_sidepanes()
self.hist_linked.resize_sidepanes(from_linked=rt_linked)
self.search.on_resize()

View File

@ -40,7 +40,7 @@ from piker.ui.qt import (
)
from ..log import get_logger
from ._style import _font_small, hcolor
from ._widget import GodWidget
from ._chart import GodWidget
log = get_logger(__name__)
@ -255,16 +255,8 @@ class MainWindow(QMainWindow):
current: QWidget,
) -> None:
'''
Focus handler.
For now updates the "current mode" name.
'''
log.debug(
f'widget focus changed from,\n'
f'{last} -> {current}'
)
log.info(f'widget focus changed from {last} -> {current}')
if current is not None:
# cursor left window?

View File

@ -177,7 +177,7 @@ def chart(
return
# global opts
# brokernames: list[str] = config['brokers']
brokernames = config['brokers']
brokermods = config['brokermods']
assert brokermods
tractorloglevel = config['tractorloglevel']
@ -216,7 +216,6 @@ def chart(
layers['tcp']['port'],
))
# breakpoint()
from tractor.devx import maybe_open_crash_handler
pdb: bool = config['pdb']
with maybe_open_crash_handler(pdb=pdb):

View File

@ -77,6 +77,7 @@ from ._style import _font
from ._forms import open_form_input_handling
from ._notify import notify_from_ems_status_msg
if TYPE_CHECKING:
from ._chart import (
ChartPlotWidget,
@ -435,7 +436,7 @@ class OrderMode:
lines=lines,
last_status_close=self.multistatus.open_status(
f'submitting {order.exec_mode}-{order.action}',
# final_msg=f'submitted {order.exec_mode}-{order.action}',
final_msg=f'submitted {order.exec_mode}-{order.action}',
clear_on_next=True,
)
)
@ -519,8 +520,7 @@ class OrderMode:
'''
Order submitted status event handler.
Commit the order line and registered order uuid, store ack
time stamp.
Commit the order line and registered order uuid, store ack time stamp.
'''
lines = self.lines.commit_line(uuid)
@ -658,7 +658,7 @@ class OrderMode:
return True
def cancel_orders_under_cursor(self) -> list[Dialog]:
def cancel_orders_under_cursor(self) -> list[str]:
return self.cancel_orders(
self.oids_from_lines(
self.lines.lines_under_cursor()
@ -687,28 +687,24 @@ class OrderMode:
self,
oids: list[str],
) -> list[Dialog]:
) -> None:
'''
Cancel all orders from a list of order ids: `oids`.
'''
# key = self.multistatus.open_status(
# f'cancelling {len(oids)} orders',
# final_msg=f'cancelled orders:\n{oids}',
# group_key=True
# )
dialogs: list[Dialog] = []
key = self.multistatus.open_status(
f'cancelling {len(oids)} orders',
final_msg=f'cancelled orders:\n{oids}',
group_key=True
)
for oid in oids:
if dialog := self.dialogs.get(oid):
self.client.cancel_nowait(uuid=oid)
# cancel_status_close = self.multistatus.open_status(
# f'cancelling order {oid}',
# group_key=key,
# )
# dialog.last_status_close = cancel_status_close
dialogs.append(dialog)
return dialogs
cancel_status_close = self.multistatus.open_status(
f'cancelling order {oid}',
group_key=key,
)
dialog.last_status_close = cancel_status_close
def cancel_all_orders(self) -> None:
'''
@ -780,6 +776,7 @@ class OrderMode:
@asynccontextmanager
async def open_order_mode(
feed: Feed,
godw: GodWidget,
fqme: str,

14
uv.lock
View File

@ -1243,11 +1243,11 @@ uis = [
[[package]]
name = "platformdirs"
version = "4.6.0"
version = "4.5.1"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/20/e5/474d0a8508029286b905622e6929470fb84337cfa08f9d09fbb624515249/platformdirs-4.6.0.tar.gz", hash = "sha256:4a13c2db1071e5846c3b3e04e5b095c0de36b2a24be9a3bc0145ca66fce4e328", size = 23433, upload-time = "2026-02-12T14:36:21.288Z" }
sdist = { url = "https://files.pythonhosted.org/packages/cf/86/0248f086a84f01b37aaec0fa567b397df1a119f73c16f6c7a9aac73ea309/platformdirs-4.5.1.tar.gz", hash = "sha256:61d5cdcc6065745cdd94f0f878977f8de9437be93de97c1c12f853c9c0cdcbda", size = 21715, upload-time = "2025-12-05T13:52:58.638Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/da/10/1b0dcf51427326f70e50d98df21b18c228117a743a1fc515a42f8dc7d342/platformdirs-4.6.0-py3-none-any.whl", hash = "sha256:dd7f808d828e1764a22ebff09e60f175ee3c41876606a6132a688d809c7c9c73", size = 19549, upload-time = "2026-02-12T14:36:19.743Z" },
{ url = "https://files.pythonhosted.org/packages/cb/28/3bfe2fa5a7b9c46fe7e13c97bda14c895fb10fa2ebf1d0abb90e0cea7ee1/platformdirs-4.5.1-py3-none-any.whl", hash = "sha256:d03afa3963c806a9bed9d5125c8f4cb2fdaf74a55ab60e5d59b3fde758104d31", size = 18731, upload-time = "2025-12-05T13:52:56.823Z" },
]
[[package]]
@ -2006,7 +2006,7 @@ dev = [
{ name = "pytest", specifier = ">=8.3.5" },
{ name = "stackscope", specifier = ">=0.2.2,<0.3" },
{ name = "typing-extensions", specifier = ">=4.14.1" },
{ name = "xonsh", specifier = ">=0.22.2" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
devx = [
{ name = "greenback", specifier = ">=1.2.1,<2" },
@ -2018,7 +2018,7 @@ repl = [
{ name = "prompt-toolkit", specifier = ">=3.0.50" },
{ name = "psutil", specifier = ">=7.0.0" },
{ name = "pyperclip", specifier = ">=1.9.0" },
{ name = "xonsh", specifier = ">=0.22.2" },
{ name = "xonsh", specifier = ">=0.19.2" },
]
testing = [
{ name = "pexpect", specifier = ">=4.9.0,<5" },
@ -2264,8 +2264,8 @@ wheels = [
[[package]]
name = "xonsh"
version = "0.22.3"
source = { git = "https://github.com/xonsh/xonsh.git?branch=main#b446946fd94c3913e002318db1d1b41ee4fa1f9a" }
version = "0.22.1"
source = { git = "https://github.com/xonsh/xonsh.git?branch=main#336658ff0919f8d7bb96d581136d37d470a8fe99" }
[[package]]
name = "yapic-json"