From b1111bf9b006e960aadf26b629c9ba2946d407cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 15 Sep 2025 16:53:25 -0400 Subject: [PATCH 01/15] ib: jig `.data_reset_hack()` with vnc-client failover Since apparently porting to the new docker container enforces using a vnc password and `asyncvnc` seems to have a bug/mis-config whenever i've tried a pw over a wg tunnel..? Soo, this tries out the old `i3ipc`-win-focus + `xdo` click hack when the above fails. Deats, - add a mod-level `try_xdo_manual()` to wrap calling `i3ipc_xdotool_manual_click_hack()` with an oserr handler, ensure we don't bother trying if `i3ipc` import fails beforehand tho. - call ^ from both the orig case block and the failover from the vnc-client case. - factor the `+no_setup_msg: str` out to mod level and expect it to be `.format()`-ed. - refresh todo around `asyncvnc` pw ish.. - add a new `i3ipc_fin_wins_titled()` window-title scanner which predicates input `titles` and delivers any matches alongside the orig focused win at call time. - tweak `i3ipc_xdotool_manual_click_hack()` to call ^ and remove prior unfactored window scanning logic. --- piker/brokers/ib/_util.py | 224 +++++++++++++++++++++++++------------- 1 file changed, 147 insertions(+), 77 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 2c71bc46..512e5358 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -34,6 +34,7 @@ from piker.brokers._util import get_logger if TYPE_CHECKING: from .api import Client from ib_insync import IB + import i3ipc log = get_logger('piker.brokers.ib') @@ -48,6 +49,37 @@ _reset_tech: Literal[ ] = 'vnc' +no_setup_msg:str = ( + 'No data reset hack test setup for {vnc_sockaddr}!\n' + 'See config setup tips @\n' + 'https://github.com/pikers/piker/tree/master/piker/brokers/ib' +) + + +def try_xdo_manual( + vnc_sockaddr: str, +): + ''' + Do the "manual" `xdo`-based screen switch + click + combo since apparently the `asyncvnc` client ain't workin.. + + Note this is only meant as a backup method for Xorg users, + ideally you can use a real vnc client and the `vnc_click_hack()` + impl! + + ''' + global _reset_tech + try: + i3ipc_xdotool_manual_click_hack() + _reset_tech = 'i3ipc_xdotool' + return True + except OSError: + log.exception( + no_setup_msg.format(vnc_sockaddr) + ) + return False + + async def data_reset_hack( # vnc_host: str, client: Client, @@ -90,15 +122,9 @@ async def data_reset_hack( vnc_port: int vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs') - no_setup_msg:str = ( - f'No data reset hack test setup for {vnc_sockaddr}!\n' - 'See config setup tips @\n' - 'https://github.com/pikers/piker/tree/master/piker/brokers/ib' - ) - if not vnc_sockaddr: log.warning( - no_setup_msg + no_setup_msg.format(vnc_sockaddr) + 'REQUIRES A `vnc_addrs: array` ENTRY' ) @@ -119,27 +145,38 @@ async def data_reset_hack( port=vnc_port, ) ) - except OSError: - if vnc_host != 'localhost': - log.warning(no_setup_msg) - return False - + except ( + OSError, # no VNC server avail.. + PermissionError, # asyncvnc pw fail.. + ): try: import i3ipc # noqa (since a deps dynamic check) except ModuleNotFoundError: - log.warning(no_setup_msg) + log.warning( + no_setup_msg.format(vnc_sockaddr) + ) return False - try: - i3ipc_xdotool_manual_click_hack() - _reset_tech = 'i3ipc_xdotool' - return True - except OSError: - log.exception(no_setup_msg) - return False + if vnc_host not in { + 'localhost', + '127.0.0.1', + }: + focussed, matches = i3ipc_fin_wins_titled() + if not matches: + log.warning( + no_setup_msg.format(vnc_sockaddr) + ) + return False + else: + try_xdo_manual(vnc_sockaddr) + + # localhost but no vnc-client or it borked.. + else: + try_xdo_manual(vnc_sockaddr) case 'i3ipc_xdotool': - i3ipc_xdotool_manual_click_hack() + try_xdo_manual(vnc_sockaddr) + # i3ipc_xdotool_manual_click_hack() case _ as tech: raise RuntimeError(f'{tech} is not supported for reset tech!?') @@ -178,9 +215,9 @@ async def vnc_click_hack( host, port=port, - # TODO: doesn't work see: - # https://github.com/barneygale/asyncvnc/issues/7 - # password='ibcansmbz', + # TODO: doesn't work? + # see, https://github.com/barneygale/asyncvnc/issues/7 + password='doggy', ) as client: @@ -194,70 +231,103 @@ async def vnc_click_hack( client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked +def i3ipc_fin_wins_titled( + titles: list[str] = [ + 'Interactive Brokers', # tws running in i3 + 'IB Gateway', # gw running in i3 + # 'IB', # gw running in i3 (newer version?) + + # !TODO, remote vnc instance + # -[ ] something in title (or other Con-props) that indicates + # this is explicitly for ibrk sw? + # |_[ ] !can use modden spawn eventually! + 'TigerVNC', + # 'vncviewer', # the terminal.. + ], +) -> tuple[ + i3ipc.Con, # orig focussed win + list[tuple[str, i3ipc.Con]], # matching wins by title +]: + ''' + Attempt to find a local-DE window titled with an entry in + `titles`. + + If found deliver the current focussed window and all matching + `i3ipc.Con`s in a list. + + ''' + import i3ipc + ipc = i3ipc.Connection() + + # TODO: might be worth offering some kinda api for grabbing + # the window id from the pid? + # https://stackoverflow.com/a/2250879 + tree = ipc.get_tree() + focussed: i3ipc.Con = tree.find_focused() + + matches: list[i3ipc.Con] = [] + for name in titles: + results = tree.find_titled(name) + print(f'results for {name}: {results}') + if results: + con = results[0] + matches.append(( + name, + con, + )) + + return ( + focussed, + matches, + ) + + + def i3ipc_xdotool_manual_click_hack() -> None: ''' Do the data reset hack but expecting a local X-window using `xdotool`. ''' - import i3ipc - i3 = i3ipc.Connection() - - # TODO: might be worth offering some kinda api for grabbing - # the window id from the pid? - # https://stackoverflow.com/a/2250879 - t = i3.get_tree() - - orig_win_id = t.find_focused().window - - # for tws - win_names: list[str] = [ - 'Interactive Brokers', # tws running in i3 - 'IB Gateway', # gw running in i3 - # 'IB', # gw running in i3 (newer version?) - ] - + focussed, matches = i3ipc_fin_wins_titled() + orig_win_id = focussed.window try: - for name in win_names: - results = t.find_titled(name) - print(f'results for {name}: {results}') - if results: - con = results[0] - print(f'Resetting data feed for {name}') - win_id = str(con.window) - w, h = con.rect.width, con.rect.height + for name, con in matches: + print(f'Resetting data feed for {name}') + win_id = str(con.window) + w, h = con.rect.width, con.rect.height - # TODO: seems to be a few libs for python but not sure - # if they support all the sub commands we need, order of - # most recent commit history: - # https://github.com/rr-/pyxdotool - # https://github.com/ShaneHutter/pyxdotool - # https://github.com/cphyc/pyxdotool + # TODO: seems to be a few libs for python but not sure + # if they support all the sub commands we need, order of + # most recent commit history: + # https://github.com/rr-/pyxdotool + # https://github.com/ShaneHutter/pyxdotool + # https://github.com/cphyc/pyxdotool - # TODO: only run the reconnect (2nd) kc on a detected - # disconnect? - for key_combo, timeout in [ - # only required if we need a connection reset. - # ('ctrl+alt+r', 12), - # data feed reset. - ('ctrl+alt+f', 6) - ]: - subprocess.call([ - 'xdotool', - 'windowactivate', '--sync', win_id, + # TODO: only run the reconnect (2nd) kc on a detected + # disconnect? + for key_combo, timeout in [ + # only required if we need a connection reset. + # ('ctrl+alt+r', 12), + # data feed reset. + ('ctrl+alt+f', 6) + ]: + subprocess.call([ + 'xdotool', + 'windowactivate', '--sync', win_id, - # move mouse to bottom left of window (where - # there should be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), + # move mouse to bottom left of window (where + # there should be nothing to click). + 'mousemove_relative', '--sync', str(w-4), str(h-4), - # NOTE: we may need to stick a `--retry 3` in here.. - 'click', '--window', win_id, - '--repeat', '3', '1', + # NOTE: we may need to stick a `--retry 3` in here.. + 'click', '--window', win_id, + '--repeat', '3', '1', - # hackzorzes - 'key', key_combo, - ], - timeout=timeout, - ) + # hackzorzes + 'key', key_combo, + ], + timeout=timeout, + ) # re-activate and focus original window subprocess.call([ From e92d5baf9925fdf168ac714c32852bfa23a1ef49 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 17 Sep 2025 18:54:47 -0400 Subject: [PATCH 02/15] ib: never relay "Warning:" errors to EMS.. You'd think they could be bothered to make either a "log" or "warning" msg type instead of a `type='error'`.. but alas, this attempts to detect all such "warning"-errors and never proxy them to the clearing engine thus avoiding the cancellation of any associated (by `reqid`) pre-existing orders (control dialogs). Also update all surrounding log messages to a more multiline style. --- piker/brokers/ib/broker.py | 43 +++++++++++++++++++++++++------------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index b78f2880..f9c27bba 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -1243,32 +1243,47 @@ async def deliver_trade_events( # never relay errors for non-broker related issues # https://interactivebrokers.github.io/tws-api/message_codes.html code: int = err['error_code'] - if code in { - 200, # uhh + reason: str = err['reason'] + reqid: str = str(err['reqid']) + + # "Warning:" msg codes, + # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes + # - 2109: 'Outside Regular Trading Hours' + if 'Warning:' in reason: + log.warning( + f'Order-API-warning: {code!r}\n' + f'reqid: {reqid!r}\n' + f'\n' + f'{pformat(err)}\n' + # ^TODO? should we just print the `reason` + # not the full `err`-dict? + ) + continue + + # XXX known special (ignore) cases + elif code in { + 200, # uhh.. ni idea # hist pacing / connectivity 162, 165, - # WARNING codes: - # https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes - # Attribute 'Outside Regular Trading Hours' is - # " 'ignored based on the order type and - # destination. PlaceOrder is now ' 'being - # processed.', - 2109, - # XXX: lol this isn't even documented.. # 'No market data during competing live session' 1669, }: + log.error( + f'Order-API-error which is non-cancel-causing ?!\n' + f'\n' + f'{pformat(err)}\n' + ) continue - reqid: str = str(err['reqid']) - reason: str = err['reason'] - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + log.error( + f'TWS external order error ??\n' + f'{pformat(err)}\n' + ) flow: dict = dict( flows.get(reqid) From 27c83fae0c13c4b7ce98f5960aa00c60b2a726d5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 20 Sep 2025 22:13:59 -0400 Subject: [PATCH 03/15] ib: add venue-hours checking Such that we can avoid other (pretty unreliable) "alternative" checks to determine whether a real-time quote should be waited on or (when venue is closed) we should just signal that historical backfilling can commence immediately. This has been a todo for a very long time and it turned out to be much easier to accomplish than anticipated.. Deats, - add a new `is_current_time_in_range()` dt range checker to predicate whether an input range contains `datetime.now(start_dt.tzinfo)`. - in `.ib.feed.stream_quotes()` add a `venue_is_open: bool` which uses all of the new ^^ to determine whether to branch for the short-circuit-and-do-history-now-case or the std real-time-quotes should-be-awaited-since-venue-is-open, case; drop all the old hacks trying to workaround not figuring that venue state stuff.. Other, - also add a gpt5 composed parser to `._util` for the `ib_insync.ContractDetails.tradingHours: str` for before i realized there was a `.tradingSessions` property XD - in `.ib_feed`, * add various EG-collapsings per recent tractor/trio updates. * better logging / exc-handling around ticker quote pushes. * stop clearing `Ticker.ticks` each quote iteration; not sure if this is needed/correct tho? * add masked `Ticker.ticks` poll loop that logs. - fix some `str.format()` usage in `._util.try_xdo_manual()` NOTE, resolved conflicts on `piker/brokers/ib/feed.py` due to rebasing onto up stream `brokers_refinery` commit, d809c797 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout --- piker/brokers/ib/_util.py | 109 +++++++++++++++++++++- piker/brokers/ib/feed.py | 188 ++++++++++++++++++++++++++------------ 2 files changed, 233 insertions(+), 64 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 512e5358..2941284a 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -20,6 +20,11 @@ runnable script-programs. ''' from __future__ import annotations +from datetime import ( # noqa + datetime, + date, + tzinfo as TzInfo, +) from functools import partial from typing import ( Literal, @@ -75,7 +80,7 @@ def try_xdo_manual( return True except OSError: log.exception( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False @@ -124,7 +129,7 @@ async def data_reset_hack( if not vnc_sockaddr: log.warning( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + 'REQUIRES A `vnc_addrs: array` ENTRY' ) @@ -153,7 +158,7 @@ async def data_reset_hack( import i3ipc # noqa (since a deps dynamic check) except ModuleNotFoundError: log.warning( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False @@ -164,7 +169,7 @@ async def data_reset_hack( focussed, matches = i3ipc_fin_wins_titled() if not matches: log.warning( - no_setup_msg.format(vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) return False else: @@ -337,3 +342,99 @@ def i3ipc_xdotool_manual_click_hack() -> None: ]) except subprocess.TimeoutExpired: log.exception('xdotool timed out?') + + + +def is_current_time_in_range( + start_dt: datetime, + end_dt: datetime, +) -> bool: + ''' + Check if current time is within the datetime range. + + Use any/the-same timezone as provided by `start_dt.tzinfo` value + in the range. + + ''' + now: datetime = datetime.now(start_dt.tzinfo) + return start_dt <= now <= end_dt + + +# TODO, put this into `._util` and call it from here! +# +# NOTE, this was generated by @guille from a gpt5 prompt +# and was originally thot to be needed before learning about +# `ib_insync.contract.ContractDetails._parseSessions()` and +# it's downstream meths.. +# +# This is still likely useful to keep for now to parse the +# `.tradingHours: str` value manually if we ever decide +# to move off `ib_async` and implement our own `trio`/`anyio` +# based version Bp +# +# >attempt to parse the retarted ib "time stampy thing" they +# >do for "venue hours" with this.. written by +# >gpt5-"thinking", +# + + +def parse_trading_hours( + spec: str, + tz: TzInfo|None = None +) -> dict[ + date, + tuple[datetime, datetime] +]|None: + ''' + Parse venue hours like: + 'YYYYMMDD:HHMM-YYYYMMDD:HHMM;YYYYMMDD:CLOSED;...' + + Returns `dict[date] = (open_dt, close_dt)` or `None` if + closed. + + ''' + if ( + not isinstance(spec, str) + or + not spec + ): + raise ValueError('spec must be a non-empty string') + + out: dict[ + date, + tuple[datetime, datetime] + ]|None = {} + + for part in (p.strip() for p in spec.split(';') if p.strip()): + if part.endswith(':CLOSED'): + day_s, _ = part.split(':', 1) + d = datetime.strptime(day_s, '%Y%m%d').date() + out[d] = None + continue + + try: + start_s, end_s = part.split('-', 1) + start_dt = datetime.strptime(start_s, '%Y%m%d:%H%M') + end_dt = datetime.strptime(end_s, '%Y%m%d:%H%M') + except ValueError as exc: + raise ValueError(f'invalid segment: {part}') from exc + + if tz is not None: + start_dt = start_dt.replace(tzinfo=tz) + end_dt = end_dt.replace(tzinfo=tz) + + out[start_dt.date()] = (start_dt, end_dt) + + return out + + +# ORIG desired usage, +# +# TODO, for non-drunk tomorrow, +# - call above fn and check that `output[today] is not None` +# trading_hrs: dict = parse_trading_hours( +# details.tradingHours +# ) +# liq_hrs: dict = parse_trading_hours( +# details.liquidHours + # ) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 062b2c2e..68043674 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -26,7 +26,6 @@ from dataclasses import asdict from datetime import datetime from functools import partial from pprint import pformat -from math import isnan import time from typing import ( Any, @@ -69,7 +68,10 @@ from .api import ( Contract, RequestError, ) -from ._util import data_reset_hack +from ._util import ( + data_reset_hack, + is_current_time_in_range, +) from .symbols import get_mkt_info if TYPE_CHECKING: @@ -184,7 +186,8 @@ async def open_history_client( if ( start_dt - and start_dt.timestamp() == 0 + and + start_dt.timestamp() == 0 ): await tractor.pause() @@ -203,7 +206,7 @@ async def open_history_client( ): count += 1 mean += latency / count - print( + log.debug( f'HISTORY FRAME QUERY LATENCY: {latency}\n' f'mean: {mean}' ) @@ -607,7 +610,10 @@ async def get_bars( # such that simultaneous symbol queries don't try data resettingn # too fast.. unset_resetter: bool = False - async with trio.open_nursery() as tn: + async with ( + tractor.trionics.collapse_eg(), + trio.open_nursery() as tn + ): # start history request that we allow # to run indefinitely until a result is acquired @@ -689,10 +695,17 @@ async def _setup_quote_stream( async with load_aio_clients( disconnect_on_exit=False, ) as accts2clients: + + # since asyncio.Task + # tractor.pause_from_sync() + caccount_name, client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) to_trio.send_nowait(contract) # cuz why not - ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) + ticker: Ticker = client.ib.reqMktData( + contract, + ','.join(opts), + ) # NOTE: it's batch-wise and slow af but I guess could # be good for backchecking? Seems to be every 5s maybe? @@ -718,10 +731,10 @@ async def _setup_quote_stream( Push quotes to trio task. """ - # log.debug(t) + + # log.debug(f'new IB quote: {t}\n') try: to_trio.send_nowait(t) - except ( trio.BrokenResourceError, @@ -736,21 +749,47 @@ async def _setup_quote_stream( # resulting in tracebacks spammed to console.. # Manually do the dereg ourselves. teardown() - except trio.WouldBlock: - # log.warning( - # f'channel is blocking symbol feed for {symbol}?' - # f'\n{to_trio.statistics}' - # ) - pass - # except trio.WouldBlock: - # # for slow debugging purposes to avoid clobbering prompt - # # with log msgs - # pass + # for slow debugging purposes to avoid clobbering prompt + # with log msgs + except trio.WouldBlock: + log.exception( + f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n' + f'\n' + f'{to_trio.statistics()}\n' + ) + + # ?TODO, handle re-connection attempts? + except BaseException as _berr: + berr = _berr + log.exception( + f'Failed to push ticker quote !?\n' + f'cause: {berr}\n' + f'\n' + f't: {t}\n' + f'{to_trio.statistics}\n' + ) + # raise berr + ticker.updateEvent.connect(push) try: await asyncio.sleep(float('inf')) + + # XXX, just for debug.. + # tractor.pause_from_sync() + # while True: + # await asyncio.sleep(1.6) + # if ticker.ticks: + # log.debug( + # f'ticker.ticks = \n' + # f'{ticker.ticks}\n' + # ) + # else: + # log.warning( + # 'UHH no ticker.ticks ??' + # ) + finally: teardown() @@ -826,7 +865,7 @@ def normalize( tbt = ticker.tickByTicks if tbt: - print(f'tickbyticks:\n {ticker.tickByTicks}') + log.info(f'tickbyticks:\n {ticker.tickByTicks}') ticker.ticks = new_ticks @@ -867,22 +906,28 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = None, + + # TODO? we need to hook into the `ib_async` logger like + # we can with i3ipc from modden! + # loglevel: str|None = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: ''' - Stream symbol quotes. + Stream `symbols[0]` quotes back via `send_chan`. - This is a ``trio`` callable routine meant to be invoked - once the brokerd is up. + The `feed_is_live: Event` is set to signal the caller that it can + begin processing msgs from the mem-chan. ''' # TODO: support multiple subscriptions - sym = symbols[0] - log.info(f'request for real-time quotes: {sym}') + sym: str = symbols[0] + log.info( + f'request for real-time quotes\n' + f'sym: {sym!r}\n' + ) init_msgs: list[FeedInit] = [] @@ -891,34 +936,49 @@ async def stream_quotes( details: ibis.ContractDetails async with ( open_data_client() as proxy, - # trio.open_nursery() as tn, ): mkt, details = await get_mkt_info( sym, proxy=proxy, # passed to avoid implicit client load ) + # is venue active rn? + venue_is_open: bool = any( + is_current_time_in_range( + start_dt=sesh.start, + end_dt=sesh.end, + ) + for sesh in details.tradingSessions() + ) + init_msg = FeedInit(mkt_info=mkt) + # NOTE, tell sampler (via config) to skip vlm summing for dst + # assets which provide no vlm data.. if mkt.dst.atype in { 'fiat', 'index', 'commodity', }: - # tell sampler config that it shouldn't do vlm summing. init_msg.shm_write_opts['sum_tick_vlm'] = False init_msg.shm_write_opts['has_vlm'] = False init_msgs.append(init_msg) con: Contract = details.contract - first_ticker: Ticker | None = None - with trio.move_on_after(1): + first_ticker: Ticker|None = None + + with trio.move_on_after(1.6) as quote_cs: first_ticker: Ticker = await proxy.get_quote( contract=con, raise_on_timeout=False, ) + # XXX should never happen with this ep right? + # but if so then, more then likely mkt is closed? + if quote_cs.cancelled_caught: + await tractor.pause() + if first_ticker: first_quote: dict = normalize(first_ticker) @@ -930,28 +990,27 @@ async def stream_quotes( f'{pformat(first_quote)}\n' ) - # NOTE: it might be outside regular trading hours for - # assets with "standard venue operating hours" so we - # only "pretend the feed is live" when the dst asset - # type is NOT within the NON-NORMAL-venue set: aka not - # commodities, forex or crypto currencies which CAN - # always return a NaN on a snap quote request during - # normal venue hours. In the case of a closed venue - # (equitiies, futes, bonds etc.) we at least try to - # grab the OHLC history. - if ( - first_ticker - and - isnan(first_ticker.last) - # SO, if the last quote price value is NaN we ONLY - # "pretend to do" `feed_is_live.set()` if it's a known - # dst asset venue with a lot of closed operating hours. - and mkt.dst.atype not in { - 'commodity', - 'fiat', - 'crypto', - } - ): + # XXX NOTE: whenever we're "outside regular trading hours" + # (only relevant for assets coming from the "legacy markets" + # space) so we basically (from an API/runtime-operational + # perspective) "pretend the feed is live" even if it's + # actually closed. + # + # IOW, we signal to the effective caller (task) that the live + # feed is "already up" but really we're just indicating that + # the OHLCV history can start being loaded immediately by the + # `piker.data`/`.tsp` layers. + # + # XXX, deats: the "pretend we're live" is just done by + # a `feed_is_live.set()` even though nothing is actually live + # Bp + if not venue_is_open: + log.warning( + f'Venue is closed, unable to establish real-time feed.\n' + f'mkt: {mkt!r}\n' + f'\n' + f'first_ticker: {first_ticker}\n' + ) task_status.started(( init_msgs, first_quote, @@ -962,10 +1021,12 @@ async def stream_quotes( feed_is_live.set() # block and let data history backfill code run. + # XXX obvi given the venue is closed, we never expect feed + # to come up; a taskc should be the only way to + # terminate this task. await trio.sleep_forever() - return # we never expect feed to come up? - # TODO: we should instead spawn a task that waits on a feed + # ?TODO, we could instead spawn a task that waits on a feed # to start and let it wait indefinitely..instead of this # hard coded stuff. # async def wait_for_first_quote(): @@ -987,24 +1048,27 @@ async def stream_quotes( 'Rxed init quote:\n' f'{pformat(first_quote)}' ) - cs: trio.CancelScope | None = None + cs: trio.CancelScope|None = None startup: bool = True iter_quotes: trio.abc.Channel while ( startup - or cs.cancel_called + or + cs.cancel_called ): with trio.CancelScope() as cs: async with ( + tractor.trionics.collapse_eg(), trio.open_nursery() as tn, open_aio_quote_stream( symbol=sym, contract=con, ) as iter_quotes, ): + # ?TODO? can we rm this - particularly for `ib_async`? # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + # first_ticker.ticks = [] # only on first entry at feed boot up if startup: @@ -1018,8 +1082,8 @@ async def stream_quotes( # data feed event. async def reset_on_feed(): - # TODO: this seems to be surpressed from the - # traceback in ``tractor``? + # ??TODO? this seems to be surpressed from the + # traceback in `tractor`? # assert 0 rt_ev = proxy.status_event( @@ -1065,7 +1129,7 @@ async def stream_quotes( # ugh, clear ticks since we've # consumed them (ahem, ib_insync is # truly stateful trash) - ticker.ticks = [] + # ticker.ticks = [] # XXX: this works because we don't use # ``aclosing()`` above? @@ -1087,8 +1151,12 @@ async def stream_quotes( async for ticker in iter_quotes: quote = normalize(ticker) fqme: str = quote['fqme'] + log.debug( + f'Sending quote\n' + f'{quote}' + ) await send_chan.send({fqme: quote}) # ugh, clear ticks since we've consumed them - ticker.ticks = [] + # ticker.ticks = [] # last = time.time() From 323840fdfc62cadc4e97e7e849d595028ccf6f45 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Sep 2025 16:05:50 -0400 Subject: [PATCH 04/15] `ib`: various type-annot, multiline styling and todos updates --- piker/brokers/ib/api.py | 20 ++++++++++---------- piker/brokers/ib/broker.py | 16 +++++++++++++--- piker/brokers/ib/feed.py | 17 +++++++++++++---- 3 files changed, 36 insertions(+), 17 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 74d03075..8a498bd5 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -334,15 +334,15 @@ class Client: fqme: str, # EST in ISO 8601 format is required... below is EPOCH - start_dt: datetime | str = "1970-01-01T00:00:00.000000-05:00", - end_dt: datetime | str = "", + start_dt: datetime|str = "1970-01-01T00:00:00.000000-05:00", + end_dt: datetime|str = "", # ohlc sample period in seconds sample_period_s: int = 1, # optional "duration of time" equal to the # length of the returned history frame. - duration: str | None = None, + duration: str|None = None, **kwargs, @@ -716,8 +716,8 @@ class Client: async def find_contracts( self, - pattern: str | None = None, - contract: Contract | None = None, + pattern: str|None = None, + contract: Contract|None = None, qualify: bool = True, err_on_qualify: bool = True, @@ -862,7 +862,7 @@ class Client: self, fqme: str, - ) -> datetime | None: + ) -> datetime|None: ''' Return the first datetime stamp for `fqme` or `None` on request failure. @@ -918,7 +918,7 @@ class Client: tries: int = 100, raise_on_timeout: bool = False, - ) -> Ticker | None: + ) -> Ticker|None: ''' Return a single (snap) quote for symbol. @@ -930,7 +930,7 @@ class Client: ready: ticker.TickerUpdateEvent = ticker.updateEvent # ensure a last price gets filled in before we deliver quote - timeouterr: Exception | None = None + timeouterr: Exception|None = None warnset: bool = False for _ in range(tries): @@ -1505,7 +1505,7 @@ class MethodProxy: self, pattern: str, - ) -> dict[str, Any] | trio.Event: + ) -> dict[str, Any]|trio.Event: ev = self.event_table.get(pattern) @@ -1542,7 +1542,7 @@ async def open_aio_client_method_relay( # relay all method requests to ``asyncio``-side client and deliver # back results while not to_trio._closed: - msg: tuple[str, dict] | dict | None = await from_trio.get() + msg: tuple[str, dict]|dict|None = await from_trio.get() match msg: case None: # termination sentinel log.info('asyncio `Client` method-proxy SHUTDOWN!') diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index f9c27bba..70dfda19 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -547,7 +547,10 @@ async def open_trade_dialog( ), # TODO: do this as part of `open_account()`!? - open_symcache('ib', only_from_memcache=True) as symcache, + open_symcache( + 'ib', + only_from_memcache=True, + ) as symcache, ): # Open a trade ledgers stack for appending trade records over # multiple accounts. @@ -556,7 +559,9 @@ async def open_trade_dialog( tables: dict[str, Account] = {} order_msgs: list[Status] = [] conf = get_config() - accounts_def_inv: bidict[str, str] = bidict(conf['accounts']).inverse + accounts_def_inv: bidict[str, str] = bidict( + conf['accounts'] + ).inverse with ( ExitStack() as lstack, @@ -706,7 +711,11 @@ async def open_trade_dialog( # client-account and build out position msgs to deliver to # EMS. for acctid, acnt in tables.items(): - active_pps, closed_pps = acnt.dump_active() + active_pps: dict[str, Position] + ( + active_pps, + closed_pps, + ) = acnt.dump_active() for pps in [active_pps, closed_pps]: piker_pps: list[Position] = list(pps.values()) @@ -722,6 +731,7 @@ async def open_trade_dialog( ) if ibpos: bs_mktid: str = str(ibpos.contract.conId) + msg = await update_and_audit_pos_msg( acctid, pikerpos, diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 68043674..13337465 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for pikers) +# Copyright (C) 2018-forever Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -13,10 +13,12 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``. -""" +''' +Data feed endpoints pre-wrapped and ready for use with `tractor`/`trio` +via "infected-asyncio-mode". + +''' from __future__ import annotations import asyncio from contextlib import ( @@ -901,6 +903,13 @@ def normalize( return data +# ?TODO? feels like this task-fn could be factored to reduce some +# indentation levels? +# -[ ] the reconnect while loop on ib-gw "data farm connection.."s +# -[ ] everything embedded under the `async with aclosing(stream):` +# as the "meat" of the quote delivery once the connection is +# stable. +# async def stream_quotes( send_chan: trio.abc.SendChannel, From 728a6f428eb6041b9407728d0ed7c571ce2b6566 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 21 Sep 2025 22:38:05 -0400 Subject: [PATCH 05/15] `ib.feed`: finally solve `push()` exc propagation Such that if/when the `push()` ticker callback (closure) errors internally, we actually eventually bubble the error out-and-up from the `asyncio.Task` and from there out the `.to_asyncio.open_channel_from()` to the parent `trio.Task`.. It ended up being much more subtle to solve then i would have liked thanks to, - whatever `Ticker.updateEvent.connect()` does behind the scenes in terms of (clearly) swallowing with only log reporting any exc raised in the registered callback (in our case `push()`), - `asyncio.Task.set_excepion()` never working and instead needing to resort to `Task.cancel()`, catching `CancelledError` and re-raising the stashed `maybe_exc` from `push()` when set.. Further this ports `.to_asyncio.open_channel_from()` usage to use the new `chan: tractor.to_asyncio.LinkedTaskChannel` fn-sig API, namely for `_setup_quote_stream()` task. Requires the latest `tractor` updates to the inter-eventloop-chan iface providing a `.set_nowait()` and `.get()` for the `asyncio`-side. Impl deats within `_setup_quote_stream()`, - implement `push()` error-bubbling by adding a `maybe_exc` which can be set by that callback itself or by its registering task; when set it is both, * reported on by the `teardown()` cb, * re-raised by the terminated (via `.cancel()`) `asyncio.Task` after woken from its sleep, aka "cancelled" (since that's apparently one of the only options.. see big rant further todo comments). - add explicit error-tolerance-tuning via a `handler_tries: int` counter and `tries_before_raise: int` limit such that we only bubble a `push()` raised exc once enough tries have consecutively failed. - as mentioned, use the new `chan` fn-sig support and thus the new method API for `asyncio` -> `trio` comms. - a big TODO XXX around the need to use a better sys for terminating `asyncio.Task`s whether it's by delegating to some `.to_asyncio` internals after a factor-out OR by potentially going full bore `anyio` throughout `.to_asyncio`'s impl in general.. - mk `teardown()` use appropriate `log.()`s based on outcome. Surroundingly, - add a ton of doc-strings to mod fns previously missing them. - improved / added-new comments to `wait_on_data_reset()` internals and anything changed per ^above. NOTE, resolved conflicts on `piker/brokers/ib/feed.py` due to `brokers_refinery` commit: d809c797 `.brokers.ib.feed`: better `tractor.to_asyncio` typing and var naming throughout! --- piker/brokers/ib/feed.py | 204 +++++++++++++++++++++++++++++---------- 1 file changed, 155 insertions(+), 49 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 13337465..bb450b31 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -41,7 +41,6 @@ import numpy as np from pendulum import ( now, from_timestamp, - # DateTime, Duration, duration as mk_duration, ) @@ -290,8 +289,9 @@ _pacing: str = ( async def wait_on_data_reset( proxy: MethodProxy, + reset_type: str = 'data', - timeout: float = 16, # float('inf'), + timeout: float = 16, task_status: TaskStatus[ tuple[ @@ -300,29 +300,47 @@ async def wait_on_data_reset( ] ] = trio.TASK_STATUS_IGNORED, ) -> bool: + ''' + Wait on a (global-ish) "data-farm" event to be emitted + by the IB api server. - # TODO: we might have to put a task lock around this - # method.. - hist_ev = proxy.status_event( + Allows syncing to reconnect event-messages emitted on the API + console, such as: + + - 'HMDS data farm connection is OK:ushmds' + - 'Market data farm is connecting:usfuture' + - 'Market data farm connection is OK:usfuture' + + Deliver a `(cs, done: Event)` pair to the caller to support it + waiting or cancelling the associated "data-reset-request"; + normally a manual data-reset-req is expected to be the cause and + thus trigger such events (such as our click-hack-magic from + `.ib._util`). + + ''' + # ?TODO, do we need a task-lock around this method? + # + # register for an API "status event" wrapped for `trio`-sync. + hist_ev: trio.Event = proxy.status_event( 'HMDS data farm connection is OK:ushmds' ) - - # TODO: other event messages we might want to try and - # wait for but i wasn't able to get any of this - # reliable.. + # + # ^TODO: other event-messages we might want to support waiting-for + # but i wasn't able to get reliable.. + # # reconnect_start = proxy.status_event( # 'Market data farm is connecting:usfuture' # ) # live_ev = proxy.status_event( # 'Market data farm connection is OK:usfuture' # ) + # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). client: Client = proxy._aio_ns done = trio.Event() with trio.move_on_after(timeout) as cs: - task_status.started((cs, done)) log.warning( @@ -401,8 +419,9 @@ async def get_bars( bool, # timed out hint ]: ''' - Retrieve historical data from a ``trio``-side task using - a ``MethoProxy``. + Request-n-retrieve historical data frames from a `trio.Task` + using a `MethoProxy` to query the `asyncio`-side's + `.ib.api.Client` methods. ''' global _data_resetter_task, _failed_resets @@ -661,14 +680,14 @@ async def get_bars( ) +# per-actor cache of inter-eventloop-chans _quote_streams: dict[str, trio.abc.ReceiveStream] = {} +# TODO! update to the new style sig with, +# `chan: to_asyncio.LinkedTaskChannel,` async def _setup_quote_stream( - - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - + chan: tractor.to_asyncio.LinkedTaskChannel, symbol: str, opts: tuple[int] = ( '375', # RT trade volume (excludes utrades) @@ -686,10 +705,13 @@ async def _setup_quote_stream( ) -> trio.abc.ReceiveChannel: ''' - Stream a ticker using the std L1 api. + Stream L1 quotes via the `Ticker.updateEvent.connect(push)` + callback API by registering a `push` callback which simply + `chan.send_nowait()`s quote msgs back to the calling + parent-`trio.Task`-side. - This task is ``asyncio``-side and must be called from - ``tractor.to_asyncio.open_channel_from()``. + NOTE, that this task-fn is run on the `asyncio.Task`-side ONLY + and is thus run via `tractor.to_asyncio.open_channel_from()`. ''' global _quote_streams @@ -698,45 +720,78 @@ async def _setup_quote_stream( disconnect_on_exit=False, ) as accts2clients: - # since asyncio.Task + # XXX since this is an `asyncio.Task`, we must use # tractor.pause_from_sync() caccount_name, client = get_preferred_data_client(accts2clients) - contract = contract or (await client.find_contract(symbol)) - to_trio.send_nowait(contract) # cuz why not + contract = ( + contract + or + (await client.find_contract(symbol)) + ) + chan.started_nowait(contract) # cuz why not ticker: Ticker = client.ib.reqMktData( contract, ','.join(opts), ) + maybe_exc: BaseException|None = None + handler_tries: int = 0 + aio_task: asyncio.Task = asyncio.current_task() - # NOTE: it's batch-wise and slow af but I guess could - # be good for backchecking? Seems to be every 5s maybe? + # ?TODO? this API is batch-wise and quite slow-af but, + # - seems to be 5s updates? + # - maybe we could use it for backchecking? + # # ticker: Ticker = client.ib.reqTickByTickData( # contract, 'Last', # ) - # # define a simple queue push routine that streams quote packets - # # to trio over the ``to_trio`` memory channel. - # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore + # define a very naive queue-pushing callback that relays + # quote-packets directly the calling (parent) `trio.Task`. + # Ensure on teardown we cancel the feed via their cancel API. + # def teardown(): + ''' + Disconnect our `push`-er callback and cancel the data-feed + for `contract`. + + ''' + nonlocal maybe_exc ticker.updateEvent.disconnect(push) - log.error( - f'Disconnected stream for `{symbol}`' - ) + report: str = f'Disconnected mkt-data for {symbol!r} due to ' + if maybe_exc is not None: + report += ( + 'error,\n' + f'{maybe_exc!r}\n' + ) + log.error(report) + else: + report += ( + 'cancellation.\n' + ) + log.cancel(report) + client.ib.cancelMktData(contract) # decouple broadcast mem chan _quote_streams.pop(symbol, None) - def push(t: Ticker) -> None: - """ - Push quotes to trio task. - - """ + def push( + t: Ticker, + tries_before_raise: int = 6, + ) -> None: + ''' + Push quotes verbatim to parent-side `trio.Task`. + ''' + nonlocal maybe_exc, handler_tries # log.debug(f'new IB quote: {t}\n') try: - to_trio.send_nowait(t) + chan.send_nowait(t) + + # XXX TODO XXX replicate in `tractor` tests + # as per `CancelledError`-handler notes below! + # assert 0 except ( trio.BrokenResourceError, @@ -756,29 +811,40 @@ async def _setup_quote_stream( # with log msgs except trio.WouldBlock: log.exception( - f'Asyncio->Trio `to_trio.send_nowait()` blocked !?\n' + f'Asyncio->Trio `chan.send_nowait()` blocked !?\n' f'\n' - f'{to_trio.statistics()}\n' + f'{chan._to_trio.statistics()}\n' ) # ?TODO, handle re-connection attempts? except BaseException as _berr: berr = _berr + if handler_tries >= tries_before_raise: + # breakpoint() + maybe_exc = _berr + # task.set_exception(berr) + aio_task.cancel(msg=berr.args) + raise berr + else: + handler_tries += 1 + log.exception( f'Failed to push ticker quote !?\n' - f'cause: {berr}\n' + f'handler_tries={handler_tries!r}\n' + f'ticker: {t!r}\n' f'\n' - f't: {t}\n' - f'{to_trio.statistics}\n' + f'{chan._to_trio.statistics()}\n' + f'\n' + f'CAUSE: {berr}\n' ) - # raise berr ticker.updateEvent.connect(push) try: await asyncio.sleep(float('inf')) - # XXX, just for debug.. + # XXX, for debug.. TODO? can we rm again? + # # tractor.pause_from_sync() # while True: # await asyncio.sleep(1.6) @@ -792,23 +858,55 @@ async def _setup_quote_stream( # 'UHH no ticker.ticks ??' # ) - finally: - teardown() + # XXX TODO XXX !?!? + # apparently **without this handler** and the subsequent + # re-raising of `maybe_exc from _taskc` cancelling the + # `aio_task` from the `push()`-callback will cause a very + # strange chain of exc raising that breaks alll sorts of + # downstream callers, tasks and remote-actor tasks!? + # + # -[ ] we need some lowlevel reproducting tests to replicate + # those worst-case scenarios in `tractor` core!! + # -[ ] likely we should factor-out the `tractor.to_asyncio` + # attempts at workarounds in `.translate_aio_errors()` + # for failed `asyncio.Task.set_exception()` to either + # call `aio_task.cancel()` and/or + # `aio_task._fut_waiter.set_exception()` to a re-useable + # toolset in something like a `.to_asyncio._utils`?? + # + except asyncio.CancelledError as _taskc: + if maybe_exc is not None: + raise maybe_exc from _taskc - # return from_aio + raise _taskc + + except BaseException as _berr: + # stash any crash cause for reporting in `teardown()` + maybe_exc = _berr + raise _berr + + finally: + # always disconnect our `push()` and cancel the + # ib-"mkt-data-feed". + teardown() @acm async def open_aio_quote_stream( - symbol: str, - contract: Contract | None = None, + contract: Contract|None = None, ) -> ( trio.abc.Channel| # iface tractor.to_asyncio.LinkedTaskChannel # actually ): + ''' + Open a real-time `Ticker` quote stream from an `asyncio.Task` + spawned via `tractor.to_asyncio.open_channel_from()`, deliver the + inter-event-loop channel to the `trio.Task` caller and cache it + globally for re-use. + ''' from tractor.trionics import broadcast_receiver global _quote_streams @@ -834,6 +932,10 @@ async def open_aio_quote_stream( assert contract + # TODO? de-reg on teardown of last consumer task? + # -> why aren't we using `.trionics.maybe_open_context()` + # here again?? (we are in `open_client_proxies()` tho?) + # # cache feed for later consumers _quote_streams[symbol] = from_aio @@ -848,7 +950,12 @@ def normalize( calc_price: bool = False ) -> dict: + ''' + Translate `ib_async`'s `Ticker.ticks` values to a `piker` + normalized `dict` form for transmit to downstream `.data` layer + consumers. + ''' # check for special contract types con = ticker.contract fqme, calc_price = con2fqme(con) @@ -911,7 +1018,6 @@ def normalize( # stable. # async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, From 269b8158e697d07def50c0e5221bd80b1da03378 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 22 Sep 2025 12:58:23 -0400 Subject: [PATCH 06/15] Convert remaining `.to_asyncio.open_channel_from()` to `chan` fn-sig usage --- piker/brokers/ib/api.py | 24 ++++++++++-------------- piker/brokers/ib/feed.py | 2 -- 2 files changed, 10 insertions(+), 16 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index 8a498bd5..ac936e1b 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -1363,9 +1363,7 @@ async def load_aio_clients( async def load_clients_for_trio( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - + chan: tractor.to_asyncio.LinkedTaskChannel, ) -> None: ''' Pure async mngr proxy to ``load_aio_clients()``. @@ -1378,8 +1376,7 @@ async def load_clients_for_trio( disconnect_on_exit=False, ) as accts2clients: - to_trio.send_nowait(accts2clients) - + chan.started_nowait(accts2clients) # TODO: maybe a sync event to wait on instead? await asyncio.sleep(float('inf')) @@ -1526,23 +1523,22 @@ class MethodProxy: async def open_aio_client_method_relay( - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, + chan: tractor.to_asyncio.LinkedTaskChannel, client: Client, event_consumers: dict[str, trio.Event], ) -> None: # sync with `open_client_proxy()` caller - to_trio.send_nowait(client) + chan.started_nowait(client) # TODO: separate channel for error handling? - client.inline_errors(to_trio) + client.inline_errors(chan) # relay all method requests to ``asyncio``-side client and deliver # back results - while not to_trio._closed: - msg: tuple[str, dict]|dict|None = await from_trio.get() + while not chan._to_trio._closed: # <- TODO, better check like `._web_bs`? + msg: tuple[str, dict]|dict|None = await chan.get() match msg: case None: # termination sentinel log.info('asyncio `Client` method-proxy SHUTDOWN!') @@ -1555,7 +1551,7 @@ async def open_aio_client_method_relay( try: resp = await meth(**kwargs) # echo the msg back - to_trio.send_nowait({'result': resp}) + chan.send_nowait({'result': resp}) except ( RequestError, @@ -1563,10 +1559,10 @@ async def open_aio_client_method_relay( # TODO: relay all errors to trio? # BaseException, ) as err: - to_trio.send_nowait({'exception': err}) + chan.send_nowait({'exception': err}) case {'error': content}: - to_trio.send_nowait({'exception': content}) + chan.send_nowait({'exception': content}) case _: raise ValueError(f'Unhandled msg {msg}') diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index bb450b31..8d6d9262 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -684,8 +684,6 @@ async def get_bars( _quote_streams: dict[str, trio.abc.ReceiveStream] = {} -# TODO! update to the new style sig with, -# `chan: to_asyncio.LinkedTaskChannel,` async def _setup_quote_stream( chan: tractor.to_asyncio.LinkedTaskChannel, symbol: str, From c1fbf70c62d3855a896de2e992517e700ddcd7d3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 26 Sep 2025 13:02:11 -0400 Subject: [PATCH 07/15] Switch to `pyvnc` for IB reset hackz It actually works for vncAuth(2) (thank god!) which the previous `asyncvnc` **did not**, and seems to be mostly based on the work from the `asyncvnc` author anyway (so all my past efforts don't seem to have been in vain XD). NOTE, the below deats ended up being factored in earlier into the `pyproject.toml` alongside nix(os) support needed for testing and landing this history. As the such, the comments are the originals but the changes are not. Deats, - switch to `pyvnc` async API (using `asyncio` again obvi) in `.ib._util._vnc_click_hack()`. - add `pyvnc` as src installed dep from GH. - drop `asyncvnc` as dep. Other, - update `pytest` version range to avoid weird auto-load plugin exposed by `xonsh`? - add a `tool.pytest.ini_options` to project file with vars to, - disable that^ `xonsh` plug using `addopts = '-p no:xonsh'`. - set a `testpaths` to avoid running anything but that subdir. - try out the `'progress'` style console output (does it work?). --- piker/brokers/ib/_util.py | 48 +++++++++++++++++++++++---------------- 1 file changed, 28 insertions(+), 20 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 2941284a..cb3affc7 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -197,15 +197,20 @@ async def vnc_click_hack( ) -> None: ''' Reset the data or network connection for the VNC attached - ib gateway using magic combos. + ib-gateway using a (magic) keybinding combo. ''' try: - import asyncvnc + from pyvnc import ( + AsyncVNCClient, + VNCConfig, + Point, + MOUSE_BUTTON_LEFT, + ) except ModuleNotFoundError: log.warning( "In order to leverage `piker`'s built-in data reset hacks, install " - "the `asyncvnc` project: https://github.com/barneygale/asyncvnc" + "the `pyvnc` project: https://github.com/regulad/pyvnc.git" ) return @@ -216,24 +221,27 @@ async def vnc_click_hack( 'connection': 'r' }[reset_type] - async with asyncvnc.connect( - host, - port=port, - - # TODO: doesn't work? - # see, https://github.com/barneygale/asyncvnc/issues/7 - password='doggy', - - ) as client: - - # move to middle of screen - # 640x1800 - client.mouse.move( - x=500, - y=500, + with tractor.devx.open_crash_handler(): + client = await AsyncVNCClient.connect( + VNCConfig( + host=host, + port=port, + password='doggy', + ) ) - client.mouse.click() - client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked + async with client: + # move to middle of screen + # 640x1800 + await client.move( + Point( + 500, + 500, + ) + ) + # ensure the ib-gw window is active + await client.click(MOUSE_BUTTON_LEFT) + # send the hotkeys combo B) + await client.press('Ctrl', 'Alt', key) # keys are stacked def i3ipc_fin_wins_titled( From 6ff9ba2e7868dab1573d8fbf0b64caed1c597d79 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Oct 2025 20:52:01 -0400 Subject: [PATCH 08/15] ib.feed: better no-bars error-log message format --- piker/brokers/ib/feed.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 8d6d9262..b9f63d8a 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -214,7 +214,9 @@ async def open_history_client( # could be trying to retreive bars over weekend if out is None: - log.error(f"Can't grab bars starting at {end_dt}!?!?") + log.error( + f"No bars starting at {end_dt!r} !?!?" + ) if ( end_dt and head_dt From 9fd14ad6ce70ce64bfcb640e2544d7bfe25b4821 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Oct 2025 22:12:56 -0400 Subject: [PATCH 09/15] ib: bump `docker/ib/README.rst` For the new github image, a high-level look at its basic features/usage/docs and prosing around our expected default usage with the `piker.brokers.ib` backend. --- dockering/ib/README.rst | 75 +++++++++++++++++++++++++++++++---------- 1 file changed, 58 insertions(+), 17 deletions(-) diff --git a/dockering/ib/README.rst b/dockering/ib/README.rst index 3f9e01b9..9be11cc5 100644 --- a/dockering/ib/README.rst +++ b/dockering/ib/README.rst @@ -1,30 +1,71 @@ running ``ib`` gateway in ``docker`` ------------------------------------ -We have a config based on the (now defunct) -image from "waytrade": +We have a config based on a well maintained community +image from `@gnzsnz`: -https://github.com/waytrade/ib-gateway-docker +https://github.com/gnzsnz/ib-gateway-docker -To startup this image with our custom settings -simply run the command:: + +To startup this image simply run the command:: docker compose up -And you should have the following socket-available services: +(For further usage^ see the official `docker-compose`_ docs) -- ``x11vnc1@127.0.0.1:3003`` -- ``ib-gw@127.0.0.1:4002`` -You can attach to the container via a VNC client -without password auth. +And you should have the following socket-available services by +default: -SECURITY STUFF!?!?! -------------------- -Though "``ib``" claims they host filter connections outside -localhost (aka ``127.0.0.1``) it's probably better if you filter -the socket at the OS level using a stateless firewall rule:: +- ``x11vnc1 @ 127.0.0.1:5900`` +- ``ib-gw @ 127.0.0.1:4002`` + +You can now attach to the container via a VNC client with password-auth; +here is an example using ``vncclient`` on ``linux``:: + + vncviewer localhost:5900 + + +now enter the pw you set via an (see second code blob) `.env file`_ +or pw-file according to the `credentials section`_. + +If you want to change away from their default config see the example +`docker-compose.yml`-config issue and config-section of the readme, + + - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration + - https://github.com/gnzsnz/ib-gateway-docker/discussions/103 + +.. _.env file: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#how-to-use-it +.. _docker-compose: https://docs.docker.com/compose/ +.. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials + + +IF you also want to run ``TWS`` +------------------------------- +You can also run it containerized, + +https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#using-tws + + +SECURITY stuff (advanced, only if you're paranoid) +-------------------------------------------------- +First and foremost if doing a "distributed" container setup where you +run the ``ib-gw`` docker container and your connecting API client +(likely ``ib_async`` from python) on **different hosts** be sure to +read the `security considerations`_ section! + +And for a further (somewhat paranoid) perspective from +a long-time-ago serious devops eng.. + +Though "``ib``" claims they filter remote host connections outside +``localhost`` (aka ``127.0.0.1`` on ipv4) it's prolly justified if +you'd like to filter the socket at the *OS level* using a stateless +firewall rule:: ip rule add not unicast iif lo to 0.0.0.0/0 dport 4002 -We will soon have this baked into our own custom image but for -now you'll have to do it urself dawgy. + +We will soon have this either baked into our own custom derivative +image (or patched into the current upstream one after further testin) +but for now you'll have to do it urself, diggity dawg. + +.. _security considerations: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#security-considerations From a392185d2fc92fdfe87e31cc9c91d9052605a29f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 5 Oct 2025 12:32:53 -0400 Subject: [PATCH 10/15] Support per-`ib.vnc_addrs` vnc passwords Such that the `brokers.toml` can contain any of the following = dict|tuple styles, ```toml [ib.vnc_addrs] 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} # host, port, pw 4002 = {host = 'localhost', port = 5900} # host, port, pw 4002 = ['localhost', 5900] # host, port, pw ``` With the first line demonstrating a vnc-server password (as normally set via a `.env` file in the `dockering/ib/` subdir) with the `pw =` field. This obviously removes the hardcoded `'doggy'` password from prior. Impl details in `.brokers.ib._util`: - pass the `ib.api.Client` down into `vnc_click_hack()` doing all config reading within and removing host, port unpacking in the callingn `data_reset_hack()`. - also pass the client `try_xdo_manual()` and comment (with plans to remove) the recently added localhost-only fallback section since we now have a fully working py vnc client again with `pyvnc` B) - in `vnc_click_hack()` match for all the possible config line styles and, * pass any `pw` field to `pyvncVNCConfig`, * continue matching host, port without password, * fallthrough to raising a val-err when neither ^ match. --- piker/brokers/ib/_util.py | 105 ++++++++++++++++++++++++-------------- 1 file changed, 67 insertions(+), 38 deletions(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index cb3affc7..1cf50926 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -38,7 +38,6 @@ from piker.brokers._util import get_logger if TYPE_CHECKING: from .api import Client - from ib_insync import IB import i3ipc log = get_logger('piker.brokers.ib') @@ -62,7 +61,7 @@ no_setup_msg:str = ( def try_xdo_manual( - vnc_sockaddr: str, + client: Client, ): ''' Do the "manual" `xdo`-based screen switch + click @@ -79,6 +78,7 @@ def try_xdo_manual( _reset_tech = 'i3ipc_xdotool' return True except OSError: + vnc_sockaddr: str = client.conf.vnc_addrs log.exception( no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) ) @@ -86,7 +86,6 @@ def try_xdo_manual( async def data_reset_hack( - # vnc_host: str, client: Client, reset_type: Literal['data', 'connection'], @@ -118,36 +117,24 @@ async def data_reset_hack( that need to be wrangle. ''' - ib_client: IB = client.ib - # look up any user defined vnc socket address mapped from # a particular API socket port. - api_port: str = str(ib_client.client.port) - vnc_host: str - vnc_port: int - vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs') - - if not vnc_sockaddr: + vnc_addrs: tuple[str]|None = client.conf.get('vnc_addrs') + if not vnc_addrs: log.warning( - no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=client.conf) + 'REQUIRES A `vnc_addrs: array` ENTRY' ) - vnc_host, vnc_port = vnc_sockaddr.get( - api_port, - ('localhost', 3003) - ) global _reset_tech - match _reset_tech: case 'vnc': try: await tractor.to_asyncio.run_task( partial( vnc_click_hack, - host=vnc_host, - port=vnc_port, + client=client, ) ) except ( @@ -158,29 +145,31 @@ async def data_reset_hack( import i3ipc # noqa (since a deps dynamic check) except ModuleNotFoundError: log.warning( - no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + no_setup_msg.format(vnc_sockaddr=client.conf) ) return False - if vnc_host not in { - 'localhost', - '127.0.0.1', - }: - focussed, matches = i3ipc_fin_wins_titled() - if not matches: - log.warning( - no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) - ) - return False - else: - try_xdo_manual(vnc_sockaddr) + # XXX, Xorg only workaround.. + # TODO? remove now that we have `pyvnc`? + # if vnc_host not in { + # 'localhost', + # '127.0.0.1', + # }: + # focussed, matches = i3ipc_fin_wins_titled() + # if not matches: + # log.warning( + # no_setup_msg.format(vnc_sockaddr=vnc_sockaddr) + # ) + # return False + # else: + # try_xdo_manual(vnc_sockaddr) # localhost but no vnc-client or it borked.. else: - try_xdo_manual(vnc_sockaddr) + try_xdo_manual(client) case 'i3ipc_xdotool': - try_xdo_manual(vnc_sockaddr) + try_xdo_manual(client) # i3ipc_xdotool_manual_click_hack() case _ as tech: @@ -191,15 +180,55 @@ async def data_reset_hack( async def vnc_click_hack( - host: str, - port: int, - reset_type: str = 'data' + client: Client, + reset_type: str = 'data', + pw: str|None = None, + ) -> None: ''' Reset the data or network connection for the VNC attached ib-gateway using a (magic) keybinding combo. + A vnc-server password can be set either by an input `pw` param or + set in the client's config with the latter loaded from the user's + `brokers.toml` in a vnc-addrs-port-mapping section, + + .. code:: toml + + [ib.vnc_addrs] + 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} + ''' + api_port: str = str(client.client.port) + conf: dict = client.conf + vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs') + if not vnc_addrs: + return None + + addr_entry: dict|tuple = vnc_addrs.get( + api_port, + ('localhost', 5900) # a typical default + ) + if pw is None: + match addr_entry: + case ( + host, + port, + ): + pass + + case { + 'host': host, + 'port': port, + 'pw': pw + }: + pass + + case _: + raise ValueError( + f'Invalid `ib.vnc_addrs` entry ?\n' + f'{addr_entry!r}\n' + ) try: from pyvnc import ( AsyncVNCClient, @@ -226,7 +255,7 @@ async def vnc_click_hack( VNCConfig( host=host, port=port, - password='doggy', + password=pw, ) ) async with client: From 05bdac5542b6176132737a8071b88ad40a2659e7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Oct 2025 18:12:33 -0400 Subject: [PATCH 11/15] Woops, fix to read `.api_port` ref from the `Client.ib.client`.. --- piker/brokers/ib/_util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib/_util.py b/piker/brokers/ib/_util.py index 1cf50926..00b2d233 100644 --- a/piker/brokers/ib/_util.py +++ b/piker/brokers/ib/_util.py @@ -199,7 +199,7 @@ async def vnc_click_hack( 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} ''' - api_port: str = str(client.client.port) + api_port: str = str(client.ib.client.port) conf: dict = client.conf vnc_addrs: dict[int, tuple] = conf.get('vnc_addrs') if not vnc_addrs: From 1089de024a0f8b663cbcc5435af6a5bada355e8e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 30 Dec 2025 11:02:41 -0500 Subject: [PATCH 12/15] ib: multiline stylings, typing, timeout report --- piker/brokers/ib/api.py | 13 +++++++++---- piker/brokers/ib/broker.py | 8 ++++++-- piker/brokers/ib/feed.py | 7 +++++-- 3 files changed, 20 insertions(+), 8 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index ac936e1b..5bcc7336 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -944,6 +944,7 @@ class Client: ) if tkr: break + except TimeoutError as err: timeouterr = err await asyncio.sleep(0.01) @@ -952,7 +953,9 @@ class Client: else: if not warnset: log.warning( - f'Quote req timed out..maybe venue is closed?\n' + f'Quote req timed out..\n' + f'Maybe the venue is closed?\n' + f'\n' f'{asdict(contract)}' ) warnset = True @@ -964,9 +967,11 @@ class Client: ) break else: - if timeouterr and raise_on_timeout: - import pdbp - pdbp.set_trace() + if ( + timeouterr + and + raise_on_timeout + ): raise timeouterr if not warnset: diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 70dfda19..5065d678 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -117,7 +117,11 @@ def pack_position( symbol=fqme, currency=con.currency, size=float(pos.position), - avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + avg_price=( + float(pos.avgCost) + / + float(con.multiplier or 1.0) + ), ), ) @@ -558,7 +562,7 @@ async def open_trade_dialog( ledgers: dict[str, TransactionLedger] = {} tables: dict[str, Account] = {} order_msgs: list[Status] = [] - conf = get_config() + conf: dict = get_config() accounts_def_inv: bidict[str, str] = bidict( conf['accounts'] ).inverse diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index b9f63d8a..51305ced 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -1083,7 +1083,8 @@ async def stream_quotes( con: Contract = details.contract first_ticker: Ticker|None = None - with trio.move_on_after(1.6) as quote_cs: + timeout: float = 1.6 + with trio.move_on_after(timeout) as quote_cs: first_ticker: Ticker = await proxy.get_quote( contract=con, raise_on_timeout=False, @@ -1092,7 +1093,9 @@ async def stream_quotes( # XXX should never happen with this ep right? # but if so then, more then likely mkt is closed? if quote_cs.cancelled_caught: - await tractor.pause() + log.warning( + f'First quote req timed out after {timeout!r}s' + ) if first_ticker: first_quote: dict = normalize(first_ticker) From ccb4f7917003ff6f0f7f81a436e8ff22a684ae3c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 13 Feb 2025 10:43:50 -0500 Subject: [PATCH 13/15] Bump various `.brokers.core` doc string content/style --- piker/brokers/core.py | 67 ++++++++++++++++++++++++++++++------------- 1 file changed, 47 insertions(+), 20 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index 6111d307..c1aa88ac 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -22,7 +22,9 @@ routines should be primitive data types where possible. """ import inspect from types import ModuleType -from typing import List, Dict, Any, Optional +from typing import ( + Any, +) import trio @@ -34,8 +36,10 @@ from ..accounting import MktPair async def api(brokername: str, methname: str, **kwargs) -> dict: - """Make (proxy through) a broker API call by name and return its result. - """ + ''' + Make (proxy through) a broker API call by name and return its result. + + ''' brokermod = get_brokermod(brokername) async with brokermod.get_client() as client: meth = getattr(client, methname, None) @@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict: async def stocks_quote( brokermod: ModuleType, - tickers: List[str] -) -> Dict[str, Dict[str, Any]]: - """Return quotes dict for ``tickers``. - """ + tickers: list[str] + +) -> dict[str, dict[str, Any]]: + ''' + Return a `dict` of snapshot quotes for the provided input + `tickers`: a `list` of fqmes. + + ''' async with brokermod.get_client() as client: return await client.quote(tickers) @@ -74,13 +82,15 @@ async def stocks_quote( async def option_chain( brokermod: ModuleType, symbol: str, - date: Optional[str] = None, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return option chain for ``symbol`` for ``date``. + date: str|None = None, +) -> dict[str, dict[str, dict[str, Any]]]: + ''' + Return option chain for ``symbol`` for ``date``. By default all expiries are returned. If ``date`` is provided then contract quotes for that single expiry are returned. - """ + + ''' async with brokermod.get_client() as client: if date: id = int((await client.tickers2ids([symbol]))[symbol]) @@ -98,7 +108,7 @@ async def option_chain( # async def contracts( # brokermod: ModuleType, # symbol: str, -# ) -> Dict[str, Dict[str, Dict[str, Any]]]: +# ) -> dict[str, dict[str, dict[str, Any]]]: # """Return option contracts (all expiries) for ``symbol``. # """ # async with brokermod.get_client() as client: @@ -110,15 +120,24 @@ async def bars( brokermod: ModuleType, symbol: str, **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return option contracts (all expiries) for ``symbol``. - """ +) -> dict[str, dict[str, dict[str, Any]]]: + ''' + Return option contracts (all expiries) for ``symbol``. + + ''' async with brokermod.get_client() as client: return await client.bars(symbol, **kwargs) -async def search_w_brokerd(name: str, pattern: str) -> dict: +async def search_w_brokerd( + name: str, + pattern: str, +) -> dict: + # TODO: WHY NOT WORK!?! + # when we `step` through the next block? + # import tractor + # await tractor.pause() async with open_cached_client(name) as client: # TODO: support multiple asset type concurrent searches. @@ -130,12 +149,12 @@ async def symbol_search( pattern: str, **kwargs, -) -> Dict[str, Dict[str, Dict[str, Any]]]: +) -> dict[str, dict[str, dict[str, Any]]]: ''' Return symbol info from broker. ''' - results = [] + results: list[str] = [] async def search_backend( brokermod: ModuleType @@ -143,6 +162,13 @@ async def symbol_search( brokername: str = mod.name + # TODO: figure this the FUCK OUT + # -> ok so obvi in the root actor any async task that's + # spawned outside the main tractor-root-actor task needs to + # call this.. + # await tractor.devx._debug.maybe_init_greenback() + # tractor.pause_from_sync() + async with maybe_spawn_brokerd( mod.name, infect_asyncio=getattr( @@ -162,7 +188,6 @@ async def symbol_search( )) async with trio.open_nursery() as n: - for mod in brokermods: n.start_soon(search_backend, mod.name) @@ -172,11 +197,13 @@ async def symbol_search( async def mkt_info( brokermod: ModuleType, fqme: str, + **kwargs, ) -> MktPair: ''' - Return MktPair info from broker including src and dst assets. + Return the `piker.accounting.MktPair` info struct from a given + backend broker tradable src/dst asset pair. ''' async with open_cached_client(brokermod.name) as client: From a0020d485e07e8a769adacc5ed6ae3f9dd5fa448 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Oct 2025 18:10:23 -0400 Subject: [PATCH 14/15] Bump ib-container docs and compose file Add necessary details for the `brokers.toml`, cleanup and link to the new GH container repo in the `docker-compose.yml`. --- dockering/ib/README.rst | 73 +++++++++++++++++++++++++++++++-- dockering/ib/docker-compose.yml | 38 +++++++++++------ 2 files changed, 96 insertions(+), 15 deletions(-) diff --git a/dockering/ib/README.rst b/dockering/ib/README.rst index 9be11cc5..ad441213 100644 --- a/dockering/ib/README.rst +++ b/dockering/ib/README.rst @@ -24,9 +24,8 @@ here is an example using ``vncclient`` on ``linux``:: vncviewer localhost:5900 - -now enter the pw you set via an (see second code blob) `.env file`_ -or pw-file according to the `credentials section`_. +now enter the pw (password) you set via an (see second code blob) +`.env file`_ or pw-file according to the `credentials section`_. If you want to change away from their default config see the example `docker-compose.yml`-config issue and config-section of the readme, @@ -39,6 +38,74 @@ If you want to change away from their default config see the example .. _credentials section: https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#credentials +Connecting to the API from `piker` +--------------------------------- +In order to expose the container's API endpoint to the +`brokerd/datad/ib` actor, we need to add a section to the user's +`brokers.toml` config (note the below is similar to the repo-shipped +template file), + +.. code:: toml + + [ib] + # define the (set of) host-port socketaddrs that + # brokerd.ib will scan to connect to an API endpoint + # (ib-gw or ib-tws listening instances) + hosts = [ + '127.0.0.1', + ] + ports = [ + 4002, # gw + 7497, # tws + ] + + # When API endpoints are being scanned durin startup, the order + # of user-defined-account "names" (as defined below) here + # determines which py-client connection is given priority to be + # used for data-feed-requests by according to whichever client + # connected to an API endpoing which reported the equivalent + # account number for that name. + prefer_data_account = [ + 'paper', + 'margin', + 'ira', + ] + + # define "aliases" (names) for each account number + # such that the names can be reffed and logged throughout + # `piker.accounting` subsys and more easily + # referred to by the user. + # + # These keys will be the set exposed through the order-mode + # account-selection UI so that numbers are never shown. + [ib.accounts] + paper = 'XX0000000' + margin = 'X0000000' + ira = 'X0000000' + + +the broker daemon can also connect to the container's VNC server for +added functionalies including, + +- viewing the API endpoint program's GUI for manual interventions, +- workarounds for historical data throttling using hotkey hacks, + +Add a further section to `brokers.toml` which maps each API-ep's +port to a table of VNC server connection info like, + +.. code:: toml + + [ib.vnc_addrs] + 4002 = {host = 'localhost', port = 5900, pw = 'doggy'} + +The `pw = 'doggy'` here ^ should the same value as the particular +container instances `.env` file setting (when it was run), + +.. code:: ini + + VNC_SERVER_PASSWORD='doggy' + + IF you also want to run ``TWS`` ------------------------------- You can also run it containerized, diff --git a/dockering/ib/docker-compose.yml b/dockering/ib/docker-compose.yml index 2f2db58f..54d62ca9 100644 --- a/dockering/ib/docker-compose.yml +++ b/dockering/ib/docker-compose.yml @@ -1,10 +1,15 @@ -# rework from the original @ -# https://github.com/waytrade/ib-gateway-docker/blob/master/docker-compose.yml -version: "3.5" - +# a community maintained IB API container! +# +# https://github.com/gnzsnz/ib-gateway-docker +# +# For piker we (currently) include some minor deviations +# for some config files in the `volumes` section. +# +# See full configuration settings @ +# - https://github.com/gnzsnz/ib-gateway-docker?tab=readme-ov-file#configuration +# - https://github.com/gnzsnz/ib-gateway-docker/discussions/103 services: - ib_gw_paper: # apparently java is a mega cukc: @@ -50,16 +55,22 @@ services: target: /root/scripts/run_x11_vnc.sh read_only: true - # NOTE:to fill these out, define an `.env` file in the same dir as - # this compose file which looks something like: - # TWS_USERID='myuser' - # TWS_PASSWORD='guest' + # NOTE: an alt method to fill these out is to + # define an `.env` file in the same dir as + # this compose file. environment: TWS_USERID: ${TWS_USERID} + # TWS_USERID: 'myuser' TWS_PASSWORD: ${TWS_PASSWORD} - TRADING_MODE: 'paper' - VNC_SERVER_PASSWORD: 'doggy' - VNC_SERVER_PORT: '3003' + # TWS_PASSWORD: 'guest' + TRADING_MODE: ${TRADING_MODE} + # TRADING_MODE: 'paper' + VNC_SERVER_PASSWORD: ${VNC_SERVER_PASSWORD} + # VNC_SERVER_PASSWORD: 'doggy' + + # TODO, see if we can get this supported like it + # was on the old `waytrade` image? + # VNC_SERVER_PORT: '3003' # ports: # - target: 4002 @@ -76,6 +87,9 @@ services: # - "127.0.0.1:4002:4002" # - "127.0.0.1:5900:5900" + # TODO, a masked but working example of dual paper + live + # ib-gw instances running in a single app run! + # # ib_gw_live: # image: waytrade/ib-gateway:1012.2i # restart: no From 55116eea010cfb262e8f208a06335dce0bd51ca0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Oct 2025 18:25:50 -0400 Subject: [PATCH 15/15] Bump `brokers.toml`, update ib and deribit sections For `[ib]` adjust content to match changes to the `dockering/ib/README.rst` and for `[deribit]` toss in the WIP options related params for anyone who wants to play around with @nt's work. --- config/brokers.toml | 77 +++++++++++++++++++++++++++++---------------- 1 file changed, 50 insertions(+), 27 deletions(-) diff --git a/config/brokers.toml b/config/brokers.toml index 42df5a3e..098a940c 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -1,6 +1,5 @@ -################ # ---- CEXY ---- -################ + [binance] accounts.paper = 'paper' @@ -13,28 +12,41 @@ accounts.spot = 'spot' spot.use_testnet = false spot.api_key = '' spot.api_secret = '' +# ------ binance ------ [deribit] +# std assets key_id = '' key_secret = '' +# options +accounts.option = 'option' +option.use_testnet = false +option.key_id = '' +option.key_secret = '' +# aux logging from `cryptofeed` +option.log.filename = 'cryptofeed.log' +option.log.level = 'DEBUG' +option.log.disabled = true +# ------ deribit ------ [kraken] key_descr = '' api_key = '' secret = '' +# ------ kraken ------ [kucoin] key_id = '' key_secret = '' key_passphrase = '' +# ------ kucoin ------ -################ # -- BROKERZ --- -################ + [questrade] refresh_token = '' access_token = '' @@ -42,44 +54,55 @@ api_server = 'https://api06.iq.questrade.com/' expires_in = 1800 token_type = 'Bearer' expires_at = 1616095326.355846 +# ------ questrade ------ [ib] +# define the (set of) host-port socketaddrs that +# brokerd.ib will scan to connect to an API endpoint +# (ib-gw or ib-tws listening instances) hosts = [ '127.0.0.1', ] -# XXX: the order in which ports will be scanned -# (by the `brokerd` daemon-actor) -# is determined # by the line order here. -# TODO: when we eventually spawn gateways in our -# container, we can just dynamically allocate these -# using IBC. ports = [ 4002, # gw 7497, # tws ] -# XXX: for a paper account the flex web query service -# is not supported so you have to manually download -# and XML report and put it in a location that can be -# accessed by the ``brokerd.ib`` backend code for parsing. -flex_token = '' -flex_trades_query_id = '' # live account - -# when clients are being scanned this determines -# which clients are preferred to be used for data -# feeds based on the order of account names, if -# detected as active on an API client. +# When API endpoints are being scanned durin startup, the order +# of user-defined-account "names" (as defined below) here +# determines which py-client connection is given priority to be +# used for data-feed-requests by according to whichever client +# connected to an API endpoing which reported the equivalent +# account number for that name. prefer_data_account = [ 'paper', 'margin', 'ira', ] +# For long-term trades txn (transaction) history +# processing (i.e your txn ledger with IB) you can +# (automatically for live accounts) query the FLEX +# report system for past history. +# +# (For paper accounts the web query service +# is not supported so you have to manually download +# an XML report and put it in a location that can be +# accessed by our `brokerd.ib` backend code for parsing). +# +flex_token = '' +flex_trades_query_id = '' # live account + +# define "aliases" (names) for each account number +# such that the names can be reffed and logged throughout +# `piker.accounting` subsys and more easily +# referred to by the user. +# +# These keys will be the set exposed through the order-mode +# account-selection UI so that numbers are never shown. [ib.accounts] -# the order in which accounts will be selectable -# in the order mode UI (if found via clients during -# API-app scanning)when a new symbol is loaded. -paper = 'XX0000000' -margin = 'X0000000' -ira = 'X0000000' +paper = 'DU0000000' # <- literal account # +margin = 'U0000000' +ira = 'U0000000' +# ------ ib ------