diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 63931bbd..44d35609 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1632,6 +1632,17 @@ class MethodProxy: self.event_table = event_table self._aio_ns = asyncio_ns + # request-id counter for correlating each method + # call to its (eventual) response; necessary since + # the (single, shared) chan has NO other ordering + # guarantee whenever a caller task is CANCELLED + # (eg. by a search-req timeout) after sending its + # request but before consuming the response: the + # "orphaned" response otherwise gets mis-delivered + # to the next caller causing an off-by-one skew of + # every result thereafter! + self._mids = itertools.count() + async def _run_method( self, *, @@ -1645,10 +1656,10 @@ class MethodProxy: ''' chan = self.chan - await chan.send((meth, kwargs)) + mid: int = next(self._mids) + await chan.send((meth, kwargs, mid)) while not chan.closed(): - # send through method + ``kwargs: dict`` as pair msg = await chan.receive() # TODO: implement reconnect functionality like @@ -1658,37 +1669,51 @@ class MethodProxy: # except ConnectionError: # self.reset() - # print(f'NEXT MSG: {msg}') + match msg: + # OUR method-call response B) + case {'mid': resp_mid, 'result': res} if ( + resp_mid == mid + ): + return res - # TODO: py3.10 ``match:`` syntax B) - if 'result' in msg: - res = msg.get('result') - return res + case {'mid': resp_mid, 'exception': err} if ( + resp_mid == mid + ): + raise err - elif 'exception' in msg: - err = msg.get('exception') - raise err + # an "orphaned" response to some prior + # (cancelled) caller's request; drop it and + # keep waiting for ours. + case {'mid': resp_mid}: + log.warning( + f'Dropping stale method-resp,\n' + f'mid: {resp_mid} (ours: {mid})\n' + f'(a prior caller prolly got ' + f'cancelled before its resp?)\n' + ) + continue - elif 'error' in msg: - etype, emsg = msg - log.warning(f'IB error relay: {emsg}') - continue + # out-of-band (inline) client error: raise + # to the current caller as before. + case {'exception': err}: + raise err - # routine (api-farm conn) status events relayed - # inline by `Client.inline_errors()`; not a - # response to our method call so just log at - # info and keep waiting. - elif ( - isinstance(msg, tuple) - and - msg[0] == 'event' - ): - etype, emsg = msg - log.info(f'IB status event relay: {emsg}') - continue + case ('error', emsg): + log.warning(f'IB error relay: {emsg}') + continue - else: - log.warning(f'UNKNOWN IB MSG: {msg}') + # routine (api-farm conn) status events + # relayed inline by `Client.inline_errors()`; + # not a response to our method call so just + # log at info and keep waiting. + case ('event', emsg): + log.info( + f'IB status event relay: {emsg}' + ) + continue + + case _: + log.warning(f'UNKNOWN IB MSG: {msg}') def status_event( self, @@ -1741,14 +1766,17 @@ async def open_aio_client_method_relay( log.info('asyncio `Client` method-proxy SHUTDOWN!') break - case (meth_name, kwargs): - meth_name, kwargs = msg + case (meth_name, kwargs, mid): meth = getattr(client, meth_name) try: resp = await meth(**kwargs) - # echo the msg back - chan.send_nowait({'result': resp}) + # echo the msg back tagged with the + # req-id for caller-side correlation. + chan.send_nowait({ + 'mid': mid, + 'result': resp, + }) # XXX: relay ALL (non-cancel) errors to the # `trio`-side caller (which re-raises in the @@ -1759,7 +1787,10 @@ async def open_aio_client_method_relay( # qualification failures are expected to be # caught per-request by order/warmup code! except Exception as err: - chan.send_nowait({'exception': err}) + chan.send_nowait({ + 'mid': mid, + 'exception': err, + }) case {'error': content}: chan.send_nowait({'exception': content})