From 456c6a55677febd0e61bf394a1acc835c1941223 Mon Sep 17 00:00:00 2001 From: goodboy Date: Wed, 10 Jun 2026 17:36:51 -0400 Subject: [PATCH] .ib.broker: lazily qualify contracts on order req Post (datad|brokerd)-split the trading actor's `Client._contracts` cache is never warmed by in-proc feed setup (that now happens in the `datad.ib` sibling) so ALL live submissions failed with "no live feed?" at `Client.submit_limit()`; `brokerd` must be able to submit orders without any feed registered in its own subactor. Deats, - thread the acct `proxies` table into `handle_order_requests()` and, on a `_contracts` cache-miss for the req's fqme, lazily run the same `get_mkt_info(fqme, proxy=...)` symbology ep the feed-side uses; it writes the `mkt.bs_fqme` key `submit_limit()` looks up (and warms `_cons2mkts` which the position-audit path also needs) on exactly the same aio `Client` instance. - guard `submit_limit()` w/ a try/except -> `BrokerdError` relay so a single bad submission degrades to an EMS error msg instead of crashing the dialog (and causing the `TrioTaskExited` teardown storm seen in testing). - fix the (non-f-string..) raise msg in `Client.submit_limit()` and doc the new lazy-qualify contract; the bug was foretold by the TODO in `.symbols.get_mkt_info()` B) (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code Co-Authored-By: Claude Fable 5 Prompt-IO: ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md --- .../20260610T213549Z_f084e899_prompt_io.md | 60 +++++++++++++++ ...20260610T213549Z_f084e899_prompt_io.raw.md | 57 ++++++++++++++ piker/brokers/ib/api.py | 14 +++- piker/brokers/ib/broker.py | 76 ++++++++++++++++--- 4 files changed, 193 insertions(+), 14 deletions(-) create mode 100644 ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md create mode 100644 ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.raw.md diff --git a/ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md b/ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md new file mode 100644 index 00000000..f7a87871 --- /dev/null +++ b/ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md @@ -0,0 +1,60 @@ +--- +model: claude-fable-5[1m] +service: claude +session: 32d15f9a-b2d3-4c26-bdc9-190219141a25 +timestamp: 2026-06-10T21:35:49Z +git_ref: datad_service +diff_cmd: git log -1 -p --follow -- ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md +scope: code +substantive: true +raw_file: 20260610T213549Z_f084e899_prompt_io.raw.md +--- + +## Prompt + +User bug report from live `ib` testing of the +(datad|brokerd)-split branch: + +> ok doing some testing and noticing live orders do not +> work otb, pikerd show this on live submission, +> [pdb traceback: `RuntimeError("Can not order +> {symbol}, no live feed?")` at `ib/api.py:1152` +> `submit_limit()` with `self._contracts == {}` in the +> `brokerd.ib` actor + a `TrioTaskExited` teardown +> cascade] + +Follow-up user direction fixing the design intent: + +> i think the main thing here is that [brokerd] should +> be able to always submit orders without a live feed +> being up and registered in the same subactor right? + +## Response summary + +Root cause: pre-split the feed code populated the +actor-local `Client._contracts` cache (via +`get_mkt_info()` writing `mkt.bs_fqme` keys) in the +SAME process as order submission; post-split the +trading actor's client is never warmed. Fix: lazily +qualify + cache the contract per order request in +`handle_order_requests()` by running the same +`get_mkt_info(fqme, proxy=...)` ep the feed side uses, +plus per-order error relay (`BrokerdError`) so one bad +submission can't crash the whole trades dialog (the +`TrioTaskExited` storm was teardown cascade from the +original raise). + +## Files changed + +- `piker/brokers/ib/broker.py` — thread `proxies` into + `handle_order_requests()`; lazy contract qualify on + cache-miss; guard `submit_limit()` w/ `BrokerdError` + relay; uncomment the (anticipatory) `get_mkt_info` + import +- `piker/brokers/ib/api.py` — fix the non-f-string + raise msg + document the new qualification contract + +## Human edits + +None — committed as generated. Live `ib` order retest +performed by the human post-commit. diff --git a/ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.raw.md b/ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.raw.md new file mode 100644 index 00000000..cae0c2de --- /dev/null +++ b/ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.raw.md @@ -0,0 +1,57 @@ +--- +model: claude-fable-5[1m] +service: claude +timestamp: 2026-06-10T21:35:49Z +git_ref: datad_service +diff_cmd: git log -1 -p --follow -- ai/prompt-io/claude/20260610T213549Z_f084e899_prompt_io.md +--- + +NOTE: diff-ref mode entry (code committed in the same +commit as this log); recorded from the live debug +session per the `/prompt-io` skill rules. + +> `git log -1 -p --follow -- piker/brokers/ib/broker.py` +> `git log -1 -p --follow -- piker/brokers/ib/api.py` + +Key diagnostic chain (from session): + +- pdb showed `Client._contracts == {}` inside + `brokerd.ib`'s `submit_limit()`; the cache has + exactly TWO write sites: `Client.find_contracts()` + (api.py, keys `f'{sym}.{exch}.ib'`) and + `symbols.get_mkt_info()` (key `mkt.bs_fqme`, eg. + 'nvda.nasdaq' — NO `.ib` suffix). +- `BrokerdOrder.symbol` arrives as the bs_fqme form + ('nvda.nasdaq') so ONLY the `get_mkt_info()` write + site produces the key `submit_limit()` reads — + ie. pre-split it was the feed's in-proc + `get_mkt_info(sym, proxy=proxy)` call keeping orders + working, NOT `find_contracts()`. +- the existing TODO at `symbols.py:642-644` literally + predicted this: "this is going to be problematic + if/when we split out the datad vs. brokerd actors + since the mktmap lookup table will now be + inaccessible.." +- instance identity verified: `proxy._aio_ns` IS the + same `Client` obj as `_accounts2clients[account]` + (both sourced from the `load_aio_clients()` cache via + `open_client_proxies()`), so a brokerd-side + `get_mkt_info(fqme, proxy=proxies[account])` warms + exactly the dict `submit_limit()` reads. It also + populates `client._cons2mkts` which the + position-audit path (`broker.py` backup-table code) + needs in this actor anyway. +- the `TrioTaskExited` storm in the user's log + (`recv_trade_updates`, `open_aio_client_method_relay` + aio tasks) is teardown cascade: the raise crashed + `handle_order_requests` -> nursery teardown ripped + the trio sides of still-running aio relay tasks. + Hence the added per-order try/except -> + `BrokerdError` relay hardening so a single bad + submission degrades to an EMS error msg instead of + killing the backend's entire order-ctl dialog. + +Verification: `tests/test_services.py` (5 passed) + +`tests/test_ems.py` (6 passed) regression-green; live +`ib` submission retest delegated to the human (needs a +running TWS/gw). diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 836afae2..7ac36c34 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1146,10 +1146,16 @@ class Client: try: con: Contract = self._contracts[symbol] except KeyError: - # require that the symbol has been previously cached by - # a data feed request - ensure we aren't making orders - # against non-known prices. - raise RuntimeError("Can not order {symbol}, no live feed?") + # require that the mkt's contract has been previously + # qualified and cached (see `.symbols.get_mkt_info()` + # which is run for any feed-init OR lazily by the + # order-request handler in `.broker`) - ensures we + # aren't making orders against unknown contracts. + raise RuntimeError( + f'Can not order {symbol!r}, ' + f'no qualified contract cached!?\n' + f'_contracts: {list(self._contracts)!r}\n' + ) try: trade = self.ib.placeOrder( diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 3a283254..ecca911d 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -90,7 +90,7 @@ from .api import ( ) from .symbols import ( con2fqme, - # get_mkt_info, + get_mkt_info, ) from .ledger import ( norm_trade_records, @@ -138,6 +138,7 @@ async def handle_order_requests( ems_order_stream: tractor.MsgStream, accounts_def: dict[str, str], flows: OrderDialogs, + proxies: dict[str, MethodProxy], ) -> None: @@ -180,6 +181,41 @@ async def handle_order_requests( if action in {'buy', 'sell'}: # validate order = BrokerdOrder(**request_msg) + fqme: str = order.symbol + + # XXX: lazily qualify and cache the contract for + # this mkt since, post the (datad|brokerd)-split, + # this trading-only actor will NOT have had its + # (api) `Client._contracts` pre-warmed by any + # in-proc feed setup (which now runs in the + # `datad.ib` sibling); we run the SAME symbology + # resolution ep the feed-side uses so the cache + # key (`MktPair.bs_fqme`) matches what + # `Client.submit_limit()` looks up. + if fqme not in client._contracts: + proxy: MethodProxy = proxies[account] + try: + await get_mkt_info( + fqme, + proxy=proxy, + ) + except Exception as err: + log.exception( + f'Failed to qualify contract for\n' + f'fqme: {fqme!r}\n' + ) + await ems_order_stream.send( + BrokerdError( + oid=oid, + symbol=fqme, + reason=( + f'No contract could be ' + f'qualified for {fqme!r}:\n' + f'{err!r}' + ), + ) + ) + continue # XXX: by default 0 tells ``ib_async`` methods that # there is no existing order so ask the client to create @@ -191,15 +227,34 @@ async def handle_order_requests( reqid = int(reqid) # call our client api to submit the order - reqid = client.submit_limit( - oid=order.oid, - symbol=order.symbol, - price=order.price, - action=order.action, - size=order.size, - account=acct_number, - reqid=reqid, - ) + # NOTE: guard with order-error relay (vs. crashing + # this dialog and thus ALL order ctl for the + # backend) so one bad submission can't take down + # the daemon's clearing loop. + try: + reqid = client.submit_limit( + oid=order.oid, + symbol=fqme, + price=order.price, + action=order.action, + size=order.size, + account=acct_number, + reqid=reqid, + ) + except Exception as err: + log.exception( + f'Order submission failed for\n' + f'fqme: {fqme!r}\n' + ) + await ems_order_stream.send( + BrokerdError( + oid=oid, + symbol=fqme, + reason=f'Submission error: {err!r}', + ) + ) + continue + str_reqid: str = str(reqid) if reqid is None: err_msg = BrokerdError( @@ -801,6 +856,7 @@ async def open_trade_dialog( ems_stream, accounts_def, flows, + proxies, ) # allocate event relay tasks for each client connection