From b0766764f03b921b1fe05bbe8586db698c8d2c03 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 10 Jun 2026 20:09:27 -0400 Subject: [PATCH] .ib.broker: eagerly pre-qualify pp/order mkts Follow-up to the lazy order-req qualify: pre-qualify (and cache) contracts for ALL already-open pps and orders during `open_trade_dialog()` startup so live submissions NEVER pay a first-request qualification delay; the warmup runs before the order handler task starts so early reqs just buffer in the ems IPC stream. Any brand-new mkt still lazily qualifies on its first submission. Deats, - factor the `Client` lookup-table writes out of `.symbols.get_mkt_info()` into a new `cache_contract()` helper which now ALSO keys `_contracts` by `mkt.fqme` (read by the fill-time pp-update path in `emit_pp_update()`) alongside `mkt.bs_fqme` (read by `Client.submit_limit()`); resolves the old "post-split mktmap lookup" TODO. - explicitly `cache_contract()` in `handle_order_requests()` since the lifo-cache may deliver a hit (body skipped) when another acct/client already qualified the fqme. - filter `None` entries (ambiguous contracts) from `qualifyContractsAsync()` results in `Client.find_contracts()` before any attr access + raise a better "use a (more) venue-qualified fqme" error msg. - relay ALL (non-cancel) errors from the aio method-relay task to the `trio`-side caller instead of crashing the whole proxy/dialog; critical post-`datad`-split where eg. qualification failures are expected to be caught per-request by order/warmup code. - handle inline `('event', ...)` api-farm status msgs in `MethodProxy._run_method()` at info-level instead of the "UNKNOWN IB MSG" warning. - only `log.setLevel()` in `open_trade_dialog()`; attaching a handler via `get_console_log()` double-prints every record since the daemon fixture already enables the console handler on the parent subsys logger. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code Co-Authored-By: Claude Fable 5 --- piker/brokers/ib/api.py | 48 +++++++++++++++++--- piker/brokers/ib/broker.py | 90 ++++++++++++++++++++++++++++++++++--- piker/brokers/ib/symbols.py | 40 ++++++++++++++--- 3 files changed, 160 insertions(+), 18 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 7ac36c34..63931bbd 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -967,8 +967,26 @@ class Client: else: raise + # XXX: ambiguous/unqualifiable contracts are + # returned as `None` entries by + # `qualifyContractsAsync()` (which also logs an + # "Ambiguous contract" warning listing the + # possible matches) so filter them BEFORE any + # attr access B) + contracts: list[Contract] = [ + tract + for tract in contracts + if tract is not None + ] if not contracts: - raise ValueError(f"No contract could be found {con}") + raise ValueError( + f'No (unambiguous) contract could be ' + f'qualified for {con!r}\n' + f'\n' + f'If a stonk, you likely need a (more) ' + f'venue-qualified fqme,\n' + f"eg. 'gld.arca.ib' instead of 'gld.ib'\n" + ) # pack all contracts into cache for tract in contracts: @@ -1656,6 +1674,19 @@ class MethodProxy: log.warning(f'IB error relay: {emsg}') continue + # routine (api-farm conn) status events relayed + # inline by `Client.inline_errors()`; not a + # response to our method call so just log at + # info and keep waiting. + elif ( + isinstance(msg, tuple) + and + msg[0] == 'event' + ): + etype, emsg = msg + log.info(f'IB status event relay: {emsg}') + continue + else: log.warning(f'UNKNOWN IB MSG: {msg}') @@ -1719,12 +1750,15 @@ async def open_aio_client_method_relay( # echo the msg back chan.send_nowait({'result': resp}) - except ( - RequestError, - - # TODO: relay all errors to trio? - # BaseException, - ) as err: + # XXX: relay ALL (non-cancel) errors to the + # `trio`-side caller (which re-raises in the + # `MethodProxy._run_method()` frame) instead + # of crashing this relay task and thus the + # whole proxy/dialog; critical post the + # (datad|brokerd)-split where eg. contract + # qualification failures are expected to be + # caught per-request by order/warmup code! + except Exception as err: chan.send_nowait({'exception': err}) case {'error': content}: diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index ecca911d..7f788e0e 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -89,6 +89,7 @@ from .api import ( MethodProxy, ) from .symbols import ( + cache_contract, con2fqme, get_mkt_info, ) @@ -192,13 +193,30 @@ async def handle_order_requests( # resolution ep the feed-side uses so the cache # key (`MktPair.bs_fqme`) matches what # `Client.submit_limit()` looks up. + # NOTE: normally a no-op since + # `open_trade_dialog()` eagerly pre-qualifies all + # already-open pp/order mkts at startup; this + # only fires for a brand-new (to this daemon) + # mkt's first order. if fqme not in client._contracts: proxy: MethodProxy = proxies[account] try: - await get_mkt_info( + ( + mkt, + details, + ) = await get_mkt_info( fqme, proxy=proxy, ) + # XXX: explicit write since the lifo-cache + # may deliver a hit (body skipped) when + # another acct/client already qualified + # this fqme. + cache_contract( + client, + mkt, + details.contract, + ) except Exception as err: log.exception( f'Failed to qualify contract for\n' @@ -603,10 +621,14 @@ async def open_trade_dialog( ) -> AsyncIterator[dict[str, Any]]: - get_console_log( - level=loglevel, - name=__name__, - ) + # XXX: ONLY adjust the level of this (sub)mod's logger; + # attaching a (stderr) handler (via `get_console_log()`) + # here would DOUBLE-print every record since the daemon + # fixture (`.._daemon._setup_persistent_brokerd()`) + # already enables the console handler on the parent + # subsys logger which all our records propagate to! + if loglevel: + log.setLevel(loglevel.upper()) # task local msg dialog tracking flows = OrderDialogs() @@ -843,6 +865,64 @@ async def open_trade_dialog( for msg in order_msgs: await ems_stream.send(msg) + # XXX: eagerly pre-qualify (and cache) the + # contracts for all already-open pps and + # orders so that (live) order submission + # NEVER pays a first-request qualification + # delay; any brand-new mkt is still lazily + # qualified by `handle_order_requests()` on + # its first submission. NOTE: since this + # runs BEFORE the order handler task is + # even started, no order can clear until + # the warmup completes (early reqs just + # buffer in the ems IPC stream) B) + warmup_fqmes: set[str] = { + msg.symbol + for msg in all_positions + } + warmup_fqmes.update( + msg.req.symbol + for msg in order_msgs + ) + unique_clients: set[Client] = set( + aioclients.values() + ) + if ( + warmup_fqmes + and + proxies + ): + a_proxy: MethodProxy = next( + iter(proxies.values()) + ) + for fqme in warmup_fqmes: + try: + ( + mkt, + details, + ) = await get_mkt_info( + fqme, + proxy=a_proxy, + ) + except Exception as err: + # XXX: non-fatal; an + # un-warmed mkt just falls + # back to the lazy qualify + # in the order handler. + log.warning( + f'Failed to pre-qualify\n' + f'fqme: {fqme!r}\n' + f'err: {err!r}\n' + ) + continue + + for _client in unique_clients: + cache_contract( + _client, + mkt, + details.contract, + ) + for client in set(aioclients.values()): trade_event_stream: LinkedTaskChannel = await tn.start( open_trade_event_stream, diff --git a/piker/brokers/ib/symbols.py b/piker/brokers/ib/symbols.py index d74f494d..2d066a76 100644 --- a/piker/brokers/ib/symbols.py +++ b/piker/brokers/ib/symbols.py @@ -639,12 +639,40 @@ async def get_mkt_info( # if possible register the bs_mktid to the just-built # mkt so that it can be retreived by order mode tasks later. - # TODO NOTE: this is going to be problematic if/when we split - # out the datatd vs. brokerd actors since the mktmap lookup - # table will now be inaccessible.. if proxy is not None: - client: Client = proxy._aio_ns - client._contracts[mkt.bs_fqme] = con - client._cons2mkts[con] = mkt + cache_contract( + proxy._aio_ns, + mkt, + con, + ) return mkt, details + + +def cache_contract( + client: Client, + mkt: MktPair, + con: ibis.Contract, + +) -> None: + ''' + Register a (qualified) contract + mkt-info pair on the + given (api) `Client`'s actor-local lookup tables. + + Cached under BOTH fqme key-forms since consumers vary: + - `mkt.bs_fqme` (eg. 'nvda.nasdaq'): read by + `Client.submit_limit()` for order requests, + - `mkt.fqme` (eg. 'nvda.nasdaq.ib'): read by the + fill-time pp-update (symcache-backup-table) path in + `.broker.emit_pp_update()`. + + NOTE: post the (datad|brokerd)-actor-split this MUST be + run (in the trading actor) for every mkt either eagerly + at `.broker.open_trade_dialog()` startup or lazily per + order request; there is no in-proc feed setup doing it + implicitly anymore! + + ''' + client._contracts[mkt.bs_fqme] = con + client._contracts[mkt.fqme] = con + client._cons2mkts[con] = mkt