.ib.broker: eagerly pre-qualify pp/order mkts
Follow-up to the lazy order-req qualify: pre-qualify (and
cache) contracts for ALL already-open pps and orders during
`open_trade_dialog()` startup so live submissions NEVER pay
a first-request qualification delay; the warmup runs before
the order handler task starts so early reqs just buffer in
the ems IPC stream. Any brand-new mkt still lazily
qualifies on its first submission.
Deats,
- factor the `Client` lookup-table writes out of
`.symbols.get_mkt_info()` into a new `cache_contract()`
helper which now ALSO keys `_contracts` by `mkt.fqme`
(read by the fill-time pp-update path in
`emit_pp_update()`) alongside `mkt.bs_fqme` (read by
`Client.submit_limit()`); resolves the old "post-split
mktmap lookup" TODO.
- explicitly `cache_contract()` in
`handle_order_requests()` since the lifo-cache may
deliver a hit (body skipped) when another acct/client
already qualified the fqme.
- filter `None` entries (ambiguous contracts) from
`qualifyContractsAsync()` results in
`Client.find_contracts()` before any attr access + raise
a better "use a (more) venue-qualified fqme" error msg.
- relay ALL (non-cancel) errors from the aio method-relay
task to the `trio`-side caller instead of crashing the
whole proxy/dialog; critical post-`datad`-split where eg.
qualification failures are expected to be caught
per-request by order/warmup code.
- handle inline `('event', ...)` api-farm status msgs in
`MethodProxy._run_method()` at info-level instead of the
"UNKNOWN IB MSG" warning.
- only `log.setLevel()` in `open_trade_dialog()`;
attaching a handler via `get_console_log()` double-prints
every record since the daemon fixture already enables the
console handler on the parent subsys logger.
(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
datad_service
parent
456c6a5567
commit
b0766764f0
|
|
@ -967,8 +967,26 @@ class Client:
|
|||
|
||||
else:
|
||||
raise
|
||||
# XXX: ambiguous/unqualifiable contracts are
|
||||
# returned as `None` entries by
|
||||
# `qualifyContractsAsync()` (which also logs an
|
||||
# "Ambiguous contract" warning listing the
|
||||
# possible matches) so filter them BEFORE any
|
||||
# attr access B)
|
||||
contracts: list[Contract] = [
|
||||
tract
|
||||
for tract in contracts
|
||||
if tract is not None
|
||||
]
|
||||
if not contracts:
|
||||
raise ValueError(f"No contract could be found {con}")
|
||||
raise ValueError(
|
||||
f'No (unambiguous) contract could be '
|
||||
f'qualified for {con!r}\n'
|
||||
f'\n'
|
||||
f'If a stonk, you likely need a (more) '
|
||||
f'venue-qualified fqme,\n'
|
||||
f"eg. 'gld.arca.ib' instead of 'gld.ib'\n"
|
||||
)
|
||||
|
||||
# pack all contracts into cache
|
||||
for tract in contracts:
|
||||
|
|
@ -1656,6 +1674,19 @@ class MethodProxy:
|
|||
log.warning(f'IB error relay: {emsg}')
|
||||
continue
|
||||
|
||||
# routine (api-farm conn) status events relayed
|
||||
# inline by `Client.inline_errors()`; not a
|
||||
# response to our method call so just log at
|
||||
# info and keep waiting.
|
||||
elif (
|
||||
isinstance(msg, tuple)
|
||||
and
|
||||
msg[0] == 'event'
|
||||
):
|
||||
etype, emsg = msg
|
||||
log.info(f'IB status event relay: {emsg}')
|
||||
continue
|
||||
|
||||
else:
|
||||
log.warning(f'UNKNOWN IB MSG: {msg}')
|
||||
|
||||
|
|
@ -1719,12 +1750,15 @@ async def open_aio_client_method_relay(
|
|||
# echo the msg back
|
||||
chan.send_nowait({'result': resp})
|
||||
|
||||
except (
|
||||
RequestError,
|
||||
|
||||
# TODO: relay all errors to trio?
|
||||
# BaseException,
|
||||
) as err:
|
||||
# XXX: relay ALL (non-cancel) errors to the
|
||||
# `trio`-side caller (which re-raises in the
|
||||
# `MethodProxy._run_method()` frame) instead
|
||||
# of crashing this relay task and thus the
|
||||
# whole proxy/dialog; critical post the
|
||||
# (datad|brokerd)-split where eg. contract
|
||||
# qualification failures are expected to be
|
||||
# caught per-request by order/warmup code!
|
||||
except Exception as err:
|
||||
chan.send_nowait({'exception': err})
|
||||
|
||||
case {'error': content}:
|
||||
|
|
|
|||
|
|
@ -89,6 +89,7 @@ from .api import (
|
|||
MethodProxy,
|
||||
)
|
||||
from .symbols import (
|
||||
cache_contract,
|
||||
con2fqme,
|
||||
get_mkt_info,
|
||||
)
|
||||
|
|
@ -192,13 +193,30 @@ async def handle_order_requests(
|
|||
# resolution ep the feed-side uses so the cache
|
||||
# key (`MktPair.bs_fqme`) matches what
|
||||
# `Client.submit_limit()` looks up.
|
||||
# NOTE: normally a no-op since
|
||||
# `open_trade_dialog()` eagerly pre-qualifies all
|
||||
# already-open pp/order mkts at startup; this
|
||||
# only fires for a brand-new (to this daemon)
|
||||
# mkt's first order.
|
||||
if fqme not in client._contracts:
|
||||
proxy: MethodProxy = proxies[account]
|
||||
try:
|
||||
await get_mkt_info(
|
||||
(
|
||||
mkt,
|
||||
details,
|
||||
) = await get_mkt_info(
|
||||
fqme,
|
||||
proxy=proxy,
|
||||
)
|
||||
# XXX: explicit write since the lifo-cache
|
||||
# may deliver a hit (body skipped) when
|
||||
# another acct/client already qualified
|
||||
# this fqme.
|
||||
cache_contract(
|
||||
client,
|
||||
mkt,
|
||||
details.contract,
|
||||
)
|
||||
except Exception as err:
|
||||
log.exception(
|
||||
f'Failed to qualify contract for\n'
|
||||
|
|
@ -603,10 +621,14 @@ async def open_trade_dialog(
|
|||
|
||||
) -> AsyncIterator[dict[str, Any]]:
|
||||
|
||||
get_console_log(
|
||||
level=loglevel,
|
||||
name=__name__,
|
||||
)
|
||||
# XXX: ONLY adjust the level of this (sub)mod's logger;
|
||||
# attaching a (stderr) handler (via `get_console_log()`)
|
||||
# here would DOUBLE-print every record since the daemon
|
||||
# fixture (`.._daemon._setup_persistent_brokerd()`)
|
||||
# already enables the console handler on the parent
|
||||
# subsys logger which all our records propagate to!
|
||||
if loglevel:
|
||||
log.setLevel(loglevel.upper())
|
||||
|
||||
# task local msg dialog tracking
|
||||
flows = OrderDialogs()
|
||||
|
|
@ -843,6 +865,64 @@ async def open_trade_dialog(
|
|||
for msg in order_msgs:
|
||||
await ems_stream.send(msg)
|
||||
|
||||
# XXX: eagerly pre-qualify (and cache) the
|
||||
# contracts for all already-open pps and
|
||||
# orders so that (live) order submission
|
||||
# NEVER pays a first-request qualification
|
||||
# delay; any brand-new mkt is still lazily
|
||||
# qualified by `handle_order_requests()` on
|
||||
# its first submission. NOTE: since this
|
||||
# runs BEFORE the order handler task is
|
||||
# even started, no order can clear until
|
||||
# the warmup completes (early reqs just
|
||||
# buffer in the ems IPC stream) B)
|
||||
warmup_fqmes: set[str] = {
|
||||
msg.symbol
|
||||
for msg in all_positions
|
||||
}
|
||||
warmup_fqmes.update(
|
||||
msg.req.symbol
|
||||
for msg in order_msgs
|
||||
)
|
||||
unique_clients: set[Client] = set(
|
||||
aioclients.values()
|
||||
)
|
||||
if (
|
||||
warmup_fqmes
|
||||
and
|
||||
proxies
|
||||
):
|
||||
a_proxy: MethodProxy = next(
|
||||
iter(proxies.values())
|
||||
)
|
||||
for fqme in warmup_fqmes:
|
||||
try:
|
||||
(
|
||||
mkt,
|
||||
details,
|
||||
) = await get_mkt_info(
|
||||
fqme,
|
||||
proxy=a_proxy,
|
||||
)
|
||||
except Exception as err:
|
||||
# XXX: non-fatal; an
|
||||
# un-warmed mkt just falls
|
||||
# back to the lazy qualify
|
||||
# in the order handler.
|
||||
log.warning(
|
||||
f'Failed to pre-qualify\n'
|
||||
f'fqme: {fqme!r}\n'
|
||||
f'err: {err!r}\n'
|
||||
)
|
||||
continue
|
||||
|
||||
for _client in unique_clients:
|
||||
cache_contract(
|
||||
_client,
|
||||
mkt,
|
||||
details.contract,
|
||||
)
|
||||
|
||||
for client in set(aioclients.values()):
|
||||
trade_event_stream: LinkedTaskChannel = await tn.start(
|
||||
open_trade_event_stream,
|
||||
|
|
|
|||
|
|
@ -639,12 +639,40 @@ async def get_mkt_info(
|
|||
|
||||
# if possible register the bs_mktid to the just-built
|
||||
# mkt so that it can be retreived by order mode tasks later.
|
||||
# TODO NOTE: this is going to be problematic if/when we split
|
||||
# out the datatd vs. brokerd actors since the mktmap lookup
|
||||
# table will now be inaccessible..
|
||||
if proxy is not None:
|
||||
client: Client = proxy._aio_ns
|
||||
client._contracts[mkt.bs_fqme] = con
|
||||
client._cons2mkts[con] = mkt
|
||||
cache_contract(
|
||||
proxy._aio_ns,
|
||||
mkt,
|
||||
con,
|
||||
)
|
||||
|
||||
return mkt, details
|
||||
|
||||
|
||||
def cache_contract(
|
||||
client: Client,
|
||||
mkt: MktPair,
|
||||
con: ibis.Contract,
|
||||
|
||||
) -> None:
|
||||
'''
|
||||
Register a (qualified) contract + mkt-info pair on the
|
||||
given (api) `Client`'s actor-local lookup tables.
|
||||
|
||||
Cached under BOTH fqme key-forms since consumers vary:
|
||||
- `mkt.bs_fqme` (eg. 'nvda.nasdaq'): read by
|
||||
`Client.submit_limit()` for order requests,
|
||||
- `mkt.fqme` (eg. 'nvda.nasdaq.ib'): read by the
|
||||
fill-time pp-update (symcache-backup-table) path in
|
||||
`.broker.emit_pp_update()`.
|
||||
|
||||
NOTE: post the (datad|brokerd)-actor-split this MUST be
|
||||
run (in the trading actor) for every mkt either eagerly
|
||||
at `.broker.open_trade_dialog()` startup or lazily per
|
||||
order request; there is no in-proc feed setup doing it
|
||||
implicitly anymore!
|
||||
|
||||
'''
|
||||
client._contracts[mkt.bs_fqme] = con
|
||||
client._contracts[mkt.fqme] = con
|
||||
client._cons2mkts[con] = mkt
|
||||
|
|
|
|||
Loading…
Reference in New Issue