Compare commits

...

22 Commits

Author SHA1 Message Date
Gud Boi deaf7dd1ab Use `platformdirs` for `.config.get_app_dir()`
Replace hand-rolled `click`-based platform branching using
the much saner `platformdirs.user_config_path()`.

Deats,
- remove Windows/macOS/Unix `if/elif` platform dispatch
  (~25 lines) in favour of single `user_config_path()` call.
- move `_posixify()` inside `force_posix` branch since it's
  only used there.
- add `log.info()` reporting platform name and resolved dirs.

Also,
- drop now unneeded `sys` import.
- reformat `assert` in `repodir()` to multiline style.
- convert docstring from `r"""..."""` to `'''...'''` style.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-10 17:43:42 -04:00
Gud Boi 702aae2544 Drop bp from duration mismatch branch in `.ib.api.Client.bars()` 2026-03-10 17:09:49 -04:00
Gud Boi 44d54babeb Handle VNC reset-dialog in `vnc_click_hack()`
Add TAB + ENTER key presses after the `Ctrl+Alt+<key>` hotkey
combo to auto-confirm the "simulate a reset?" dialog that IB
gateway sometimes shows.

Deats,
- press `ISO_Enter` before click to dismiss any prior active
  dialog window.
- add post-hotkey loop sending `Tab` then `KP_Enter` with
  `asyncio.sleep()` delays to handle the confirmation dialog.
- add `asyncio` import.

Also,
- capture VNC connect error as `vnc_err` and log it instead of
  falling through to `try_xdo_manual()`.
- comment-out `try_xdo_manual()` fallback in VNC error path.
- reformat `client.press()` call to multiline style.
- reformat `RuntimeError` raise to multiline style with `!r`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-10 17:06:46 -04:00
Gud Boi b5a33e1217 Exclude crypto futes from `without_src` sym key
Extend the `col_sym_key` asset-type check in `start_backfill()`
to also exclude crypto-denominated futures (where `src` is
`'crypto_currency'` and `dst` is `'future'`) from the
`without_src=True` fqme path.

Also in `.brokers.binance` backend (it being the guilty culprit in the
discovery of this bug; and why i touched styling this code),

- reformat `make_sub()` fn sig to multiline style in
  `.binance.feed`.
- add backtick around `dict` in `make_sub()` docstring.
- reformat `or` conditionals to multiline style in
  `.binance.feed.get_mkt_info()`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 22:13:06 -05:00
Gud Boi 796a831c6e Enable console log (from passed down `loglevel`) in `.tsp._history.manage_history()` 2026-03-06 09:26:34 -05:00
Gud Boi de81d1e905 Drop `Flume.feed`, it's unused yet causes import cycles.. 2026-03-06 09:26:34 -05:00
Gud Boi 170dd9794c Just warn on single-bar nulls instead of bping
Replace the debug breakpoint with a warning-log when a single-bar
null-segment is detected in `get_null_segs()`. This lets the gap
analysis continue while still alerting about the anomaly.

Deats,
- extract the 3-bar window (before, null, after) and calculate
  a `gap: pendulum.Interval` for the warning msg.
- comment-out the old breakpoint block for optional debugging as needed.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:26:34 -05:00
Gud Boi 599c36aba6 Lul, drop long unused poetry lock file 2026-03-06 09:26:34 -05:00
Gud Boi f174f79a1a Pin `pg` at latest official `0.14.0` release
Keep in masked GH sources lines for easy hackin against upstream
`master` branch when needed as well!
2026-03-06 09:26:34 -05:00
Gud Boi 9b284c2256 .ui._editors: log multiline styling and re-leveling 2026-03-06 09:26:34 -05:00
Gud Boi 59f2d46a97 .ui._lines: drop unused graphics-item import 2026-03-06 09:26:34 -05:00
Gud Boi c1b1e99693 Add batch-submit API for gap annotations
Introduce `AnnotCtl.add_batch()` and `serve_rc_annots()` batch
handler to submit 1000s of gaps in single IPC msg instead of
per-annot round-trips. Server builds `GapAnnotations` from specs
and handles vectorized timestamp-to-index lookups.

Deats,
- add `'cmd': 'batch'` handler in `serve_rc_annots()`
- vectorized timestamp lookup via `np.searchsorted()` + masking
- build `gap_specs: list[dict]` from rect+arrow specs client-side
- create single `GapAnnotations` item for all gaps server-side
- handle `GapAnnotations.reposition()` in redraw handler
- add profiling to batch path for perf measurement
- support optional individual arrows for A/B comparison

Also,
- refactor `markup_gaps()` to collect specs + single batch call
- add `no_qt_updates()` context mgr for batch render ops
- add profiling to annotation teardown path
- add `GapAnnotations` case to `rm_annot()` match block

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:26:34 -05:00
Gud Boi 24651d2326 Add a `GapAnnotations` path-renderer
For a ~1000x perf gain says ol' claudy, our boi who wrote this entire
patch! Bo

Introduce `GapAnnotations` in `.ui._annotate` for batch-rendering gap
rects/arrows instead of individual `QGraphicsItem` instances. Uses
upstream's `pyqtgraph.Qt.internals.PrimitiveArray` for rects and
a `QPainterPath` for arrows. This API-replicates our prior annotator's
in view shape-graphics but now using (what we're dubbing)
"single-array-multiple-graphics" tech much like our `.ui._curve`
extensions to `pg` B)

Impl deats,
- batch draw ~1000 gaps in single paint call vs 1000 items
- arrows render in scene coords to maintain pixel size on zoom
- add vectorized timestamp-to-index lookup for repositioning
- cache bounding rect, rebuild on `reposition()` calls
- match `SelectRect` + `ArrowItem` visual style/colors
- skip reposition when timeframe doesn't match gap's period

Other,
- fix typo in `LevelMarker` docstring: "graphich" -> "graphic"
- reflow docstring in `qgo_draw_markers()` to 67 char limit

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:26:34 -05:00
Gud Boi 2d00bb1024 Add info log for shm processing in `ldshm` CLI cmd
Log shm file name and detected period before null segment
processing to aid debugging.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:26:34 -05:00
Gud Boi dd40ad603f Bump to latest official `pyqtgraph` release 2026-03-06 09:26:34 -05:00
Gud Boi f2ace1b63b Use `ppfmt()` in `order_mode` since it's provided by `tractor` now 2026-03-06 09:22:36 -05:00
Gud Boi 9010f9c7ab Augment `.ib.symbols` search with more logging
Refactor `open_symbol_search()` to use `partial()` for nursery task
spawning and add detailed query->results logging via `ppfmt()`.

Deats,
- change `extend_results()` to accept `target` callable +
  `pattern` + `**kwargs` and invoke inside, instead of receiving
  a pre-called awaitable; use `partial()` to pass args.
- add `ppfmt()` formatted logging of search query params and
  results including client class + method repr.
- change `print()` -> `log.exception()` for `Lagged` overrun.
- bump `upto=5` -> `upto=10` for `search_symbols()` call.

Also for styling,
- add type some missing type annots.
- add multiline style to `or` conditionals in pattern check.
- reformat log msgs to multiline style throughout.
- use `ppfmt()` for fuzzy match debug log.
- rename nursery `sn` -> `tn`.
- add TODO comment about `assert 0` hang.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 89a145113c Handle `str`-errors in `.ib.broker` trade events
Add `isinstance()` dispatch for the `'error'` event case in
`deliver_trade_events()` to handle `ib_async` sometimes emitting plain
`str` error items instead of the previously expected `dict`.

Deats,
- add `isinstance(err, dict)` branch for the standard case with
  `error_code`, `reason`, and `reqid` fields.
- add `isinstance(err, str)` branch to parse error strings of the
  form `'[code 104] connection failed'` into `code` and `reason`.
- set `reqid: str = '<unknown>'` for string-form errors since
  there's no request ID available.
- update `err` type annot to `dict|str`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi ec4db30cdc Handle valid null frames and 0-bar cases in backfill
Add guards for empty-array and zero-bar-diff cases in the TSP backfill
loops to avoid crashes and allow graceful loop termination.

In `maybe_fill_null_segments()`,
- add `array.size == 0` guard in `maybe_fill_null_segments()` to detect
  valid (venue closure) gaps from the backend; add a warning + bp
  + break for this case.
- add TODO that we should likely be filling nulls with the close price
  for the gap's duration.

In `start_backfill()`,
- expand the "0 bars after diff" warning msg with
  `backfill_until_dt` and `end_dt_param` context.
- mask the  `await tractor.pause()` and add a `break` to avoid blocking
  the backfill loop.

