Compare commits

..

No commits in common. "f2ace1b63b9a4ee5cceefe45b6d892271f0bc8d3" and "ee09f519a925f1c28819ba146ae7a086d7006d92" have entirely different histories.

7 changed files with 60 additions and 169 deletions

View File

@ -768,48 +768,26 @@ class Client:
expiry: str = '',
front: bool = False,
) -> Contract|list[Contract]:
) -> Contract:
'''
Get an unqualifed contract for the current "continous"
future.
When input params result in a so called "ambiguous contract"
situation, we return the list of all matches provided by,
`IB.qualifyContractsAsync(..., returnAll=True)`
'''
# it's the "front" contract returned here
if front:
cons = (
await self.ib.qualifyContractsAsync(
ContFuture(symbol, exchange=exchange),
returnAll=True,
)
)
con = (await self.ib.qualifyContractsAsync(
ContFuture(symbol, exchange=exchange)
))[0]
else:
cons = (
await self.ib.qualifyContractsAsync(
cons = (await self.ib.qualifyContractsAsync(
Future(
symbol,
exchange=exchange,
lastTradeDateOrContractMonth=expiry,
),
returnAll=True,
)
)
))
con = cons[0]
if isinstance(con, list):
log.warning(
f'{len(con)!r} futes cons matched for input params,\n'
f'symbol={symbol!r}\n'
f'exchange={exchange!r}\n'
f'expiry={expiry!r}\n'
f'\n'
f'cons:\n'
f'{con!r}\n'
)
return con
@ -934,17 +912,11 @@ class Client:
)
exch = 'SMART' if not exch else exch
if isinstance(con, list):
contracts: list[Contract] = con
else:
contracts: list[Contract] = [con]
if qualify:
try:
contracts: list[Contract] = (
await self.ib.qualifyContractsAsync(
*contracts
)
await self.ib.qualifyContractsAsync(con)
)
except RequestError as err:
msg = err.message

View File

