Compare commits

...

7 Commits

Author SHA1 Message Date
Gud Boi f2ace1b63b Use `ppfmt()` in `order_mode` since it's provided by `tractor` now 2026-03-06 09:22:36 -05:00
Gud Boi 9010f9c7ab Augment `.ib.symbols` search with more logging
Refactor `open_symbol_search()` to use `partial()` for nursery task
spawning and add detailed query->results logging via `ppfmt()`.

Deats,
- change `extend_results()` to accept `target` callable +
  `pattern` + `**kwargs` and invoke inside, instead of receiving
  a pre-called awaitable; use `partial()` to pass args.
- add `ppfmt()` formatted logging of search query params and
  results including client class + method repr.
- change `print()` -> `log.exception()` for `Lagged` overrun.
- bump `upto=5` -> `upto=10` for `search_symbols()` call.

Also for styling,
- add type some missing type annots.
- add multiline style to `or` conditionals in pattern check.
- reformat log msgs to multiline style throughout.
- use `ppfmt()` for fuzzy match debug log.
- rename nursery `sn` -> `tn`.
- add TODO comment about `assert 0` hang.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 89a145113c Handle `str`-errors in `.ib.broker` trade events
Add `isinstance()` dispatch for the `'error'` event case in
`deliver_trade_events()` to handle `ib_async` sometimes emitting plain
`str` error items instead of the previously expected `dict`.

Deats,
- add `isinstance(err, dict)` branch for the standard case with
  `error_code`, `reason`, and `reqid` fields.
- add `isinstance(err, str)` branch to parse error strings of the
  form `'[code 104] connection failed'` into `code` and `reason`.
- set `reqid: str = '<unknown>'` for string-form errors since
  there's no request ID available.
- update `err` type annot to `dict|str`.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi ec4db30cdc Handle valid null frames and 0-bar cases in backfill
Add guards for empty-array and zero-bar-diff cases in the TSP backfill
loops to avoid crashes and allow graceful loop termination.

In `maybe_fill_null_segments()`,
- add `array.size == 0` guard in `maybe_fill_null_segments()` to detect
  valid (venue closure) gaps from the backend; add a warning + bp
  + break for this case.
- add TODO that we should likely be filling nulls with the close price
  for the gap's duration.

In `start_backfill()`,
- expand the "0 bars after diff" warning msg with
  `backfill_until_dt` and `end_dt_param` context.
- mask the  `await tractor.pause()` and add a `break` to avoid blocking
  the backfill loop.

