.ib.api: req-id correlate `MethodProxy` calls

The (single, shared) relay chan has NO ordering guarantee
when a caller task is cancelled (eg. by a search-req
timeout) after sending its request but before consuming the
response: the "orphaned" response gets mis-delivered to the
next caller, off-by-one skewing every result thereafter!

Deats,
- tag each request with a `mid` from an `itertools.count()`
  on the proxy and echo it back in both the result and
  exception resps from `open_aio_client_method_relay()`.
- drop (w/ a warning) any stale resp whose `mid` doesn't
  match the current caller's and keep waiting for ours.
- rewrite the resp-wait loop as a `match:` (resolving the
  old "py3.10 syntax" TODO) incl. the prior inline
  `('error', ...)`/`('event', ...)` relay cases.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code

Co-Authored-By: Claude Fable 5 <noreply@anthropic.com>
datad_service
Gud Boi 2026-06-10 20:19:41 -04:00
parent ba32f286b9
commit 0404e4230e
1 changed files with 64 additions and 33 deletions

View File

@ -1632,6 +1632,17 @@ class MethodProxy:
self.event_table = event_table
self._aio_ns = asyncio_ns
# request-id counter for correlating each method
# call to its (eventual) response; necessary since
# the (single, shared) chan has NO other ordering
# guarantee whenever a caller task is CANCELLED
# (eg. by a search-req timeout) after sending its
# request but before consuming the response: the
# "orphaned" response otherwise gets mis-delivered
# to the next caller causing an off-by-one skew of
# every result thereafter!
self._mids = itertools.count()
async def _run_method(
self,
*,
@ -1645,10 +1656,10 @@ class MethodProxy:
'''
chan = self.chan
await chan.send((meth, kwargs))
mid: int = next(self._mids)
await chan.send((meth, kwargs, mid))
while not chan.closed():
# send through method + ``kwargs: dict`` as pair
msg = await chan.receive()
# TODO: implement reconnect functionality like
@ -1658,37 +1669,51 @@ class MethodProxy:
# except ConnectionError:
# self.reset()
# print(f'NEXT MSG: {msg}')
match msg:
# OUR method-call response B)
case {'mid': resp_mid, 'result': res} if (
resp_mid == mid
):
return res
# TODO: py3.10 ``match:`` syntax B)
if 'result' in msg:
res = msg.get('result')
return res
case {'mid': resp_mid, 'exception': err} if (
resp_mid == mid
):
raise err
elif 'exception' in msg:
err = msg.get('exception')
raise err
# an "orphaned" response to some prior
# (cancelled) caller's request; drop it and
# keep waiting for ours.
case {'mid': resp_mid}:
log.warning(
f'Dropping stale method-resp,\n'
f'mid: {resp_mid} (ours: {mid})\n'
f'(a prior caller prolly got '
f'cancelled before its resp?)\n'
)
continue
elif 'error' in msg:
etype, emsg = msg
log.warning(f'IB error relay: {emsg}')
continue
# out-of-band (inline) client error: raise
# to the current caller as before.
case {'exception': err}:
raise err
# routine (api-farm conn) status events relayed
# inline by `Client.inline_errors()`; not a
# response to our method call so just log at
# info and keep waiting.
elif (
isinstance(msg, tuple)
and
msg[0] == 'event'
):
etype, emsg = msg
log.info(f'IB status event relay: {emsg}')
continue
case ('error', emsg):
log.warning(f'IB error relay: {emsg}')
continue
else:
log.warning(f'UNKNOWN IB MSG: {msg}')
# routine (api-farm conn) status events
# relayed inline by `Client.inline_errors()`;
# not a response to our method call so just
# log at info and keep waiting.
case ('event', emsg):
log.info(
f'IB status event relay: {emsg}'
)
continue
case _:
log.warning(f'UNKNOWN IB MSG: {msg}')
def status_event(
self,
@ -1741,14 +1766,17 @@ async def open_aio_client_method_relay(
log.info('asyncio `Client` method-proxy SHUTDOWN!')
break
case (meth_name, kwargs):
meth_name, kwargs = msg
case (meth_name, kwargs, mid):
meth = getattr(client, meth_name)
try:
resp = await meth(**kwargs)
# echo the msg back
chan.send_nowait({'result': resp})
# echo the msg back tagged with the
# req-id for caller-side correlation.
chan.send_nowait({
'mid': mid,
'result': resp,
})
# XXX: relay ALL (non-cancel) errors to the
# `trio`-side caller (which re-raises in the
@ -1759,7 +1787,10 @@ async def open_aio_client_method_relay(
# qualification failures are expected to be
# caught per-request by order/warmup code!
except Exception as err:
chan.send_nowait({'exception': err})
chan.send_nowait({
'mid': mid,
'exception': err,
})
case {'error': content}:
chan.send_nowait({'exception': content})