@ -1291,24 +1291,14 @@ async def deliver_trade_events(
case 'error':
# NOTE: see impl deats in
# `Client.inline_errors()::push_err()`
err: dict|str = item
err: dict = item
# std case, never relay errors for non-order-control
# related issues.
# never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html
if isinstance(err, dict):
code: int = err['error_code']
reason: str = err['reason']
reqid: str = str(err['reqid'])
# XXX, sometimes you'll get just a `str` of the form,
# '[code 104] connection failed' or something..
elif isinstance(err, str):
code_part, _, reason = err.rpartition(']')
if code_part:
_, _, code = code_part.partition('[code')
reqid: str = '<unknown>'
# "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours'

View File

@ -201,15 +201,6 @@ async def open_history_client(
fqme,
timeframe,
end_dt=end_dt,
# XXX WARNING, we don't actually use this inside
# `Client.bars()` since it isn't really supported,
# the API instead supports a "duration" of time style
# from the `end_dt` (or at least that was the best
# way to get it working sanely)..
#
# SO, with that in mind be aware that any downstream
# logic based on this may be mostly futile Xp
start_dt=start_dt,
)
latency = time.time() - query_start
@ -287,27 +278,19 @@ async def open_history_client(
trimmed_bars = bars_array[
bars_array['time'] >= start_dt.timestamp()
]
# XXX, should NEVER get HERE!
if trimmed_bars.size:
trimmed_first_dt: datetime = from_timestamp(trimmed_bars['time'][0])
if (
trimmed_first_dt
>=
trimmed_first_dt := from_timestamp(trimmed_bars['time'][0])
!=
start_dt
):
msg: str = (
# TODO! rm this once we're more confident it never hits!
# breakpoint()
raise RuntimeError(
f'OHLC-bars array start is gt `start_dt` limit !!\n'
f'start_dt: {start_dt}\n'
f'first_dt: {first_dt}\n'
f'trimmed_first_dt: {trimmed_first_dt}\n'
f'\n'
f'Delivering shorted frame of {trimmed_bars.size!r}\n'
)
log.warning(msg)
# TODO! rm this once we're more confident it
# never breaks anything (in the caller)!
# breakpoint()
# raise RuntimeError(msg)
# XXX, overwrite with start_dt-limited frame
bars_array = trimmed_bars

View File

@ -23,7 +23,6 @@ from contextlib import (
nullcontext,
)
from decimal import Decimal
from functools import partial
import time
from typing import (
Awaitable,
@ -33,7 +32,6 @@ from typing import (
from rapidfuzz import process as fuzzy
import ib_async as ibis
import tractor
from tractor.devx.pformat import ppfmt
import trio
from piker.accounting import (
@ -217,19 +215,18 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
f'{ib_client}\n'
)
last: float = time.time()
last = time.time()
async for pattern in stream:
log.info(f'received {pattern}')
now: float = time.time()
# TODO? check this is no longer true?
# this causes tractor hang...
# assert 0
assert pattern, 'IB can not accept blank search pattern'
# throttle search requests to no faster then 1Hz
diff: float = now - last
diff = now - last
if diff < 1.0:
log.debug('throttle sleeping')
await trio.sleep(diff)
@ -240,12 +237,11 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
if (
not pattern
or
pattern.isspace()
or
or pattern.isspace()
# XXX: not sure if this is a bad assumption but it
# seems to make search snappier?
len(pattern) < 1
or len(pattern) < 1
):
log.warning('empty pattern received, skipping..')
@ -258,58 +254,36 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# XXX: this unblocks the far end search task which may
# hold up a multi-search nursery block
await stream.send({})
continue
log.info(
f'Searching for FQME with,\n'
f'pattern: {pattern!r}\n'
)
log.info(f'searching for {pattern}')
last: float = time.time()
last = time.time()
# async batch search using api stocks endpoint and
# module defined adhoc symbol set.
stock_results: list[dict] = []
# async batch search using api stocks endpoint and module
# defined adhoc symbol set.
stock_results = []
async def extend_results(
# ?TODO, how to type async-fn!?
target: Awaitable[list],
pattern: str,
**kwargs,
target: Awaitable[list]
) -> None:
try:
results = await target(
pattern=pattern,
**kwargs,
)
client_repr: str = proxy._aio_ns.ib.client.__class__.__name__
meth_repr: str = target.keywords["meth"]
log.info(
f'Search query,\n'
f'{client_repr}.{meth_repr}(\n'
f' pattern={pattern!r}\n'
f' **kwargs={kwargs!r},\n'
f') = {ppfmt(list(results))}'
# XXX ^ just the keys since that's what
# shows in UI results table.
)
results = await target
except tractor.trionics.Lagged:
log.exception(
'IB SYM-SEARCH OVERRUN?!?\n'
)
print("IB SYM-SEARCH OVERRUN?!?")
return
stock_results.extend(results)
for _ in range(10):
with trio.move_on_after(3) as cs:
async with trio.open_nursery() as tn:
tn.start_soon(
partial(
async with trio.open_nursery() as sn:
sn.start_soon(
extend_results,
proxy.search_symbols(
pattern=pattern,
target=proxy.search_symbols,
upto=10,
upto=5,
),
)
@ -339,9 +313,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# adhoc_match_results = {i[0]: {} for i in
# adhoc_matches}
log.debug(
f'fuzzy matching stocks {ppfmt(stock_results)}'
)
log.debug(f'fuzzy matching stocks {stock_results}')
stock_matches = fuzzy.extract(
pattern,
stock_results,
@ -355,10 +327,7 @@ async def open_symbol_search(ctx: tractor.Context) -> None:
# TODO: we used to deliver contract details
# {item[2]: item[0] for item in stock_matches}
log.debug(
f'Sending final matches\n'
f'{matches.keys()}'
)
log.debug(f"sending matches: {matches.keys()}")
await stream.send(matches)

View File

@ -43,7 +43,6 @@ from pendulum import (
if TYPE_CHECKING:
from ib_async import (
TradingSession,
Contract,
ContractDetails,
)
from exchange_calendars.exchange_calendars import (
@ -83,12 +82,7 @@ def has_holiday(
'''
tz: str = con_deats.timeZoneId
con: Contract = con_deats.contract
exch: str = (
con.primaryExchange
or
con.exchange
)
exch: str = con_deats.contract.primaryExchange
# XXX, ad-hoc handle any IB exchange which are non-std
# via lookup table..

View File

@ -248,20 +248,10 @@ async def maybe_fill_null_segments(
end_dt=end_dt,
)
if array.size == 0:
log.warning(
f'Valid gap from backend ??\n'
f'{end_dt} -> {start_dt}\n'
)
# ?TODO? do we want to remove the nulls and push
# the close price here for the gap duration?
await tractor.pause()
break
if (
frame_start_dt := (from_timestamp(array['time'][0]))
<
backfill_until_dt
frame_start_dt := (
from_timestamp(array['time'][0])
) < backfill_until_dt
):
log.error(
f'Invalid frame_start !?\n'
@ -623,17 +613,10 @@ async def start_backfill(
else:
log.warning(
f'0 BARS TO PUSH after diff!?\n'
'0 BARS TO PUSH after diff!?\n'
f'{next_start_dt} -> {last_start_dt}'
f'\n'
f'This might mean we rxed a gap frame which starts BEFORE,\n'
f'backfill_until_dt: {backfill_until_dt}\n'
f'end_dt_param: {end_dt_param}\n'
)
# XXX, to debug it and be sure.
# await tractor.pause()
break
await tractor.pause()
# Check if we're about to exceed buffer capacity BEFORE
# attempting the push

View File

@ -34,7 +34,6 @@ import uuid
from bidict import bidict
import tractor
from tractor.devx.pformat import ppfmt
import trio
from piker import config
@ -1208,10 +1207,11 @@ async def process_trade_msg(
f'\n'
f'=> CANCELLING ORDER DIALOG <=\n'
# from tractor.devx.pformat import ppfmt
# !TODO LOL, wtf the msg is causing
# a recursion bug!
# -[ ] get this shit on msgspec stat!
f'{ppfmt(broker_msg)}'
# f'{ppfmt(broker_msg)}'
)
# do all the things for a cancel:
# - drop order-msg dialog from client table