(this commit msg was generated in some part by
[`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 2a394dba03 Warn instead of raise on `start_dt`-trimmed frames
Downgrade the `start_dt`-trimming check in `open_history_client()`
from a `RuntimeError` raise to a warning log, allowing the caller
to still receive a (shorter) frame of bars (though we may need to still
specially handle such cases in the backfiller's biz logic layer).

Deats,
- add `trimmed_bars.size` guard to skip check on empty results.
- change condition to `>=` and log a warning with the short-frame
  size instead of raising.
- comment-out `raise RuntimeError` and breakpoint for future
  removal once confident.
- add docstring-style comment on `start_dt=` kwarg noting that
  `Client.bars()` doesn't truly support it (uses duration-style
  queries internally).

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 19f16e1df3 Handle ambiguous futes contracts in `get_fute()`
Use (the only available in `ib_async`) `returnAll=True` in
`qualifyContractsAsync()` calls within `get_fute()` and handle the case
where IB returns a list of ambiguous contract matches instead of
a single result.

Deats,
- add `returnAll=True` to both `ContFuture` and `Future`
  qualification calls.
- add `isinstance(con, list)` check after unpacking first result
  to detect ambiguous contract sets.
- log warning with input params and matched contracts when
  ambiguous.
- update return type annot to `Contract|list[Contract]`.

Also,
- handle list-of-contracts case in `find_contracts()` by unpacking
  `*contracts` into the `qualifyContractsAsync()` call.
- reformat `qualifyContractsAsync()` calls to multiline style.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
Gud Boi 3adb0d8b9d Fall back to `Contract.exchange` in `has_holiday()`
Use `con.exchange` as fallback when `con.primaryExchange` is empty
in `has_holiday()` to handle contracts like futures that don't
always set a `primaryExchange`.

Deats,
- extract `con: Contract` from `con_deats.contract` for reuse.
- use `con.primaryExchange or con.exchange` to ensure a valid
  exchange code is always passed to the calendar lookup.
- add `Contract` to `TYPE_CHECKING` imports.

(this commit msg was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
2026-03-06 09:22:36 -05:00
7 changed files with 170 additions and 61 deletions

View File

@ -768,26 +768,48 @@ class Client:
expiry: str = '', expiry: str = '',
front: bool = False, front: bool = False,
) -> Contract: ) -> Contract|list[Contract]:
''' '''
Get an unqualifed contract for the current "continous" Get an unqualifed contract for the current "continous"
future. future.
When input params result in a so called "ambiguous contract"
situation, we return the list of all matches provided by,
`IB.qualifyContractsAsync(..., returnAll=True)`
''' '''
# it's the "front" contract returned here # it's the "front" contract returned here
if front: if front:
con = (await self.ib.qualifyContractsAsync( cons = (
ContFuture(symbol, exchange=exchange) await self.ib.qualifyContractsAsync(
))[0] ContFuture(symbol, exchange=exchange),
returnAll=True,
)
)
else: else:
cons = (await self.ib.qualifyContractsAsync( cons = (
await self.ib.qualifyContractsAsync(
Future( Future(
symbol, symbol,
exchange=exchange, exchange=exchange,
lastTradeDateOrContractMonth=expiry, lastTradeDateOrContractMonth=expiry,
),
returnAll=True,
) )
)) )
con = cons[0] con = cons[0]
if isinstance(con, list):
log.warning(
f'{len(con)!r} futes cons matched for input params,\n'
f'symbol={symbol!r}\n'
f'exchange={exchange!r}\n'
f'expiry={expiry!r}\n'
f'\n'
f'cons:\n'
f'{con!r}\n'
)
return con return con
@ -912,11 +934,17 @@ class Client:
) )
exch = 'SMART' if not exch else exch exch = 'SMART' if not exch else exch
if isinstance(con, list):
contracts: list[Contract] = con
else:
contracts: list[Contract] = [con] contracts: list[Contract] = [con]
if qualify: if qualify:
try: try:
contracts: list[Contract] = ( contracts: list[Contract] = (
await self.ib.qualifyContractsAsync(con) await self.ib.qualifyContractsAsync(
*contracts
)
) )
except RequestError as err: except RequestError as err:
msg = err.message msg = err.message

View File

@ -1291,14 +1291,24 @@ async def deliver_trade_events(
case 'error': case 'error':
# NOTE: see impl deats in # NOTE: see impl deats in
# `Client.inline_errors()::push_err()` # `Client.inline_errors()::push_err()`
err: dict = item err: dict|str = item
# never relay errors for non-broker related issues # std case, never relay errors for non-order-control
# related issues.
# https://interactivebrokers.github.io/tws-api/message_codes.html # https://interactivebrokers.github.io/tws-api/message_codes.html
if isinstance(err, dict):
code: int = err['error_code'] code: int = err['error_code']
reason: str = err['reason'] reason: str = err['reason']
reqid: str = str(err['reqid']) reqid: str = str(err['reqid'])
# XXX, sometimes you'll get just a `str` of the form,
# '[code 104] connection failed' or something..
elif isinstance(err, str):
code_part, _, reason = err.rpartition(']')
if code_part:
_, _, code = code_part.partition('[code')
reqid: str = '<unknown>'
# "Warning:" msg codes, # "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours' # - 2109: 'Outside Regular Trading Hours'

View File

@ -201,6 +201,15 @@ async def open_history_client(
fqme, fqme,
timeframe, timeframe,
end_dt=end_dt, end_dt=end_dt,
# XXX WARNING, we don't actually use this inside
# `Client.bars()` since it isn't really supported,
# the API instead supports a "duration" of time style
# from the `end_dt` (or at least that was the best
# way to get it working sanely)..
#
# SO, with that in mind be aware that any downstream
# logic based on this may be mostly futile Xp
start_dt=start_dt, start_dt=start_dt,
) )
latency = time.time() - query_start latency = time.time() - query_start
@ -278,19 +287,27 @@ async def open_history_client(
trimmed_bars = bars_array[ trimmed_bars = bars_array[
bars_array['time'] >= start_dt.timestamp() bars_array['time'] >= start_dt.timestamp()
] ]
# XXX, should NEVER get HERE!
if trimmed_bars.size:
trimmed_first_dt: datetime = from_timestamp(trimmed_bars['time'][0])
if ( if (
trimmed_first_dt := from_timestamp(trimmed_bars['time'][0]) trimmed_first_dt
!= >=
start_dt start_dt
): ):
# TODO! rm this once we're more confident it never hits! msg: str = (
# breakpoint()
raise RuntimeError(
f'OHLC-bars array start is gt `start_dt` limit !!\n' f'OHLC-bars array start is gt `start_dt` limit !!\n'
f'start_dt: {start_dt}\n' f'start_dt: {start_dt}\n'
f'first_dt: {first_dt}\n' f'first_dt: {first_dt}\n'
f'trimmed_first_dt: {trimmed_first_dt}\n' f'trimmed_first_dt: {trimmed_first_dt}\n'
f'\n'
f'Delivering shorted frame of {trimmed_bars.size!r}\n'
) )
log.warning(msg)
# TODO! rm this once we're more confident it
# never breaks anything (in the caller)!
# breakpoint()
# raise RuntimeError(msg)
# XXX, overwrite with start_dt-limited frame # XXX, overwrite with start_dt-limited frame
bars_array = trimmed_bars bars_array = trimmed_bars

View File

@ -23,6 +23,7 @@ from contextlib import (
nullcontext, nullcontext,
) )
from decimal import Decimal from decimal import Decimal
from functools import partial
import time import time
from typing import ( from typing import (
Awaitable, Awaitable,
@ -32,6 +33,7 @@ from typing import (
from rapidfuzz import process as fuzzy from rapidfuzz import process as fuzzy
import ib_async as ibis import ib_async as ibis
import tractor import tractor
from tractor.devx.pformat import ppfmt
import trio import trio
from piker.accounting import ( from piker.accounting import (
@ -215,18 +217,19 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'{ib_client}\n' f'{ib_client}\n'
) )
last = time.time() last: float = time.time()
async for pattern in stream: async for pattern in stream:
log.info(f'received {pattern}') log.info(f'received {pattern}')
now: float = time.time() now: float = time.time()
# TODO? check this is no longer true?
# this causes tractor hang... # this causes tractor hang...
# assert 0 # assert 0
assert pattern, 'IB can not accept blank search pattern' assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz # throttle search requests to no faster then 1Hz
diff = now - last diff: float = now - last
if diff < 1.0: if diff < 1.0:
log.debug('throttle sleeping') log.debug('throttle sleeping')
await trio.sleep(diff) await trio.sleep(diff)
@ -237,11 +240,12 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
if ( if (
not pattern not pattern
or pattern.isspace() or
pattern.isspace()
or
# XXX: not sure if this is a bad assumption but it # XXX: not sure if this is a bad assumption but it
# seems to make search snappier? # seems to make search snappier?
or len(pattern) < 1 len(pattern) < 1
): ):
log.warning('empty pattern received, skipping..') log.warning('empty pattern received, skipping..')
@ -254,36 +258,58 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# XXX: this unblocks the far end search task which may # XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block # hold up a multi-search nursery block
await stream.send({}) await stream.send({})
continue continue
log.info(f'searching for {pattern}') log.info(
f'Searching for FQME with,\n'
f'pattern: {pattern!r}\n'
)
last = time.time() last: float = time.time()
# async batch search using api stocks endpoint and module # async batch search using api stocks endpoint and
# defined adhoc symbol set. # module defined adhoc symbol set.
stock_results = [] stock_results: list[dict] = []
async def extend_results( async def extend_results(
target: Awaitable[list] # ?TODO, how to type async-fn!?
target: Awaitable[list],
pattern: str,
**kwargs,
) -> None: ) -> None:
try: try:
results = await target results = await target(
pattern=pattern,
**kwargs,
)
client_repr: str = proxy._aio_ns.ib.client.__class__.__name__
meth_repr: str = target.keywords["meth"]
log.info(
f'Search query,\n'
f'{client_repr}.{meth_repr}(\n'
f' pattern={pattern!r}\n'
f' **kwargs={kwargs!r},\n'
f') = {ppfmt(list(results))}'
# XXX ^ just the keys since that's what
# shows in UI results table.
)
except tractor.trionics.Lagged: except tractor.trionics.Lagged:
print("IB SYM-SEARCH OVERRUN?!?") log.exception(
'IB SYM-SEARCH OVERRUN?!?\n'
)
return return
stock_results.extend(results) stock_results.extend(results)
for _ in range(10): for _ in range(10):
with trio.move_on_after(3) as cs: with trio.move_on_after(3) as cs:
async with trio.open_nursery() as sn: async with trio.open_nursery() as tn:
sn.start_soon( tn.start_soon(
partial(
extend_results, extend_results,
proxy.search_symbols(
pattern=pattern, pattern=pattern,
upto=5, target=proxy.search_symbols,
upto=10,
), ),
) )
@ -313,7 +339,9 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# adhoc_match_results = {i[0]: {} for i in # adhoc_match_results = {i[0]: {} for i in
# adhoc_matches} # adhoc_matches}
log.debug(f'fuzzy matching stocks {stock_results}') log.debug(
f'fuzzy matching stocks {ppfmt(stock_results)}'
)
stock_matches = fuzzy.extract( stock_matches = fuzzy.extract(
pattern, pattern,
stock_results, stock_results,
@ -327,7 +355,10 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# TODO: we used to deliver contract details # TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches} # {item[2]: item[0] for item in stock_matches}
log.debug(f"sending matches: {matches.keys()}") log.debug(
f'Sending final matches\n'
f'{matches.keys()}'
)
await stream.send(matches) await stream.send(matches)

View File

@ -43,6 +43,7 @@ from pendulum import (
if TYPE_CHECKING: if TYPE_CHECKING:
from ib_async import ( from ib_async import (
TradingSession, TradingSession,
Contract,
ContractDetails, ContractDetails,
) )
from exchange_calendars.exchange_calendars import ( from exchange_calendars.exchange_calendars import (
@ -82,7 +83,12 @@ def has_holiday(
''' '''
tz: str = con_deats.timeZoneId tz: str = con_deats.timeZoneId
exch: str = con_deats.contract.primaryExchange con: Contract = con_deats.contract
exch: str = (
con.primaryExchange
or
con.exchange
)
# XXX, ad-hoc handle any IB exchange which are non-std # XXX, ad-hoc handle any IB exchange which are non-std
# via lookup table.. # via lookup table..

View File

@ -248,10 +248,20 @@ async def maybe_fill_null_segments(
end_dt=end_dt, end_dt=end_dt,
) )
if array.size == 0:
log.warning(
f'Valid gap from backend ??\n'
f'{end_dt} -> {start_dt}\n'
)
# ?TODO? do we want to remove the nulls and push
# the close price here for the gap duration?
await tractor.pause()
break
if ( if (
frame_start_dt := ( frame_start_dt := (from_timestamp(array['time'][0]))
from_timestamp(array['time'][0]) <
) < backfill_until_dt backfill_until_dt
): ):
log.error( log.error(
f'Invalid frame_start !?\n' f'Invalid frame_start !?\n'
@ -613,10 +623,17 @@ async def start_backfill(
else: else:
log.warning( log.warning(
'0 BARS TO PUSH after diff!?\n' f'0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}' f'{next_start_dt} -> {last_start_dt}'
f'\n'
f'This might mean we rxed a gap frame which starts BEFORE,\n'
f'backfill_until_dt: {backfill_until_dt}\n'
f'end_dt_param: {end_dt_param}\n'
) )
await tractor.pause() # XXX, to debug it and be sure.
# await tractor.pause()
break
# Check if we're about to exceed buffer capacity BEFORE # Check if we're about to exceed buffer capacity BEFORE
# attempting the push # attempting the push

View File

@ -34,6 +34,7 @@ import uuid
from bidict import bidict from bidict import bidict
import tractor import tractor
from tractor.devx.pformat import ppfmt
import trio import trio
from piker import config from piker import config
@ -1207,11 +1208,10 @@ async def process_trade_msg(
f'\n' f'\n'
f'=> CANCELLING ORDER DIALOG <=\n' f'=> CANCELLING ORDER DIALOG <=\n'
# from tractor.devx.pformat import ppfmt
# !TODO LOL, wtf the msg is causing # !TODO LOL, wtf the msg is causing
# a recursion bug! # a recursion bug!
# -[ ] get this shit on msgspec stat! # -[ ] get this shit on msgspec stat!
# f'{ppfmt(broker_msg)}' f'{ppfmt(broker_msg)}'
) )
# do all the things for a cancel: # do all the things for a cancel:
# - drop order-msg dialog from client table # - drop order-msg dialog from client table