(this commit msg was generated in some part by
[`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 2a394dba03 Warn instead of raise on `start_dt`-trimmed frames
Downgrade the `start_dt`-trimming check in `open_history_client()`
from a `RuntimeError` raise to a warning log, allowing the caller
to still receive a (shorter) frame of bars (though we may need to still
specially handle such cases in the backfiller's biz logic layer).

Deats,
- add `trimmed_bars.size` guard to skip check on empty results.
- change condition to `>=` and log a warning with the short-frame
  size instead of raising.
- comment-out `raise RuntimeError` and breakpoint for future
  removal once confident.
- add docstring-style comment on `start_dt=` kwarg noting that
  `Client.bars()` doesn't truly support it (uses duration-style
  queries internally).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 19f16e1df3 Handle ambiguous futes contracts in `get_fute()`
Use (the only available in `ib_async`) `returnAll=True` in
`qualifyContractsAsync()` calls within `get_fute()` and handle the case
where IB returns a list of ambiguous contract matches instead of
a single result.

Deats,
- add `returnAll=True` to both `ContFuture` and `Future`
  qualification calls.
- add `isinstance(con, list)` check after unpacking first result
  to detect ambiguous contract sets.
- log warning with input params and matched contracts when
  ambiguous.
- update return type annot to `Contract|list[Contract]`.

Also,
- handle list-of-contracts case in `find_contracts()` by unpacking
  `*contracts` into the `qualifyContractsAsync()` call.
- reformat `qualifyContractsAsync()` calls to multiline style.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 3adb0d8b9d Fall back to `Contract.exchange` in `has_holiday()`
Use `con.exchange` as fallback when `con.primaryExchange` is empty
in `has_holiday()` to handle contracts like futures that don't
always set a `primaryExchange`.

Deats,
- extract `con: Contract` from `con_deats.contract` for reuse.
- use `con.primaryExchange or con.exchange` to ensure a valid
  exchange code is always passed to the calendar lookup.
- add `Contract` to `TYPE_CHECKING` imports.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
22 changed files with 1341 additions and 1469 deletions

View File

@ -203,9 +203,13 @@ async def stream_messages(
yield 'trade', piker_quote yield 'trade', piker_quote
def make_sub(pairs: list[str], sub_name: str, uid: int) -> dict[str, str]: def make_sub(
pairs: list[str],
sub_name: str,
uid: int,
) -> dict[str, str]:
''' '''
Create a request subscription packet dict. Create a request subscription packet `dict`.
- spot: - spot:
https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams https://binance-docs.github.io/apidocs/spot/en/#live-subscribing-unsubscribing-to-streams
@ -332,7 +336,8 @@ async def get_mkt_info(
# TODO: handle coinm futes which have a margin asset that # TODO: handle coinm futes which have a margin asset that
# is some crypto token! # is some crypto token!
# https://binance-docs.github.io/apidocs/delivery/en/#exchange-information # https://binance-docs.github.io/apidocs/delivery/en/#exchange-information
or 'btc' in venue_lower or
'btc' in venue_lower
): ):
return None return None
@ -343,12 +348,14 @@ async def get_mkt_info(
if ( if (
venue venue
and 'spot' not in venue_lower and
'spot' not in venue_lower
# XXX: catch all in case user doesn't know which # XXX: catch all in case user doesn't know which
# venue they want (usdtm vs. coinm) and we can choose # venue they want (usdtm vs. coinm) and we can choose
# a default (via config?) once we support coin-m APIs. # a default (via config?) once we support coin-m APIs.
or 'perp' in venue_lower or
'perp' in venue_lower
): ):
if not mkt_mode: if not mkt_mode:
mkt_mode: str = f'{venue_lower}_futes' mkt_mode: str = f'{venue_lower}_futes'

View File

@ -20,6 +20,7 @@ runnable script-programs.
''' '''
from __future__ import annotations from __future__ import annotations
import asyncio
from datetime import ( # noqa from datetime import ( # noqa
datetime, datetime,
date, date,
@ -140,7 +141,8 @@ async def data_reset_hack(
except ( except (
OSError, # no VNC server avail.. OSError, # no VNC server avail..
PermissionError, # asyncvnc pw fail.. PermissionError, # asyncvnc pw fail..
): ) as _vnc_err:
vnc_err = _vnc_err
try: try:
import i3ipc # noqa (since a deps dynamic check) import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError: except ModuleNotFoundError:
@ -166,14 +168,22 @@ async def data_reset_hack(
# localhost but no vnc-client or it borked.. # localhost but no vnc-client or it borked..
else: else:
try_xdo_manual(client) log.error(
'VNC CLICK HACK FAILE with,\n'
f'{vnc_err!r}\n'
)
# breakpoint()
# try_xdo_manual(client)
case 'i3ipc_xdotool': case 'i3ipc_xdotool':
try_xdo_manual(client) try_xdo_manual(client)
# i3ipc_xdotool_manual_click_hack() # i3ipc_xdotool_manual_click_hack()
case _ as tech: case _ as tech:
raise RuntimeError(f'{tech} is not supported for reset tech!?') raise RuntimeError(
f'{tech!r} is not supported for reset tech!?'
)
# we don't really need the ``xdotool`` approach any more B) # we don't really need the ``xdotool`` approach any more B)
return True return True
@ -269,10 +279,35 @@ async def vnc_click_hack(
500, 500,
) )
) )
# in case a prior dialog win is open/active.
await client.press('ISO_Enter')
# ensure the ib-gw window is active # ensure the ib-gw window is active
await client.click(MOUSE_BUTTON_LEFT) await client.click(MOUSE_BUTTON_LEFT)
# send the hotkeys combo B) # send the hotkeys combo B)
await client.press('Ctrl', 'Alt', key) # keys are stacked await client.press(
'Ctrl',
'Alt',
key,
) # NOTE, keys are stacked
# XXX, sometimes a dialog asking if you want to "simulate
# a reset" will show, in which case we want to select
# "Yes" (by tabbing) and then hit enter.
iters: int = 1
delay: float = 0.3
await asyncio.sleep(delay)
for i in range(iters):
log.info(f'Sending TAB {i}')
await client.press('Tab')
await asyncio.sleep(delay)
for i in range(iters):
log.info(f'Sending ENTER {i}')
await client.press('KP_Enter')
await asyncio.sleep(delay)
def i3ipc_fin_wins_titled( def i3ipc_fin_wins_titled(

View File

@ -561,7 +561,7 @@ class Client:
# f'Recursing for more bars:\n' # f'Recursing for more bars:\n'
) )
# XXX, debug! # XXX, debug!
breakpoint() # breakpoint()
# XXX ? TODO? recursively try to re-request? # XXX ? TODO? recursively try to re-request?
# => i think *NO* right? # => i think *NO* right?
# #
@ -768,26 +768,48 @@ class Client:
expiry: str = '', expiry: str = '',
front: bool = False, front: bool = False,
) -> Contract: ) -> Contract|list[Contract]:
''' '''
Get an unqualifed contract for the current "continous" Get an unqualifed contract for the current "continous"
future. future.
When input params result in a so called "ambiguous contract"
situation, we return the list of all matches provided by,
`IB.qualifyContractsAsync(..., returnAll=True)`
''' '''
# it's the "front" contract returned here # it's the "front" contract returned here
if front: if front:
con = (await self.ib.qualifyContractsAsync( cons = (
ContFuture(symbol, exchange=exchange) await self.ib.qualifyContractsAsync(
))[0] ContFuture(symbol, exchange=exchange),
else: returnAll=True,
cons = (await self.ib.qualifyContractsAsync(
Future(
symbol,
exchange=exchange,
lastTradeDateOrContractMonth=expiry,
) )
)) )
con = cons[0] else:
cons = (
await self.ib.qualifyContractsAsync(
Future(
symbol,
exchange=exchange,
lastTradeDateOrContractMonth=expiry,
),
returnAll=True,
)
)
con = cons[0]
if isinstance(con, list):
log.warning(
f'{len(con)!r} futes cons matched for input params,\n'
f'symbol={symbol!r}\n'
f'exchange={exchange!r}\n'
f'expiry={expiry!r}\n'
f'\n'
f'cons:\n'
f'{con!r}\n'
)
return con return con
@ -912,11 +934,17 @@ class Client:
) )
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
contracts: list[Contract] = [con] if isinstance(con, list):
contracts: list[Contract] = con
else:
contracts: list[Contract] = [con]
if qualify: if qualify:
try: try:
contracts: list[Contract] = ( contracts: list[Contract] = (
await self.ib.qualifyContractsAsync(con) await self.ib.qualifyContractsAsync(
*contracts
)
) )
except RequestError as err: except RequestError as err:
msg = err.message msg = err.message

View File

@ -1291,13 +1291,23 @@ async def deliver_trade_events(
case 'error': case 'error':
# NOTE: see impl deats in # NOTE: see impl deats in
# `Client.inline_errors()::push_err()` # `Client.inline_errors()::push_err()`
err: dict = item err: dict|str = item
# never relay errors for non-broker related issues # std case, never relay errors for non-order-control
# related issues.
# https://interactivebrokers.github.io/tws-api/message_codes.html # https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code'] if isinstance(err, dict):
reason: str = err['reason'] code: int = err['error_code']
reqid: str = str(err['reqid']) reason: str = err['reason']
reqid: str = str(err['reqid'])
# XXX, sometimes you'll get just a `str` of the form,
# '[code 104] connection failed' or something..
elif isinstance(err, str):
code_part, _, reason = err.rpartition(']')
if code_part:
_, _, code = code_part.partition('[code')
reqid: str = '<unknown>'
# "Warning:" msg codes, # "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes

View File

@ -201,6 +201,15 @@ async def open_history_client(
fqme, fqme,
timeframe, timeframe,
end_dt=end_dt, end_dt=end_dt,
# XXX WARNING, we don't actually use this inside
# `Client.bars()` since it isn't really supported,
# the API instead supports a "duration" of time style
# from the `end_dt` (or at least that was the best
# way to get it working sanely)..
#
# SO, with that in mind be aware that any downstream
# logic based on this may be mostly futile Xp
start_dt=start_dt, start_dt=start_dt,
) )
latency = time.time() - query_start latency = time.time() - query_start
@ -278,19 +287,27 @@ async def open_history_client(
trimmed_bars = bars_array[ trimmed_bars = bars_array[
bars_array['time'] >= start_dt.timestamp() bars_array['time'] >= start_dt.timestamp()
] ]
if ( # XXX, should NEVER get HERE!
trimmed_first_dt := from_timestamp(trimmed_bars['time'][0]) if trimmed_bars.size:
!= trimmed_first_dt: datetime = from_timestamp(trimmed_bars['time'][0])
start_dt if (
): trimmed_first_dt
# TODO! rm this once we're more confident it never hits! >=
# breakpoint() start_dt
raise RuntimeError( ):
f'OHLC-bars array start is gt `start_dt` limit !!\n' msg: str = (
f'start_dt: {start_dt}\n' f'OHLC-bars array start is gt `start_dt` limit !!\n'
f'first_dt: {first_dt}\n' f'start_dt: {start_dt}\n'
f'trimmed_first_dt: {trimmed_first_dt}\n' f'first_dt: {first_dt}\n'
) f'trimmed_first_dt: {trimmed_first_dt}\n'
f'\n'
f'Delivering shorted frame of {trimmed_bars.size!r}\n'
)
log.warning(msg)
# TODO! rm this once we're more confident it
# never breaks anything (in the caller)!
# breakpoint()
# raise RuntimeError(msg)
# XXX, overwrite with start_dt-limited frame # XXX, overwrite with start_dt-limited frame
bars_array = trimmed_bars bars_array = trimmed_bars

View File

@ -23,6 +23,7 @@ from contextlib import (
nullcontext, nullcontext,
) )
from decimal import Decimal from decimal import Decimal
from functools import partial
import time import time
from typing import ( from typing import (
Awaitable, Awaitable,
@ -32,6 +33,7 @@ from typing import (
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import ib_async as ibis import ib_async as ibis
import tractor import tractor
from tractor.devx.pformat import ppfmt
import trio import trio
from piker.accounting import ( from piker.accounting import (
@ -215,18 +217,19 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'{ib_client}\n' f'{ib_client}\n'
) )
last = time.time() last: float = time.time()
async for pattern in stream: async for pattern in stream:
log.info(f'received {pattern}') log.info(f'received {pattern}')
now: float = time.time() now: float = time.time()
# TODO? check this is no longer true?
# this causes tractor hang... # this causes tractor hang...
# assert 0 # assert 0
assert pattern, 'IB can not accept blank search pattern' assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz # throttle search requests to no faster then 1Hz
diff = now - last diff: float = now - last
if diff < 1.0: if diff < 1.0:
log.debug('throttle sleeping') log.debug('throttle sleeping')
await trio.sleep(diff) await trio.sleep(diff)
@ -237,11 +240,12 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
if ( if (
not pattern not pattern
or pattern.isspace() or
pattern.isspace()
or
# XXX: not sure if this is a bad assumption but it # XXX: not sure if this is a bad assumption but it
# seems to make search snappier? # seems to make search snappier?
or len(pattern) < 1 len(pattern) < 1
): ):
log.warning('empty pattern received, skipping..') log.warning('empty pattern received, skipping..')
@ -254,36 +258,58 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# XXX: this unblocks the far end search task which may # XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block # hold up a multi-search nursery block
await stream.send({}) await stream.send({})
continue continue
log.info(f'searching for {pattern}') log.info(
f'Searching for FQME with,\n'
f'pattern: {pattern!r}\n'
)
last = time.time() last: float = time.time()
# async batch search using api stocks endpoint and module # async batch search using api stocks endpoint and
# defined adhoc symbol set. # module defined adhoc symbol set.
stock_results = [] stock_results: list[dict] = []
async def extend_results( async def extend_results(
target: Awaitable[list] # ?TODO, how to type async-fn!?
target: Awaitable[list],
pattern: str,
**kwargs,
) -> None: ) -> None:
try: try:
results = await target results = await target(
pattern=pattern,
**kwargs,
)
client_repr: str = proxy._aio_ns.ib.client.__class__.__name__
meth_repr: str = target.keywords["meth"]
log.info(
f'Search query,\n'
f'{client_repr}.{meth_repr}(\n'
f' pattern={pattern!r}\n'
f' **kwargs={kwargs!r},\n'
f') = {ppfmt(list(results))}'
# XXX ^ just the keys since that's what
# shows in UI results table.
)
except tractor.trionics.Lagged: except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?") log.exception(
'IB SYM-SEARCH OVERRUN?!?\n'
)
return return
stock_results.extend(results) stock_results.extend(results)
for _ in range(10): for _ in range(10):
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as tn:
sn.start_soon( tn.start_soon(
extend_results, partial(
proxy.search_symbols( extend_results,
pattern=pattern, pattern=pattern,
upto=5, target=proxy.search_symbols,
upto=10,
), ),
) )
@ -313,7 +339,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# adhoc_match_results = {i[0]: {} for i in # adhoc_match_results = {i[0]: {} for i in
# adhoc_matches} # adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}') log.debug(
f'fuzzy matching stocks {ppfmt(stock_results)}'
)
stock_matches = fuzzy.extract( stock_matches = fuzzy.extract(
pattern, pattern,
stock_results, stock_results,
@ -327,7 +355,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# TODO: we used to deliver contract details # TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches} # {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}") log.debug(
f'Sending final matches\n'
f'{matches.keys()}'
)
await stream.send(matches) await stream.send(matches)

View File

@ -43,6 +43,7 @@ from pendulum import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ib_async import ( from ib_async import (
TradingSession, TradingSession,
Contract,
ContractDetails, ContractDetails,
) )
from exchange_calendars.exchange_calendars import ( from exchange_calendars.exchange_calendars import (
@ -82,7 +83,12 @@ def has_holiday(
''' '''
tz: str = con_deats.timeZoneId tz: str = con_deats.timeZoneId
exch: str = con_deats.contract.primaryExchange con: Contract = con_deats.contract
exch: str = (
con.primaryExchange
or
con.exchange
)
# XXX, ad-hoc handle any IB exchange which are non-std # XXX, ad-hoc handle any IB exchange which are non-std
# via lookup table.. # via lookup table..

View File

@ -19,7 +19,6 @@ Platform configuration (files) mgmt.
""" """
import platform import platform
import sys
import os import os
import shutil import shutil
from typing import ( from typing import (
@ -29,6 +28,7 @@ from typing import (
from pathlib import Path from pathlib import Path
from bidict import bidict from bidict import bidict
import platformdirs
import tomlkit import tomlkit
try: try:
import tomllib import tomllib
@ -41,7 +41,7 @@ from .log import get_logger
log = get_logger('broker-config') log = get_logger('broker-config')
# XXX NOTE: taken from `click` # XXX NOTE: orig impl was taken from `click`
# |_https://github.com/pallets/click/blob/main/src/click/utils.py#L449 # |_https://github.com/pallets/click/blob/main/src/click/utils.py#L449
# #
# (since apparently they have some super weirdness with SIGINT and # (since apparently they have some super weirdness with SIGINT and
@ -54,44 +54,21 @@ def get_app_dir(
force_posix: bool = False, force_posix: bool = False,
) -> str: ) -> str:
r"""Returns the config folder for the application. The default behavior '''
Returns the config folder for the application. The default behavior
is to return whatever is most appropriate for the operating system. is to return whatever is most appropriate for the operating system.
To give you an idea, for an app called ``"Foo Bar"``, something like ----
the following folders could be returned: NOTE, below is originally from `click` impl fn, we can prolly remove?
----
Mac OS X:
``~/Library/Application Support/Foo Bar``
Mac OS X (POSIX):
``~/.foo-bar``
Unix:
``~/.config/foo-bar``
Unix (POSIX):
``~/.foo-bar``
Win XP (roaming):
``C:\Documents and Settings\<user>\Local Settings\Application Data\Foo``
Win XP (not roaming):
``C:\Documents and Settings\<user>\Application Data\Foo Bar``
Win 7 (roaming):
``C:\Users\<user>\AppData\Roaming\Foo Bar``
Win 7 (not roaming):
``C:\Users\<user>\AppData\Local\Foo Bar``
.. versionadded:: 2.0
:param app_name: the application name. This should be properly capitalized
and can contain whitespace.
:param roaming: controls if the folder should be roaming or not on Windows. :param roaming: controls if the folder should be roaming or not on Windows.
Has no affect otherwise. Has no affect otherwise.
:param force_posix: if this is set to `True` then on any POSIX system the :param force_posix: if this is set to `True` then on any POSIX system the
folder will be stored in the home folder with a leading folder will be stored in the home folder with a leading
dot instead of the XDG config home or darwin's dot instead of the XDG config home or darwin's
application support folder. application support folder.
""" '''
def _posixify(name):
return "-".join(name.split()).lower()
# NOTE: for testing with `pytest` we leverage the `tmp_dir` # NOTE: for testing with `pytest` we leverage the `tmp_dir`
# fixture to generate (and clean up) a test-request-specific # fixture to generate (and clean up) a test-request-specific
# directory for isolated configuration files such that, # directory for isolated configuration files such that,
@ -117,23 +94,30 @@ def get_app_dir(
# assert testdirpath.exists(), 'piker test harness might be borked!?' # assert testdirpath.exists(), 'piker test harness might be borked!?'
# app_name = str(testdirpath) # app_name = str(testdirpath)
if platform.system() == 'Windows': os_name: str = platform.system()
key = "APPDATA" if roaming else "LOCALAPPDATA" conf_dir: Path = platformdirs.user_config_path()
folder = os.environ.get(key) app_dir: Path = conf_dir / app_name
if folder is None:
folder = os.path.expanduser("~") # ?TODO, from `click`; can remove?
return os.path.join(folder, app_name)
if force_posix: if force_posix:
def _posixify(name):
return "-".join(name.split()).lower()
return os.path.join( return os.path.join(
os.path.expanduser("~/.{}".format(_posixify(app_name)))) os.path.expanduser(
if sys.platform == "darwin": "~/.{}".format(
return os.path.join( _posixify(app_name)
os.path.expanduser("~/Library/Application Support"), app_name )
)
) )
return os.path.join(
os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")), log.info(
_posixify(app_name), f'Using user config directory,\n'
f'platform.system(): {os_name!r}\n'
f'conf_dir: {conf_dir!r}\n'
f'app_dir: {conf_dir!r}\n'
) )
return app_dir
_click_config_dir: Path = Path(get_app_dir('piker')) _click_config_dir: Path = Path(get_app_dir('piker'))
@ -250,7 +234,9 @@ def repodir() -> Path:
repodir: Path = Path(os.environ.get('GITHUB_WORKSPACE')) repodir: Path = Path(os.environ.get('GITHUB_WORKSPACE'))
confdir: Path = repodir / 'config' confdir: Path = repodir / 'config'
assert confdir.is_dir(), f'{confdir} DNE, {repodir} is likely incorrect!' assert confdir.is_dir(), (
f'{confdir} DNE, {repodir} is likely incorrect!'
)
return repodir return repodir

View File

@ -973,9 +973,6 @@ async def open_feed(
# assert flume.mkt.fqme == fqme # assert flume.mkt.fqme == fqme
feed.flumes[fqme] = flume feed.flumes[fqme] = flume
# TODO: do we need this?
flume.feed = feed
# attach and cache shm handles # attach and cache shm handles
rt_shm = flume.rt_shm rt_shm = flume.rt_shm
assert rt_shm assert rt_shm

View File

@ -22,9 +22,6 @@ real-time data processing data-structures.
""" """
from __future__ import annotations from __future__ import annotations
from typing import (
TYPE_CHECKING,
)
import tractor import tractor
import pendulum import pendulum
@ -38,9 +35,6 @@ from ._sharedmem import (
) )
from piker.accounting import MktPair from piker.accounting import MktPair
if TYPE_CHECKING:
from piker.data.feed import Feed
class Flume(Struct): class Flume(Struct):
''' '''
@ -80,10 +74,6 @@ class Flume(Struct):
izero_rt: int = 0 izero_rt: int = 0
throttle_rate: int | None = None throttle_rate: int | None = None
# TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals?
feed: Feed|None = None
@property @property
def rt_shm(self) -> ShmArray: def rt_shm(self) -> ShmArray:
@ -156,7 +146,6 @@ class Flume(Struct):
# will get instead some kind of msg-compat version # will get instead some kind of msg-compat version
# that it can load. # that it can load.
msg.pop('stream') msg.pop('stream')
msg.pop('feed')
msg.pop('_rt_shm') msg.pop('_rt_shm')
msg.pop('_hist_shm') msg.pop('_hist_shm')

View File

@ -294,6 +294,11 @@ def ldshm(
f'Something is wrong with time period for {shm}:\n{times}' f'Something is wrong with time period for {shm}:\n{times}'
) )
period_s: float = float(max(d1, d2, med)) period_s: float = float(max(d1, d2, med))
log.info(
f'Processing shm buffer:\n'
f' file: {shmfile.name}\n'
f' period: {period_s}s\n'
)
null_segs: tuple = tsp.get_null_segs( null_segs: tuple = tsp.get_null_segs(
frame=shm.array, frame=shm.array,

View File

@ -276,14 +276,41 @@ def get_null_segs(
absi_zdiff: np.ndarray = np.diff(absi_zeros) absi_zdiff: np.ndarray = np.diff(absi_zeros)
if zero_t.size < 2: if zero_t.size < 2:
try: idx: int = zero_t['index'][0]
breakpoint() idx_before: int = idx - 1
except RuntimeError: idx_after: int = idx + 1
# XXX, if greenback not active from index = frame['index']
# piker store ldshm cmd.. before_cond = idx_before <= index
log.exception( after_cond = index <= idx_after
"Can't debug single-sample null!\n" bars: np.ndarray = frame[
) before_cond
&
after_cond
]
time: np.ndarray = bars['time']
from pendulum import (
from_timestamp,
Interval,
)
gap: Interval = (
from_timestamp(time[-1])
-
from_timestamp(time[0])
)
log.warning(
f'Single OHLCV-bar null-segment detected??\n'
f'gap -> {gap}\n'
)
# ^^XXX, if you want to debug the above bar-gap^^
# try:
# breakpoint()
# except RuntimeError:
# # XXX, if greenback not active from
# # piker store ldshm cmd..
# log.exception(
# "Can't debug single-sample null!\n"
# )
return None return None

View File

@ -30,6 +30,11 @@ import tractor
from piker.data._formatters import BGM from piker.data._formatters import BGM
from piker.storage import log from piker.storage import log
from piker.toolz.profile import (
Profiler,
pg_profile_enabled,
ms_slower_then,
)
from piker.ui._style import get_fonts from piker.ui._style import get_fonts
if TYPE_CHECKING: if TYPE_CHECKING:
@ -92,12 +97,22 @@ async def markup_gaps(
# gap's duration. # gap's duration.
show_txt: bool = False, show_txt: bool = False,
# A/B comparison: render individual arrows alongside batch
# for visual comparison
show_individual_arrows: bool = False,
) -> dict[int, dict]: ) -> dict[int, dict]:
''' '''
Remote annotate time-gaps in a dt-fielded ts (normally OHLC) Remote annotate time-gaps in a dt-fielded ts (normally OHLC)
with rectangles. with rectangles.
''' '''
profiler = Profiler(
msg=f'markup_gaps() for {gaps.height} gaps',
disabled=False,
ms_threshold=0.0,
)
# XXX: force chart redraw FIRST to ensure PlotItem coordinate # XXX: force chart redraw FIRST to ensure PlotItem coordinate
# system is properly initialized before we position annotations! # system is properly initialized before we position annotations!
# Without this, annotations may be misaligned on first creation # Without this, annotations may be misaligned on first creation
@ -106,6 +121,19 @@ async def markup_gaps(
fqme=fqme, fqme=fqme,
timeframe=timeframe, timeframe=timeframe,
) )
profiler('first `.redraw()` before annot creation')
log.info(
f'markup_gaps() called:\n'
f' fqme: {fqme}\n'
f' timeframe: {timeframe}s\n'
f' gaps.height: {gaps.height}\n'
)
# collect all annotation specs for batch submission
rect_specs: list[dict] = []
arrow_specs: list[dict] = []
text_specs: list[dict] = []
aids: dict[int] = {} aids: dict[int] = {}
for i in range(gaps.height): for i in range(gaps.height):
@ -217,56 +245,38 @@ async def markup_gaps(
# 1: 'wine', # down-gap # 1: 'wine', # down-gap
# }[sgn] # }[sgn]
rect_kwargs: dict[str, Any] = dict( # collect rect spec (no fqme/timeframe, added by batch
fqme=fqme, # API)
timeframe=timeframe, rect_spec: dict[str, Any] = dict(
meth='set_view_pos',
start_pos=lc, start_pos=lc,
end_pos=ro, end_pos=ro,
color=color, color=color,
update_label=False,
start_time=start_time, start_time=start_time,
end_time=end_time, end_time=end_time,
) )
rect_specs.append(rect_spec)
# add up/down rects
aid: int|None = await actl.add_rect(**rect_kwargs)
if aid is None:
log.error(
f'Failed to add rect for,\n'
f'{rect_kwargs!r}\n'
f'\n'
f'Skipping to next gap!\n'
)
continue
assert aid
aids[aid] = rect_kwargs
direction: str = ( direction: str = (
'down' if down_gap 'down' if down_gap
else 'up' else 'up'
) )
# TODO! mk this a `msgspec.Struct` which we deserialize
# on the server side! # collect arrow spec
# XXX: send timestamp for server-side index lookup
# to ensure alignment with current shm state
gap_time: float = row['time'][0] gap_time: float = row['time'][0]
arrow_kwargs: dict[str, Any] = dict( arrow_spec: dict[str, Any] = dict(
fqme=fqme,
timeframe=timeframe,
x=iend, # fallback if timestamp lookup fails x=iend, # fallback if timestamp lookup fails
y=cls, y=cls,
time=gap_time, # for server-side index lookup time=gap_time, # for server-side index lookup
color=color, color=color,
alpha=169, alpha=169,
pointing=direction, pointing=direction,
# TODO: expose these as params to markup_gaps()?
headLen=10, headLen=10,
headWidth=2.222, headWidth=2.222,
pxMode=True, pxMode=True,
) )
arrow_specs.append(arrow_spec)
aid: int = await actl.add_arrow(
**arrow_kwargs
)
# add duration label to RHS of arrow # add duration label to RHS of arrow
if up_gap: if up_gap:
@ -278,15 +288,12 @@ async def markup_gaps(
assert flat assert flat
anchor = (0, 0) # up from bottom anchor = (0, 0) # up from bottom
# use a slightly smaller font for gap label txt. # collect text spec if enabled
font, small_font = get_fonts()
font_size: int = small_font.px_size - 1
assert isinstance(font_size, int)
if show_txt: if show_txt:
text_aid: int = await actl.add_text( font, small_font = get_fonts()
fqme=fqme, font_size: int = small_font.px_size - 1
timeframe=timeframe,
text_spec: dict[str, Any] = dict(
text=gap_label, text=gap_label,
x=iend + 1, # fallback if timestamp lookup fails x=iend + 1, # fallback if timestamp lookup fails
y=cls, y=cls,
@ -295,12 +302,46 @@ async def markup_gaps(
anchor=anchor, anchor=anchor,
font_size=font_size, font_size=font_size,
) )
aids[text_aid] = {'text': gap_label} text_specs.append(text_spec)
# tell chart to redraw all its # submit all annotations in single batch IPC msg
# graphics view layers Bo log.info(
f'Submitting batch annotations:\n'
f' rects: {len(rect_specs)}\n'
f' arrows: {len(arrow_specs)}\n'
f' texts: {len(text_specs)}\n'
)
profiler('built all annotation specs')
result: dict[str, list[int]] = await actl.add_batch(
fqme=fqme,
timeframe=timeframe,
rects=rect_specs,
arrows=arrow_specs,
texts=text_specs,
show_individual_arrows=show_individual_arrows,
)
profiler('batch `.add_batch()` IPC call complete')
# build aids dict from batch results
for aid in result['rects']:
aids[aid] = {'type': 'rect'}
for aid in result['arrows']:
aids[aid] = {'type': 'arrow'}
for aid in result['texts']:
aids[aid] = {'type': 'text'}
log.info(
f'Batch submission complete: {len(aids)} annotation(s) '
f'created'
)
profiler('built aids result dict')
# tell chart to redraw all its graphics view layers
await actl.redraw( await actl.redraw(
fqme=fqme, fqme=fqme,
timeframe=timeframe, timeframe=timeframe,
) )
profiler('final `.redraw()` after annot creation')
return aids return aids

View File

@ -58,7 +58,10 @@ from piker.brokers import NoData
from piker.accounting import ( from piker.accounting import (
MktPair, MktPair,
) )
from piker.log import get_logger from piker.log import (
get_logger,
get_console_log,
)
from ..data._sharedmem import ( from ..data._sharedmem import (
maybe_open_shm_array, maybe_open_shm_array,
ShmArray, ShmArray,
@ -248,10 +251,20 @@ async def maybe_fill_null_segments(
end_dt=end_dt, end_dt=end_dt,
) )
if array.size == 0:
log.warning(
f'Valid gap from backend ??\n'
f'{end_dt} -> {start_dt}\n'
)
# ?TODO? do we want to remove the nulls and push
# the close price here for the gap duration?
await tractor.pause()
break
if ( if (
frame_start_dt := ( frame_start_dt := (from_timestamp(array['time'][0]))
from_timestamp(array['time'][0]) <
) < backfill_until_dt backfill_until_dt
): ):
log.error( log.error(
f'Invalid frame_start !?\n' f'Invalid frame_start !?\n'
@ -613,10 +626,17 @@ async def start_backfill(
else: else:
log.warning( log.warning(
'0 BARS TO PUSH after diff!?\n' f'0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
f'\n'
f'This might mean we rxed a gap frame which starts BEFORE,\n'
f'backfill_until_dt: {backfill_until_dt}\n'
f'end_dt_param: {end_dt_param}\n'
) )
await tractor.pause() # XXX, to debug it and be sure.
# await tractor.pause()
break
# Check if we're about to exceed buffer capacity BEFORE # Check if we're about to exceed buffer capacity BEFORE
# attempting the push # attempting the push
@ -719,12 +739,21 @@ async def start_backfill(
# including the dst[/src] source asset token. SO, # including the dst[/src] source asset token. SO,
# 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
# historical reasons ONLY. # historical reasons ONLY.
if mkt.dst.atype not in { if (
'crypto', mkt.dst.atype not in {
'crypto_currency', 'crypto',
'fiat', # a "forex pair" 'crypto_currency',
'perpetual_future', # stupid "perps" from cex land 'fiat', # a "forex pair"
}: 'perpetual_future', # stupid "perps" from cex land
}
and not (
mkt.src.atype == 'crypto_currency'
and
mkt.dst.atype in {
'future',
}
)
):
col_sym_key: str = mkt.get_fqme( col_sym_key: str = mkt.get_fqme(
delim_char='', delim_char='',
without_src=True, without_src=True,
@ -1368,6 +1397,10 @@ async def manage_history(
engages. engages.
''' '''
get_console_log(
name=__name__,
level=loglevel,
)
# TODO: is there a way to make each shm file key # TODO: is there a way to make each shm file key
# actor-tree-discovery-addr unique so we avoid collisions # actor-tree-discovery-addr unique so we avoid collisions
# when doing tests which also allocate shms for certain instruments # when doing tests which also allocate shms for certain instruments

View File

@ -24,8 +24,11 @@ from pyqtgraph import (
Point, Point,
functions as fn, functions as fn,
Color, Color,
GraphicsObject,
) )
from pyqtgraph.Qt import internals
import numpy as np import numpy as np
import pyqtgraph as pg
from piker.ui.qt import ( from piker.ui.qt import (
QtCore, QtCore,
@ -35,6 +38,10 @@ from piker.ui.qt import (
QRectF, QRectF,
QGraphicsPathItem, QGraphicsPathItem,
) )
from piker.ui._style import hcolor
from piker.log import get_logger
log = get_logger(__name__)
def mk_marker_path( def mk_marker_path(
@ -104,7 +111,7 @@ def mk_marker_path(
class LevelMarker(QGraphicsPathItem): class LevelMarker(QGraphicsPathItem):
''' '''
An arrow marker path graphich which redraws itself An arrow marker path graphic which redraws itself
to the specified view coordinate level on each paint cycle. to the specified view coordinate level on each paint cycle.
''' '''
@ -251,9 +258,9 @@ def qgo_draw_markers(
) -> float: ) -> float:
''' '''
Paint markers in ``pg.GraphicsItem`` style by first Paint markers in ``pg.GraphicsItem`` style by first removing the
removing the view transform for the painter, drawing the markers view transform for the painter, drawing the markers in scene
in scene coords, then restoring the view coords. coords, then restoring the view coords.
''' '''
# paint markers in native coordinate system # paint markers in native coordinate system
@ -295,3 +302,449 @@ def qgo_draw_markers(
p.setTransform(orig_tr) p.setTransform(orig_tr)
return max(sizes) return max(sizes)
class GapAnnotations(GraphicsObject):
'''
Batch-rendered gap annotations using Qt's efficient drawing
APIs.
Instead of creating individual `QGraphicsItem` instances per
gap (which is very slow for 1000+ gaps), this class stores all
gap rectangles and arrows in numpy-backed arrays and renders
them in single batch paint calls.
Performance: ~1000x faster than individual items for large gap
counts.
Based on patterns from:
- `pyqtgraph.BarGraphItem` (batch rect rendering)
- `pyqtgraph.ScatterPlotItem` (fragment rendering)
- `piker.ui._curve.FlowGraphic` (single path pattern)
'''
def __init__(
self,
gap_specs: list[dict],
array: np.ndarray|None = None,
color: str = 'dad_blue',
alpha: int = 169,
arrow_size: float = 10.0,
fqme: str|None = None,
timeframe: float|None = None,
) -> None:
'''
gap_specs: list of dicts with keys:
- start_pos: (x, y) tuple for left corner of rect
- end_pos: (x, y) tuple for right corner of rect
- arrow_x: x position for arrow
- arrow_y: y position for arrow
- pointing: 'up' or 'down' for arrow direction
- start_time: (optional) timestamp for repositioning
- end_time: (optional) timestamp for repositioning
array: optional OHLC numpy array for repositioning on
backfill updates (when abs-index changes)
fqme: symbol name for these gaps (for logging/debugging)
timeframe: period in seconds that these gaps were
detected on (used to skip reposition when
called with wrong timeframe's array)
'''
super().__init__()
self._gap_specs = gap_specs
self._array = array
self._fqme = fqme
self._timeframe = timeframe
n_gaps = len(gap_specs)
# shared pen/brush matching original SelectRect/ArrowItem style
base_color = pg.mkColor(hcolor(color))
# rect pen: base color, fully opaque for outline
self._rect_pen = pg.mkPen(base_color, width=1)
# rect brush: base color with alpha=66 (SelectRect default)
rect_fill = pg.mkColor(hcolor(color))
rect_fill.setAlpha(66)
self._rect_brush = pg.functions.mkBrush(rect_fill)
# arrow pen: same as rects
self._arrow_pen = pg.mkPen(base_color, width=1)
# arrow brush: base color with user-specified alpha (default 169)
arrow_fill = pg.mkColor(hcolor(color))
arrow_fill.setAlpha(alpha)
self._arrow_brush = pg.functions.mkBrush(arrow_fill)
# allocate rect array using Qt's efficient storage
self._rectarray = internals.PrimitiveArray(
QtCore.QRectF,
4,
)
self._rectarray.resize(n_gaps)
rect_memory = self._rectarray.ndarray()
# fill rect array from gap specs
for (
i,
spec,
) in enumerate(gap_specs):
(
start_x,
start_y,
) = spec['start_pos']
(
end_x,
end_y,
) = spec['end_pos']
# QRectF expects (x, y, width, height)
rect_memory[i, 0] = start_x
rect_memory[i, 1] = min(start_y, end_y)
rect_memory[i, 2] = end_x - start_x
rect_memory[i, 3] = abs(end_y - start_y)
# build single QPainterPath for all arrows
self._arrow_path = QtGui.QPainterPath()
self._arrow_size = arrow_size
for spec in gap_specs:
arrow_x = spec['arrow_x']
arrow_y = spec['arrow_y']
pointing = spec['pointing']
# create arrow polygon
if pointing == 'down':
# arrow points downward
arrow_poly = QtGui.QPolygonF([
QPointF(arrow_x, arrow_y), # tip
QPointF(
arrow_x - arrow_size/2,
arrow_y - arrow_size,
), # left
QPointF(
arrow_x + arrow_size/2,
arrow_y - arrow_size,
), # right
])
else: # up
# arrow points upward
arrow_poly = QtGui.QPolygonF([
QPointF(arrow_x, arrow_y), # tip
QPointF(
arrow_x - arrow_size/2,
arrow_y + arrow_size,
), # left
QPointF(
arrow_x + arrow_size/2,
arrow_y + arrow_size,
), # right
])
self._arrow_path.addPolygon(arrow_poly)
self._arrow_path.closeSubpath()
# cache bounding rect
self._br: QRectF|None = None
def boundingRect(self) -> QRectF:
'''
Compute bounding rect from rect array and arrow path.
'''
if self._br is not None:
return self._br
# get rect bounds
rect_memory = self._rectarray.ndarray()
if len(rect_memory) == 0:
self._br = QRectF()
return self._br
x_min = rect_memory[:, 0].min()
y_min = rect_memory[:, 1].min()
x_max = (rect_memory[:, 0] + rect_memory[:, 2]).max()
y_max = (rect_memory[:, 1] + rect_memory[:, 3]).max()
# expand for arrow path
arrow_br = self._arrow_path.boundingRect()
x_min = min(x_min, arrow_br.left())
y_min = min(y_min, arrow_br.top())
x_max = max(x_max, arrow_br.right())
y_max = max(y_max, arrow_br.bottom())
self._br = QRectF(
x_min,
y_min,
x_max - x_min,
y_max - y_min,
)
return self._br
def paint(
self,
p: QtGui.QPainter,
opt: QtWidgets.QStyleOptionGraphicsItem,
w: QtWidgets.QWidget,
) -> None:
'''
Batch render all rects and arrows in minimal paint calls.
'''
# draw all rects in single batch call (data coordinates)
p.setPen(self._rect_pen)
p.setBrush(self._rect_brush)
drawargs = self._rectarray.drawargs()
p.drawRects(*drawargs)
# draw arrows in scene/pixel coordinates so they maintain
# size regardless of zoom level
orig_tr = p.transform()
p.resetTransform()
# rebuild arrow path in scene coordinates
arrow_path_scene = QtGui.QPainterPath()
# arrow geometry matching pg.ArrowItem defaults
# headLen=10, headWidth=2.222
# headWidth is the half-width (center to edge distance)
head_len = self._arrow_size
head_width = head_len * 0.2222 # 2.222 at size=10
for spec in self._gap_specs:
if 'arrow_x' not in spec:
continue
arrow_x = spec['arrow_x']
arrow_y = spec['arrow_y']
pointing = spec['pointing']
# transform data coords to scene coords
scene_pt = orig_tr.map(QPointF(arrow_x, arrow_y))
sx = scene_pt.x()
sy = scene_pt.y()
# create arrow polygon in scene/pixel coords
# matching pg.ArrowItem geometry but rotated for up/down
if pointing == 'down':
# tip points downward (negative y direction)
arrow_poly = QtGui.QPolygonF([
QPointF(sx, sy), # tip
QPointF(
sx - head_width,
sy - head_len,
), # left base
QPointF(
sx + head_width,
sy - head_len,
), # right base
])
else: # up
# tip points upward (positive y direction)
arrow_poly = QtGui.QPolygonF([
QPointF(sx, sy), # tip
QPointF(
sx - head_width,
sy + head_len,
), # left base
QPointF(
sx + head_width,
sy + head_len,
), # right base
])
arrow_path_scene.addPolygon(arrow_poly)
arrow_path_scene.closeSubpath()
p.setPen(self._arrow_pen)
p.setBrush(self._arrow_brush)
p.drawPath(arrow_path_scene)
# restore original transform
p.setTransform(orig_tr)
def reposition(
self,
array: np.ndarray|None = None,
fqme: str|None = None,
timeframe: float|None = None,
) -> None:
'''
Reposition all annotations based on timestamps.
Used when viz is updated (eg during backfill) and abs-index
range changes - we need to lookup new indices from timestamps.
'''
# skip reposition if timeframe doesn't match
# (e.g., 1s gaps being repositioned with 60s array)
if (
timeframe is not None
and
self._timeframe is not None
and
timeframe != self._timeframe
):
log.debug(
f'Skipping reposition for {self._fqme} gaps:\n'
f' gap timeframe: {self._timeframe}s\n'
f' array timeframe: {timeframe}s\n'
)
return
if array is None:
array = self._array
if array is None:
log.warning(
'GapAnnotations.reposition() called but no array '
'provided'
)
return
# collect all unique timestamps we need to lookup
timestamps: set[float] = set()
for spec in self._gap_specs:
if spec.get('start_time') is not None:
timestamps.add(spec['start_time'])
if spec.get('end_time') is not None:
timestamps.add(spec['end_time'])
if spec.get('time') is not None:
timestamps.add(spec['time'])
# vectorized timestamp -> row lookup using binary search
time_to_row: dict[float, dict] = {}
if timestamps:
import numpy as np
time_arr = array['time']
ts_array = np.array(list(timestamps))
search_indices = np.searchsorted(
time_arr,
ts_array,
)
# vectorized bounds check and exact match verification
valid_mask = (
(search_indices < len(array))
& (time_arr[search_indices] == ts_array)
)
valid_indices = search_indices[valid_mask]
valid_timestamps = ts_array[valid_mask]
matched_rows = array[valid_indices]
time_to_row = {
float(ts): {
'index': float(row['index']),
'open': float(row['open']),
'close': float(row['close']),
}
for ts, row in zip(
valid_timestamps,
matched_rows,
)
}
# rebuild rect array from gap specs with new indices
rect_memory = self._rectarray.ndarray()
for (
i,
spec,
) in enumerate(self._gap_specs):
start_time = spec.get('start_time')
end_time = spec.get('end_time')
if (
start_time is None
or end_time is None
):
continue
start_row = time_to_row.get(start_time)
end_row = time_to_row.get(end_time)
if (
start_row is None
or end_row is None
):
log.warning(
f'Timestamp lookup failed for gap[{i}] during '
f'reposition:\n'
f' fqme: {fqme}\n'
f' timeframe: {timeframe}s\n'
f' start_time: {start_time}\n'
f' end_time: {end_time}\n'
f' array time range: '
f'{array["time"][0]} -> {array["time"][-1]}\n'
)
continue
start_idx = start_row['index']
end_idx = end_row['index']
start_close = start_row['close']
end_open = end_row['open']
from_idx: float = 0.16 - 0.06
start_x = start_idx + 1 - from_idx
end_x = end_idx + from_idx
# update rect in array
rect_memory[i, 0] = start_x
rect_memory[i, 1] = min(start_close, end_open)
rect_memory[i, 2] = end_x - start_x
rect_memory[i, 3] = abs(end_open - start_close)
# rebuild arrow path with new indices
self._arrow_path.clear()
for spec in self._gap_specs:
time_val = spec.get('time')
if time_val is None:
continue
arrow_row = time_to_row.get(time_val)
if arrow_row is None:
continue
arrow_x = arrow_row['index']
arrow_y = arrow_row['close']
pointing = spec['pointing']
# create arrow polygon
if pointing == 'down':
arrow_poly = QtGui.QPolygonF([
QPointF(arrow_x, arrow_y),
QPointF(
arrow_x - self._arrow_size/2,
arrow_y - self._arrow_size,
),
QPointF(
arrow_x + self._arrow_size/2,
arrow_y - self._arrow_size,
),
])
else: # up
arrow_poly = QtGui.QPolygonF([
QPointF(arrow_x, arrow_y),
QPointF(
arrow_x - self._arrow_size/2,
arrow_y + self._arrow_size,
),
QPointF(
arrow_x + self._arrow_size/2,
arrow_y + self._arrow_size,
),
])
self._arrow_path.addPolygon(arrow_poly)
self._arrow_path.closeSubpath()
# invalidate bounding rect cache
self._br = None
self.prepareGeometryChange()
self.update()

View File

@ -168,7 +168,7 @@ class ArrowEditor(Struct):
''' '''
uid: str = arrow._uid uid: str = arrow._uid
arrows: list[pg.ArrowItem] = self._arrows[uid] arrows: list[pg.ArrowItem] = self._arrows[uid]
log.info( log.debug(
f'Removing arrow from views\n' f'Removing arrow from views\n'
f'uid: {uid!r}\n' f'uid: {uid!r}\n'
f'{arrow!r}\n' f'{arrow!r}\n'
@ -286,7 +286,9 @@ class LineEditor(Struct):
for line in lines: for line in lines:
line.show_labels() line.show_labels()
line.hide_markers() line.hide_markers()
log.debug(f'Level active for level: {line.value()}') log.debug(
f'Line active @ level: {line.value()!r}'
)
# TODO: other flashy things to indicate the order is active # TODO: other flashy things to indicate the order is active
return lines return lines
@ -329,7 +331,11 @@ class LineEditor(Struct):
if line in hovered: if line in hovered:
hovered.remove(line) hovered.remove(line)
log.debug(f'deleting {line} with oid: {uuid}') log.debug(
f'Deleting level-line\n'
f'line: {line!r}\n'
f'oid: {uuid!r}\n'
)
line.delete() line.delete()
# make sure the xhair doesn't get left off # make sure the xhair doesn't get left off
@ -337,7 +343,11 @@ class LineEditor(Struct):
cursor.show_xhair() cursor.show_xhair()
else: else:
log.warning(f'Could not find line for {line}') log.warning(
f'Could not find line for removal ??\n'
f'\n'
f'{line!r}\n'
)
return lines return lines
@ -569,11 +579,11 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
if update_label: if update_label:
self.init_label(view_rect) self.init_label(view_rect)
print( log.debug(
'SelectRect modify:\n' f'SelectRect modify,\n'
f'QRectF: {view_rect}\n' f'QRectF: {view_rect}\n'
f'start_pos: {start_pos}\n' f'start_pos: {start_pos!r}\n'
f'end_pos: {end_pos}\n' f'end_pos: {end_pos!r}\n'
) )
self.show() self.show()
@ -640,8 +650,11 @@ class SelectRect(QtWidgets.QGraphicsRectItem):
dmn=dmn, dmn=dmn,
)) ))
# print(f'x2, y2: {(x2, y2)}') # tracing
# print(f'xmn, ymn: {(xmn, ymx)}') # log.info(
# f'x2, y2: {(x2, y2)}\n'
# f'xmn, ymn: {(xmn, ymx)}\n'
# )
label_anchor = Point( label_anchor = Point(
xmx + 2, xmx + 2,

View File

@ -38,7 +38,6 @@ from piker.ui.qt import (
QtGui, QtGui,
QGraphicsPathItem, QGraphicsPathItem,
QStyleOptionGraphicsItem, QStyleOptionGraphicsItem,
QGraphicsItem,
QGraphicsScene, QGraphicsScene,
QWidget, QWidget,
QPointF, QPointF,

View File

@ -22,6 +22,7 @@ a chart from some other actor.
from __future__ import annotations from __future__ import annotations
from contextlib import ( from contextlib import (
asynccontextmanager as acm, asynccontextmanager as acm,
contextmanager as cm,
AsyncExitStack, AsyncExitStack,
) )
from functools import partial from functools import partial
@ -46,6 +47,7 @@ from piker.log import get_logger
from piker.types import Struct from piker.types import Struct
from piker.service import find_service from piker.service import find_service
from piker.brokers import SymbolNotFound from piker.brokers import SymbolNotFound
from piker.toolz import Profiler
from piker.ui.qt import ( from piker.ui.qt import (
QGraphicsItem, QGraphicsItem,
) )
@ -98,6 +100,8 @@ def rm_annot(
annot: ArrowEditor|SelectRect|pg.TextItem annot: ArrowEditor|SelectRect|pg.TextItem
) -> bool: ) -> bool:
global _editors global _editors
from piker.ui._annotate import GapAnnotations
match annot: match annot:
case pg.ArrowItem(): case pg.ArrowItem():
editor = _editors[annot._uid] editor = _editors[annot._uid]
@ -122,9 +126,35 @@ def rm_annot(
scene.removeItem(annot) scene.removeItem(annot)
return True return True
case GapAnnotations():
scene = annot.scene()
if scene:
scene.removeItem(annot)
return True
return False return False
@cm
def no_qt_updates(*items):
'''
Disable Qt widget/item updates during context to batch
render operations and only trigger single repaint on exit.
Accepts both QWidgets and QGraphicsItems.
'''
for item in items:
if hasattr(item, 'setUpdatesEnabled'):
item.setUpdatesEnabled(False)
try:
yield
finally:
for item in items:
if hasattr(item, 'setUpdatesEnabled'):
item.setUpdatesEnabled(True)
async def serve_rc_annots( async def serve_rc_annots(
ipc_key: str, ipc_key: str,
annot_req_stream: MsgStream, annot_req_stream: MsgStream,
@ -429,6 +459,333 @@ async def serve_rc_annots(
aids.add(aid) aids.add(aid)
await annot_req_stream.send(aid) await annot_req_stream.send(aid)
case {
'cmd': 'batch',
'fqme': fqme,
'timeframe': timeframe,
'rects': list(rect_specs),
'arrows': list(arrow_specs),
'texts': list(text_specs),
'show_individual_arrows': bool(show_individual_arrows),
}:
# batch submission handler - process multiple
# annotations in single IPC round-trip
ds: DisplayState = _dss[fqme]
try:
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
except KeyError:
msg: str = (
f'No chart for timeframe={timeframe}s, '
f'skipping batch annotation'
)
log.error(msg)
await annot_req_stream.send({'error': msg})
continue
cv: ChartView = chart.cv
viz: Viz = chart.get_viz(fqme)
shm = viz.shm
arr = shm.array
result: dict[str, list[int]] = {
'rects': [],
'arrows': [],
'texts': [],
}
profiler = Profiler(
msg=(
f'Batch annotate {len(rect_specs)} gaps '
f'on {fqme}@{timeframe}s'
),
disabled=False,
delayed=False,
)
aids_set: set[int] = ctxs[ipc_key][1]
# build unified gap_specs for GapAnnotations class
from piker.ui._annotate import GapAnnotations
gap_specs: list[dict] = []
n_gaps: int = max(
len(rect_specs),
len(arrow_specs),
)
profiler('setup batch annot creation')
# collect all unique timestamps for vectorized lookup
timestamps: list[float] = []
for rect_spec in rect_specs:
if start_time := rect_spec.get('start_time'):
timestamps.append(start_time)
if end_time := rect_spec.get('end_time'):
timestamps.append(end_time)
for arrow_spec in arrow_specs:
if time_val := arrow_spec.get('time'):
timestamps.append(time_val)
profiler('collect `timestamps: list` complet!')
# build timestamp -> row mapping using binary search
# O(m log n) instead of O(n*m) with np.isin
time_to_row: dict[float, dict] = {}
if timestamps:
import numpy as np
time_arr = arr['time']
ts_array = np.array(timestamps)
# binary search for each timestamp in sorted time array
search_indices = np.searchsorted(
time_arr,
ts_array,
)
profiler('`np.searchsorted()` complete!')
# vectorized bounds check and exact match verification
valid_mask = (
(search_indices < len(arr))
& (time_arr[search_indices] == ts_array)
)
# get all valid indices and timestamps
valid_indices = search_indices[valid_mask]
valid_timestamps = ts_array[valid_mask]
# use fancy indexing to get all rows at once
matched_rows = arr[valid_indices]
# extract fields to plain arrays BEFORE dict building
indices_arr = matched_rows['index'].astype(float)
opens_arr = matched_rows['open'].astype(float)
closes_arr = matched_rows['close'].astype(float)
profiler('extracted field arrays')
# build dict from plain arrays (much faster)
time_to_row: dict[float, dict] = {
float(ts): {
'index': idx,
'open': opn,
'close': cls,
}
for (
ts,
idx,
opn,
cls,
) in zip(
valid_timestamps,
indices_arr,
opens_arr,
closes_arr,
)
}
profiler('`time_to_row` creation complete!')
profiler(f'built timestamp lookup for {len(timestamps)} times')
# build gap_specs from rect+arrow specs
for i in range(n_gaps):
gap_spec: dict = {}
# get rect spec for this gap
if i < len(rect_specs):
rect_spec: dict = rect_specs[i].copy()
start_time = rect_spec.get('start_time')
end_time = rect_spec.get('end_time')
if (
start_time is not None
and end_time is not None
):
# lookup from pre-built mapping
start_row = time_to_row.get(start_time)
end_row = time_to_row.get(end_time)
if (
start_row is None
or end_row is None
):
log.warning(
f'Timestamp lookup failed for '
f'gap[{i}], skipping'
)
continue
start_idx = start_row['index']
end_idx = end_row['index']
start_close = start_row['close']
end_open = end_row['open']
from_idx: float = 0.16 - 0.06
gap_spec['start_pos'] = (
start_idx + 1 - from_idx,
start_close,
)
gap_spec['end_pos'] = (
end_idx + from_idx,
end_open,
)
gap_spec['start_time'] = start_time
gap_spec['end_time'] = end_time
gap_spec['color'] = rect_spec.get(
'color',
'dad_blue',
)
# get arrow spec for this gap
if i < len(arrow_specs):
arrow_spec: dict = arrow_specs[i].copy()
x: float = float(arrow_spec.get('x', 0))
y: float = float(arrow_spec.get('y', 0))
time_val: float|None = arrow_spec.get('time')
# timestamp-based index lookup (only for x, NOT y!)
# y is already set to the PREVIOUS bar's close
if time_val is not None:
arrow_row = time_to_row.get(time_val)
if arrow_row is not None:
x = arrow_row['index']
# NOTE: do NOT update y! it's the
# previous bar's close, not current
else:
log.warning(
f'Arrow timestamp {time_val} not '
f'found for gap[{i}], using x={x}'
)
gap_spec['arrow_x'] = x
gap_spec['arrow_y'] = y
gap_spec['time'] = time_val
gap_spec['pointing'] = arrow_spec.get(
'pointing',
'down',
)
gap_spec['alpha'] = arrow_spec.get('alpha', 169)
gap_specs.append(gap_spec)
profiler(f'built {len(gap_specs)} gap_specs')
# create single GapAnnotations item for all gaps
if gap_specs:
gaps_item = GapAnnotations(
gap_specs=gap_specs,
array=arr,
color=gap_specs[0].get('color', 'dad_blue'),
alpha=gap_specs[0].get('alpha', 169),
arrow_size=10.0,
fqme=fqme,
timeframe=timeframe,
)
chart.plotItem.addItem(gaps_item)
# register single item for repositioning
aid: int = id(gaps_item)
annots[aid] = gaps_item
aids_set.add(aid)
result['rects'].append(aid)
profiler(
f'created GapAnnotations item for {len(gap_specs)} '
f'gaps'
)
# A/B comparison: optionally create individual arrows
# alongside batch for visual comparison
if show_individual_arrows:
godw = chart.linked.godwidget
arrows: ArrowEditor = ArrowEditor(godw=godw)
for i, spec in enumerate(gap_specs):
if 'arrow_x' not in spec:
continue
aid_str: str = str(uuid4())
arrow: pg.ArrowItem = arrows.add(
plot=chart.plotItem,
uid=aid_str,
x=spec['arrow_x'],
y=spec['arrow_y'],
pointing=spec['pointing'],
color='bracket', # different color
alpha=spec.get('alpha', 169),
headLen=10.0,
headWidth=2.222,
pxMode=True,
)
arrow._abs_x = spec['arrow_x']
arrow._abs_y = spec['arrow_y']
annots[aid_str] = arrow
_editors[aid_str] = arrows
aids_set.add(aid_str)
result['arrows'].append(aid_str)
profiler(
f'created {len(gap_specs)} individual arrows '
f'for comparison'
)
# handle text items separately (less common, keep
# individual items)
n_texts: int = 0
for text_spec in text_specs:
kwargs: dict = text_spec.copy()
text: str = kwargs.pop('text')
x: float = float(kwargs.pop('x'))
y: float = float(kwargs.pop('y'))
time_val: float|None = kwargs.pop('time', None)
# timestamp-based index lookup
if time_val is not None:
matches = arr[arr['time'] == time_val]
if len(matches) > 0:
x = float(matches[0]['index'])
y = float(matches[0]['close'])
color = kwargs.pop('color', 'dad_blue')
anchor = kwargs.pop('anchor', (0, 1))
font_size = kwargs.pop('font_size', None)
text_item: pg.TextItem = pg.TextItem(
text,
color=hcolor(color),
anchor=anchor,
)
if font_size is None:
from ._style import get_fonts
font, font_small = get_fonts()
font_size = font_small.px_size - 1
qfont: QFont = text_item.textItem.font()
qfont.setPixelSize(font_size)
text_item.setFont(qfont)
text_item.setPos(float(x), float(y))
chart.plotItem.addItem(text_item)
text_item._abs_x = float(x)
text_item._abs_y = float(y)
aid: str = str(uuid4())
annots[aid] = text_item
aids_set.add(aid)
result['texts'].append(aid)
n_texts += 1
profiler(
f'created text annotations: {n_texts} texts'
)
profiler.finish()
await annot_req_stream.send(result)
case { case {
'cmd': 'remove', 'cmd': 'remove',
'aid': int(aid)|str(aid), 'aid': int(aid)|str(aid),
@ -471,10 +828,26 @@ async def serve_rc_annots(
# XXX: reposition all annotations to ensure they # XXX: reposition all annotations to ensure they
# stay aligned with viz data after reset (eg during # stay aligned with viz data after reset (eg during
# backfill when abs-index range changes) # backfill when abs-index range changes)
chart: ChartPlotWidget = {
60: ds.hist_chart,
1: ds.chart,
}[timeframe]
viz: Viz = chart.get_viz(fqme)
arr = viz.shm.array
n_repositioned: int = 0 n_repositioned: int = 0
for aid, annot in annots.items(): for aid, annot in annots.items():
# GapAnnotations batch items have .reposition()
if hasattr(annot, 'reposition'):
annot.reposition(
array=arr,
fqme=fqme,
timeframe=timeframe,
)
n_repositioned += 1
# arrows and text items use abs x,y coords # arrows and text items use abs x,y coords
if ( elif (
hasattr(annot, '_abs_x') hasattr(annot, '_abs_x')
and and
hasattr(annot, '_abs_y') hasattr(annot, '_abs_y')
@ -539,12 +912,21 @@ async def remote_annotate(
finally: finally:
# ensure all annots for this connection are deleted # ensure all annots for this connection are deleted
# on any final teardown # on any final teardown
profiler = Profiler(
msg=f'Annotation teardown for ctx {ctx.cid}',
disabled=False,
ms_threshold=0.0,
)
(_ctx, aids) = _ctxs[ctx.cid] (_ctx, aids) = _ctxs[ctx.cid]
assert _ctx is ctx assert _ctx is ctx
profiler(f'got {len(aids)} aids to remove')
for aid in aids: for aid in aids:
annot: QGraphicsItem = _annots[aid] annot: QGraphicsItem = _annots[aid]
assert rm_annot(annot) assert rm_annot(annot)
profiler(f'removed all {len(aids)} annotations')
class AnnotCtl(Struct): class AnnotCtl(Struct):
''' '''
@ -746,6 +1128,64 @@ class AnnotCtl(Struct):
) )
return aid return aid
async def add_batch(
self,
fqme: str,
timeframe: float,
rects: list[dict]|None = None,
arrows: list[dict]|None = None,
texts: list[dict]|None = None,
show_individual_arrows: bool = False,
from_acm: bool = False,
) -> dict[str, list[int]]:
'''
Batch submit multiple annotations in single IPC msg for
much faster remote annotation vs. per-annot round-trips.
Returns dict of annotation IDs:
{
'rects': [aid1, aid2, ...],
'arrows': [aid3, aid4, ...],
'texts': [aid5, aid6, ...],
}
'''
ipc: MsgStream = self._get_ipc(fqme)
with trio.fail_after(10):
await ipc.send({
'fqme': fqme,
'cmd': 'batch',
'timeframe': timeframe,
'rects': rects or [],
'arrows': arrows or [],
'texts': texts or [],
'show_individual_arrows': show_individual_arrows,
})
result: dict = await ipc.receive()
match result:
case {'error': str(msg)}:
log.error(msg)
return {
'rects': [],
'arrows': [],
'texts': [],
}
# register all AIDs with their IPC streams
for aid_list in result.values():
for aid in aid_list:
self._ipcs[aid] = ipc
if not from_acm:
self._annot_stack.push_async_callback(
partial(
self.remove,
aid,
)
)
return result
async def add_text( async def add_text(
self, self,
fqme: str, fqme: str,
@ -881,3 +1321,14 @@ async def open_annot_ctl(
_annot_stack=annots_stack, _annot_stack=annots_stack,
) )
yield client yield client
# client exited, measure teardown time
teardown_profiler = Profiler(
msg='Client AnnotCtl teardown',
disabled=False,
ms_threshold=0.0,
)
teardown_profiler('exiting annots_stack')
teardown_profiler('annots_stack exited')
teardown_profiler('exiting gather_contexts')

View File

@ -34,6 +34,7 @@ import uuid
from bidict import bidict from bidict import bidict
import tractor import tractor
from tractor.devx.pformat import ppfmt
import trio import trio
from piker import config from piker import config
@ -1207,11 +1208,10 @@ async def process_trade_msg(
f'\n' f'\n'
f'=> CANCELLING ORDER DIALOG <=\n' f'=> CANCELLING ORDER DIALOG <=\n'
# from tractor.devx.pformat import ppfmt
# !TODO LOL, wtf the msg is causing # !TODO LOL, wtf the msg is causing
# a recursion bug! # a recursion bug!
# -[ ] get this shit on msgspec stat! # -[ ] get this shit on msgspec stat!
# f'{ppfmt(broker_msg)}' f'{ppfmt(broker_msg)}'
) )
# do all the things for a cancel: # do all the things for a cancel:
# - drop order-msg dialog from client table # - drop order-msg dialog from client table

1263
poetry.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -106,7 +106,7 @@ default-groups = [
[dependency-groups] [dependency-groups]
uis = [ uis = [
"pyqtgraph", "pyqtgraph >= 0.14.0",
"qdarkstyle >=3.0.2, <4.0.0", "qdarkstyle >=3.0.2, <4.0.0",
"pyqt6 >=6.7.0, <7.0.0", "pyqt6 >=6.7.0, <7.0.0",
@ -193,9 +193,12 @@ include = ["piker"]
[tool.uv.sources] [tool.uv.sources]
pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" } tomlkit = { git = "https://github.com/pikers/tomlkit.git", branch ="piker_pin" }
pyvnc = { git = "https://github.com/regulad/pyvnc.git" } pyvnc = { git = "https://github.com/regulad/pyvnc.git" }
# pyqtgraph = { git = "https://github.com/pyqtgraph/pyqtgraph.git", branch = 'master' }
# pyqtgraph = { path = '../pyqtgraph', editable = true }
# ?TODO, resync our fork?
# pyqtgraph = { git = "https://github.com/pikers/pyqtgraph.git" }
# to get fancy next-cmd/suggestion feats prior to 0.22.2 B) # to get fancy next-cmd/suggestion feats prior to 0.22.2 B)
# https://github.com/xonsh/xonsh/pull/6037 # https://github.com/xonsh/xonsh/pull/6037

12
uv.lock
View File

@ -1221,7 +1221,7 @@ dev = [
{ name = "prompt-toolkit", specifier = "==3.0.40" }, { name = "prompt-toolkit", specifier = "==3.0.40" },
{ name = "pyperclip", specifier = ">=1.9.0" }, { name = "pyperclip", specifier = ">=1.9.0" },
{ name = "pyqt6", specifier = ">=6.7.0,<7.0.0" }, { name = "pyqt6", specifier = ">=6.7.0,<7.0.0" },
{ name = "pyqtgraph", git = "https://github.com/pikers/pyqtgraph.git" }, { name = "pyqtgraph", specifier = ">=0.14.0" },
{ name = "pytest" }, { name = "pytest" },
{ name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" }, { name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" },
{ name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" }, { name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" },
@ -1239,7 +1239,7 @@ repl = [
testing = [{ name = "pytest" }] testing = [{ name = "pytest" }]
uis = [ uis = [
{ name = "pyqt6", specifier = ">=6.7.0,<7.0.0" }, { name = "pyqt6", specifier = ">=6.7.0,<7.0.0" },
{ name = "pyqtgraph", git = "https://github.com/pikers/pyqtgraph.git" }, { name = "pyqtgraph", specifier = ">=0.14.0" },
{ name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" }, { name = "qdarkstyle", specifier = ">=3.0.2,<4.0.0" },
{ name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" }, { name = "rapidfuzz", specifier = ">=3.2.0,<4.0.0" },
] ]
@ -1606,11 +1606,15 @@ wheels = [
[[package]] [[package]]
name = "pyqtgraph" name = "pyqtgraph"
version = "0.12.3" version = "0.14.0"
source = { git = "https://github.com/pikers/pyqtgraph.git#373f9561ea8ec4fef9b4e8bdcdd4bbf372dd6512" } source = { registry = "https://pypi.org/simple" }
dependencies = [ dependencies = [
{ name = "colorama" },
{ name = "numpy" }, { name = "numpy" },
] ]
wheels = [
{ url = "https://files.pythonhosted.org/packages/32/36/4c242f81fdcbfa4fb62a5645f6af79191f4097a0577bd5460c24f19cc4ef/pyqtgraph-0.14.0-py3-none-any.whl", hash = "sha256:7abb7c3e17362add64f8711b474dffac5e7b0e9245abdf992e9a44119b7aa4f5", size = 1924755, upload-time = "2025-11-16T19:43:22.251Z" },
]
[[package]] [[package]]
name = "pyreadline3" name = "pyreadline3"