From 6f06f646cf1e40089a52813ab0bfa33c17680beb Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 30 Mar 2022 13:49:19 -0400
Subject: [PATCH] Get the ib data feed hackzorz working

ib has a throttle limit for "hft" bars, but contained in here is some
hackery using ``xdotool`` to reset data farms auto-magically B)

This copies the working script into the ib backend mod as a routine
which now uses `trio.run_process()`; it is called from the `get_bars()`
history retriever which then waits for "data re-established" events to
be received from the client before making more history queries.

TL;DR summary of changes:
- relay ib's "system status" events (like data farm statuses) as a new
  "event" msg that can be processed by registrants of
  `Client.inline_errors()` (though we should probably add a new method
  for this).
- add `MethodProxy.status_event()` which allows a proxy user to
  register for a particular "system event" (as mentioned above) by
  putting a `trio.Event` entry in a small table which can then be set
  by a relay task whenever there are detected waiters (a standalone
  sketch of this pattern follows the diff).
- start a "msg relay task" when opening the method proxy which does
  the event setting mentioned above in the background.
- drop the request error handling around proxy creation; it doesn't
  seem necessary any more now that we have better error propagation
  from `asyncio`.
- add event waiting logic around the data feed reset hack.
- change the order relay task to only log system events for now (we
  still need better parsing/logic to get tws-external order updates
  working again..).
---
 piker/brokers/ib.py | 333 +++++++++++++++++++++++++++++++++-----------
 1 file changed, 255 insertions(+), 78 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index cb525a13..65194ff5 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -294,7 +294,8 @@ class Client:
         bars_kwargs = {'whatToShow': 'TRADES'}
 
         global _enters
-        print(f'ENTER BARS {_enters} @ end={end_dt}')
+        # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
+        print(f'REQUESTING BARS {_enters} @ end={end_dt}')
         _enters += 1
 
         contract = await self.find_contract(fqsn)
@@ -304,6 +305,7 @@ class Client:
         bars = await self.ib.reqHistoricalDataAsync(
             contract,
             endDateTime=end_dt,
+            formatDate=2,
 
             # time history length values format:
             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
@@ -738,25 +740,38 @@ class Client:
     def inline_errors(
         self,
         to_trio: trio.abc.SendChannel,
-    ) -> None:
-        # connect error msgs
+    ) -> None:
+        '''
+        Setup error relay to the provided ``trio`` mem chan such that
+        trio tasks can retrieve and parse ``asyncio``-side API request
+        errors.
+
+        '''
        def push_err(
            reqId: int,
            errorCode: int,
            errorString: str,
            contract: Contract,
+
        ) -> None:
            log.error(errorString)
+            reason = errorString
+
+            if reqId == -1:
+                # it's a general event?
+                key = 'event'
+            else:
+                key = 'error'
 
            try:
                to_trio.send_nowait((
-                    'error',
+                    key,
 
                    # error "object"
                    {'reqid': reqId,
-                     'reason': errorString,
+                     'reason': reason,
                     'contract': contract}
                ))
            except trio.BrokenResourceError:
@@ -1123,9 +1138,11 @@ class MethodProxy:
     def __init__(
         self,
         chan: to_asyncio.LinkedTaskChannel,
+        event_table: dict[str, trio.Event],
 
     ) -> None:
         self.chan = chan
+        self.event_table = event_table
 
     async def _run_method(
         self,
@@ -1140,18 +1157,43 @@ class MethodProxy:
 
         '''
         chan = self.chan
-        # send through method + ``kwargs: dict`` as pair
         await chan.send((meth, kwargs))
-        msg = await chan.receive()
-        res = msg.get('result')
-        if res:
-            return res
 
-        err = msg.get('error')
-        if not err:
-            raise ValueError(f'Received unexpected asyncio msg {msg}')
+        while not chan.closed():
+            # send through method + ``kwargs: dict`` as pair
+            msg = await chan.receive()
+            # print(f'NEXT MSG: {msg}')
 
-        raise err
+            # TODO: py3.10 ``match:`` syntax B)
+            if 'result' in msg:
+                res = msg.get('result')
+                return res
+
+            elif 'exception' in msg:
+                err = msg.get('exception')
+                raise err
+
+            elif 'error' in msg:
+                etype, emsg = msg
+                log.warning(f'IB error relay: {emsg}')
+                continue
+
+            else:
+                log.warning(f'UNKNOWN IB MSG: {msg}')
+
+    def status_event(
+        self,
+        pattern: str,
+
+    ) -> Union[dict[str, Any], trio.Event]:
+
+        ev = self.event_table.get(pattern)
+
+        if not ev or ev.is_set():
+            # print(f'inserting new data reset event item')
+            ev = self.event_table[pattern] = trio.Event()
+
+        return ev
 
     async def wait_for_data_reset(self) -> None:
         '''
@@ -1166,6 +1208,7 @@ class MethodProxy:
 async def open_aio_client_method_relay(
     from_trio: asyncio.Queue,
     to_trio: trio.abc.SendChannel,
+    event_consumers: dict[str, trio.Event],
 
 ) -> None:
 
@@ -1177,7 +1220,7 @@ async def open_aio_client_method_relay(
     to_trio.send_nowait(client)
 
     # TODO: separate channel for error handling?
-    # client.inline_errors(to_trio)
+    client.inline_errors(to_trio)
 
     # relay all method requests to ``asyncio``-side client and
     # deliver back results
@@ -1188,8 +1231,8 @@ async def open_aio_client_method_relay(
             break
 
         meth_name, kwargs = msg
-
         meth = getattr(client, meth_name)
+
         try:
             resp = await meth(**kwargs)
 
             # echo the msg back
@@ -1201,54 +1244,61 @@ async def open_aio_client_method_relay(
             # TODO: relay all errors to trio?
             # BaseException,
         ) as err:
-            to_trio.send_nowait({'error': err})
+            to_trio.send_nowait({'exception': err})
 
 
 @acm
 async def open_client_proxy() -> MethodProxy:
 
-    try:
-        async with to_asyncio.open_channel_from(
+    # try:
+    event_table = {}
+
+    async with (
+        to_asyncio.open_channel_from(
             open_aio_client_method_relay,
-        ) as (first, chan):
+            event_consumers=event_table,
+        ) as (first, chan),
+        trio.open_nursery() as relay_n,
+    ):
 
-            assert isinstance(first, Client)
-            proxy = MethodProxy(chan)
+        assert isinstance(first, Client)
+        proxy = MethodProxy(chan, event_table)
+
+        # mock all remote methods on ib ``Client``.
+        for name, method in inspect.getmembers(
+            Client, predicate=inspect.isfunction
+        ):
+            if '_' == name[0]:
+                continue
+            setattr(proxy, name, partial(proxy._run_method, meth=name))
+
+        async def relay_events():
+
+            async with chan.subscribe() as msg_stream:
+
+                async for msg in msg_stream:
+                    if 'event' not in msg:
+                        continue
+
+                    # if 'event' in msg:
+                    # wake up any system event waiters.
+                    etype, status_msg = msg
+                    reason = status_msg['reason']
+
+                    ev = proxy.event_table.pop(reason, None)
+
+                    if ev and ev.statistics().tasks_waiting:
+                        log.info(f'Relaying ib status message: {msg}')
+                        ev.set()
 
-            # mock all remote methods on ib ``Client``.
-            for name, method in inspect.getmembers(
-                Client, predicate=inspect.isfunction
-            ):
-                if '_' == name[0]: continue
-                setattr(proxy, name, partial(proxy._run_method, meth=name))
-            yield proxy
+                    continue
+
+        relay_n.start_soon(relay_events)
 
-            # terminate asyncio side task
-            await chan.send(None)
+        yield proxy
 
-    except (
-        RequestError,
-        # BaseException,
-    )as err:
-        code = getattr(err, 'code', None)
-        if code:
-            msg = err.message
-
-            # TODO: retreive underlying ``ib_insync`` error?
-            if (
-                code == 162 and (
-                    'HMDS query returned no data' in msg
-                    or 'No market data permissions for' in msg
-                )
-                or code == 200
-            ):
-                # these cases should not cause a task crash
-                log.warning(msg)
-
-            else:
-                raise
+        # terminate asyncio side task
+        await chan.send(None)
 
 
 @acm
@@ -1378,6 +1428,12 @@ def normalize(
     return data
 
 
+_pacing: str = (
+    'Historical Market Data Service error '
+    'message:Historical data request pacing violation'
+)
+
+
 async def get_bars(
 
     proxy: MethodProxy,
@@ -1396,14 +1452,13 @@ async def get_bars(
 
     fails = 0
     bars: Optional[list] = None
-    in_throttle: bool = False
     first_dt: datetime = None
     last_dt: datetime = None
 
     if end_dt:
         last_dt = pendulum.from_timestamp(end_dt.timestamp())
 
-    for _ in range(10):
+    for _ in range(2):
         try:
             bars, bars_array = await proxy.bars(
                 fqsn=fqsn,
@@ -1449,26 +1504,43 @@ async def get_bars(
 
                 continue
 
-            else:
-                log.exception(
-                    "Data query rate reached: Press `ctrl-alt-f`"
-                    "in TWS"
+            elif _pacing in msg:
+
+                log.warning(
+                    'History throttle rate reached!\n'
+                    'Resetting farms with `ctrl-alt-f` hack\n'
                 )
 
+                # TODO: we might have to put a task lock around this
+                # method..
+                hist_ev = proxy.status_event(
+                    'HMDS data farm connection is OK:ushmds'
+                )
+                # live_ev = proxy.status_event(
+                #     # 'Market data farm connection is OK:usfuture'
+                #     'Market data farm connection is OK:usfarm'
+                # )
+                # TODO: some kinda resp here that indicates success
+                # otherwise retry?
+                await data_reset_hack()
 
-                # TODO: should probably create some alert on screen
-                # and then somehow get that to trigger an event here
-                # that restarts/resumes this task?
-                if not in_throttle:
-                    await tractor.breakpoint()
+                # TODO: a while loop here if we timeout?
+                for name, ev in [
+                    ('history', hist_ev),
+                    # ('live', live_ev),
+                ]:
+                    with trio.move_on_after(22) as cs:
+                        await ev.wait()
+                        log.info(f"{name} DATA RESET")
 
-                    # TODO: wait on data con reset event
-                    # then begin backfilling again.
-                    # await proxy.wait_for_data()
+                    if cs.cancelled_caught:
+                        log.warning("reset hack failed on first try?")
+                        # await tractor.breakpoint()
 
-                    in_throttle = True
                 fails += 1
                 continue
 
+            else:
+                raise
+
     return None, None
     # else:  # throttle wasn't fixed so error out immediately
@@ -1520,8 +1592,7 @@ async def backfill_bars(
     # on that until we have the `marketstore` daemon in place in which
     # case the shm size will be driven by user config and available sys
     # memory.
-    # count: int = 120,
-    count: int = 36,
+    count: int = 65,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
@@ -1566,9 +1637,6 @@ async def backfill_bars(
 
                 out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
 
-                if fails is None or fails > 1:
-                    break
-
                 if out == (None, None):
                     # could be trying to retreive bars over weekend
                     # TODO: add logic here to handle tradable hours and
@@ -2222,18 +2290,26 @@ async def deliver_trade_events(
             msg = pack_position(item)
             msg.account = accounts_def.inverse[msg.account]
 
-            if getattr(msg, 'reqid', 0) < -1:
+        elif event_name == 'event':
 
-                # it's a trade event generated by TWS usage.
-                log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
+            # it's either a general system status event or an external
+            # trade event?
+            log.info(f"TWS system status: \n{pformat(item)}")
 
-                msg.reqid = 'tws-' + str(-1 * msg.reqid)
+            # TODO: support this again but needs parsing at the callback
+            # level...
+            # reqid = item.get('reqid', 0)
+            # if getattr(msg, 'reqid', 0) < -1:
+            #     log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
+
+            continue
+
+            # msg.reqid = 'tws-' + str(-1 * reqid)
 
             # mark msg as from "external system"
            # TODO: probably something better then this.. and start
            # considering multiplayer/group trades tracking
-            msg.broker_details['external_src'] = 'tws'
-            continue
+            # msg.broker_details['external_src'] = 'tws'
 
         # XXX: we always serialize to a dict for msgpack
         # translations, ideally we can move to an msgspec (or other)
@@ -2336,3 +2412,104 @@ async def open_symbol_search(
 
         log.debug(f"sending matches: {matches.keys()}")
         await stream.send(matches)
+
+
+async def data_reset_hack(
+    reset_type: str = 'data',
+
+) -> bool:
+    '''
+    Run key combos for resetting data feeds and yield back to caller
+    when complete.
+
+    This is a linux-only hack around:
+
+    https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
+
+    TODOs:
+        - a return type that hopefully determines if the hack was
+          successful.
+        - other OS support?
+        - integration with ``ib-gw`` run in docker + Xorg?
+
+    '''
+    # TODO: try out this lib instead, seems to be the most modern
+    # and uses the underlying lib:
+    # https://github.com/rshk/python-libxdo
+
+    # TODO: seems to be a few libs for python but not sure
+    # if they support all the sub commands we need, order of
+    # most recent commit history:
+    # https://github.com/rr-/pyxdotool
+    # https://github.com/ShaneHutter/pyxdotool
+    # https://github.com/cphyc/pyxdotool
+
+    try:
+        import i3ipc
+    except ImportError:
+        log.warning('IB data hack not supported on ur platformz')
+        return False
+
+    i3 = i3ipc.Connection()
+    t = i3.get_tree()
+
+    orig_win_id = t.find_focused().window
+
+    # for tws
+    win_names: list[str] = [
+        'Interactive Brokers',  # tws running in i3
+        'IB Gateway',  # gw running in i3
+        # 'IB',  # gw running in i3 (newer version?)
+    ]
+
+    combos: dict[str, tuple[str, int]] = {
+        # only required if we need a connection reset.
+        'connection': ('ctrl+alt+r', 12),
+
+        # data feed reset.
+        'data': ('ctrl+alt+f', 6)
+    }
+
+    for name in win_names:
+        results = t.find_titled(name)
+        print(f'results for {name}: {results}')
+        if results:
+            con = results[0]
+            print(f'Resetting data feed for {name}')
+            win_id = str(con.window)
+            w, h = con.rect.width, con.rect.height
+
+            # TODO: only run the reconnect (2nd) kc on a detected
+            # disconnect?
+            key_combo, timeout = combos[reset_type]
+            # for key_combo, timeout in [
+            #     # only required if we need a connection reset.
+            #     # ('ctrl+alt+r', 12),
+            #     # data feed reset.
+            #     ('ctrl+alt+f', 6)
+            # ]:
+            await trio.run_process([
+                'xdotool',
+                'windowactivate', '--sync', win_id,
+
+                # move mouse to bottom left of window (where there should
+                # be nothing to click).
+                'mousemove_relative', '--sync', str(w-4), str(h-4),
+
+                # NOTE: we may need to stick a `--retry 3` in here..
+                'click', '--window', win_id,
+                '--repeat', '3', '1',
+
+                # hackzorzes
+                'key', key_combo,
+                # ],
+                # timeout=timeout,
+            ])
+
+            # re-activate and focus original window
+            await trio.run_process([
+                'xdotool',
+                'windowactivate', '--sync', str(orig_win_id),
+                'click', '--window', str(orig_win_id), '1',
+            ])
+
+    return True
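
Reviewer note, not part of the patch: below is a minimal, standalone
sketch of the event-table pattern that `MethodProxy.status_event()` and
the relay task implement above. All names in it are illustrative only;
it just shows a waiter registering a `trio.Event` keyed by an expected
status string and a relay task setting it when a matching msg arrives.

    import trio

    event_table: dict[str, trio.Event] = {}


    def status_event(pattern: str) -> trio.Event:
        # re-use a still-pending event for this pattern or create a fresh one
        ev = event_table.get(pattern)
        if not ev or ev.is_set():
            ev = event_table[pattern] = trio.Event()
        return ev


    async def relay(msgs: trio.MemoryReceiveChannel) -> None:
        # wake any waiter whose registered pattern matches the incoming msg
        async for reason in msgs:
            ev = event_table.pop(reason, None)
            if ev and ev.statistics().tasks_waiting:
                ev.set()


    async def main() -> None:
        send, recv = trio.open_memory_channel(8)

        async def waiter() -> None:
            ev = status_event('HMDS data farm connection is OK:ushmds')
            with trio.move_on_after(5) as cs:
                await ev.wait()
            print('timed out' if cs.cancelled_caught else 'data farm OK seen')

        async with trio.open_nursery() as n:
            n.start_soon(relay, recv)
            n.start_soon(waiter)
            await trio.sleep(0.1)  # give the waiter a chance to register first
            async with send:
                await send.send('HMDS data farm connection is OK:ushmds')


    trio.run(main)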
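
Also not part of the patch: ``trio.run_process()`` has no ``timeout=``
kwarg (hence the commented-out line in the hack above); if a bound on
the key-combo call is wanted, the idiomatic approach is a cancel scope
around it, roughly:

    import trio

    async def run_with_timeout(cmd: list[str], timeout: float) -> bool:
        # True if the command completed, False if the timeout cancelled it
        with trio.move_on_after(timeout) as cs:
            await trio.run_process(cmd)
        return not cs.cancelled_caught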