diff --git a/config/brokers.toml b/config/brokers.toml index 20216bde..adee409e 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -12,16 +12,34 @@ api_key = "" secret = "" [ib] -host = "127.0.0.1" +hosts = [ + "127.0.0.1", +] +# XXX: the order in which ports will be scanned +# (by the `brokerd` daemon-actor) +# is determined # by the line order here. +# TODO: when we eventually spawn gateways in our +# container, we can just dynamically allocate these +# using IBC. +ports = [ + 4002, # gw + 7497, # tws +] -ports.gw = 4002 -ports.tws = 7497 -ports.order = ["gw", "tws",] +# when clients are being scanned this determines +# which clients are preferred to be used for data +# feeds based on the order of account names, if +# detected as active on an API client. +prefer_data_account = [ + 'paper', + 'margin', + 'ira', +] -accounts.margin = "X0000000" -accounts.ira = "X0000000" -accounts.paper = "XX0000000" - -# the order in which accounts will be selected (if found through -# `brokerd`) when a new symbol is loaded -accounts_order = ['paper', 'margin', 'ira'] +[ib.accounts] +# the order in which accounts will be selectable +# in the order mode UI (if found via clients during +# API-app scanning)when a new symbol is loaded. +paper = "XX0000000" +margin = "X0000000" +ira = "X0000000" diff --git a/dockering/ib/run_x11_vnc.sh b/dockering/ib/run_x11_vnc.sh index c87dc2bb..69b6da85 100755 --- a/dockering/ib/run_x11_vnc.sh +++ b/dockering/ib/run_x11_vnc.sh @@ -13,4 +13,4 @@ x11vnc \ -autoport 3003 \ # can't use this because of ``asyncvnc`` issue: # https://github.com/barneygale/asyncvnc/issues/1 - # -passwd "$VNC_SERVER_PASSWORD" + # -passwd 'ibcansmbz' diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b861d895..2fed7581 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -22,7 +22,9 @@ built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. """ +from __future__ import annotations from contextlib import asynccontextmanager as acm +from contextlib import AsyncExitStack from dataclasses import asdict, astuple from datetime import datetime from functools import partial @@ -36,9 +38,8 @@ from typing import ( import asyncio from pprint import pformat import inspect -import logging -from random import randint import time +from types import SimpleNamespace import trio @@ -161,13 +162,23 @@ class NonShittyIB(ibis.IB): - Don't use named tuples """ def __init__(self): + + # override `ib_insync` internal loggers so we can see wtf + # it's doing.. 
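+        # (NOTE: an assumption - `get_logger` here is piker's own
+        # log factory from `..log`, so `ib_insync`'s internal
+        # chatter gets routed through our handlers instead of the
+        # stdlib root logger it would otherwise attach to.)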
+ self._logger = get_logger( + 'ib_insync.ib', + ) self._createEvents() + # XXX: just to override this wrapper self.wrapper = NonShittyWrapper(self) self.client = ib_Client(self.wrapper) + self.client._logger = get_logger( + 'ib_insync.client', + ) + # self.errorEvent += self._onError self.client.apiEnd += self.disconnectedEvent - self._logger = logging.getLogger('ib_insync.ib') # map of symbols to contract ids @@ -276,6 +287,27 @@ class Client: # NOTE: the ib.client here is "throttled" to 45 rps by default + async def trades( + self, + # api_only: bool = False, + + ) -> dict[str, Any]: + + # orders = await self.ib.reqCompletedOrdersAsync( + # apiOnly=api_only + # ) + fills = await self.ib.reqExecutionsAsync() + norm_fills = [] + for fill in fills: + fill = fill._asdict() # namedtuple + for key, val in fill.copy().items(): + if isinstance(val, Contract): + fill[key] = asdict(val) + + norm_fills.append(fill) + + return norm_fills + async def bars( self, fqsn: str, @@ -496,7 +528,7 @@ class Client: # XXX UPDATE: we can probably do the tick/trades scraping # inside our eventkit handler instead to bypass this entirely? - if 'ib' in pattern: + if '.ib' in pattern: from ..data._source import unpack_fqsn broker, symbol, expiry = unpack_fqsn(pattern) else: @@ -512,11 +544,7 @@ class Client: symbol, _, expiry = symbol.rpartition('.') # use heuristics to figure out contract "type" - try: - sym, exch = symbol.upper().rsplit('.', maxsplit=1) - except ValueError: - # likely there's an embedded `.` for a forex pair - breakpoint() + sym, exch = symbol.upper().rsplit('.', maxsplit=1) qualify: bool = True @@ -855,17 +883,7 @@ async def recv_trade_updates( # let the engine run and stream await client.ib.disconnectedEvent - -# default config ports -_tws_port: int = 7497 -_gw_port: int = 4002 -_try_ports = [ - _gw_port, - _tws_port -] -# TODO: remove the randint stuff and use proper error checking in client -# factor below.. -_client_ids = itertools.count(randint(1, 100)) +# per-actor API ep caching _client_cache: dict[tuple[str, int], Client] = {} _scan_ignore: set[tuple[str, int]] = set() @@ -891,10 +909,14 @@ async def load_aio_clients( host: str = '127.0.0.1', port: int = None, + client_id: int = 6116, - client_id: Optional[int] = None, + # the API TCP in `ib_insync` connection can be flaky af so instead + # retry a few times to get the client going.. + connect_retries: int = 3, + connect_timeout: float = 0.5, -) -> Client: +) -> dict[str, Client]: ''' Return an ``ib_insync.IB`` instance wrapped in our client API. @@ -922,140 +944,116 @@ async def load_aio_clients( raise ValueError( 'Specify only one of `host` or `hosts` in `brokers.toml` config') - ports = conf.get( + try_ports = conf.get( 'ports', # default order is to check for gw first - { - 'gw': 4002, - 'tws': 7497, - # 'order': ['gw', 'tws'] - } + [4002, 7497] ) - order = ports.pop('order', None) - if order: - log.warning('`ports.order` section in `brokers.toml` is deprecated') - - accounts_def = config.load_accounts(['ib']) - try_ports = list(ports.values()) - ports = try_ports if port is None else [port] - # we_connected = [] - connect_timeout = 2 - combos = list(itertools.product(hosts, ports)) - - # allocate new and/or reload disconnected but cached clients - # try: - # TODO: support multiple clients allowing for execution on - # multiple accounts (including a paper instance running on the - # same machine) and switching between accounts in the ems. 
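+    # NOTE: older configs spelled the ports as an inline table,
+    #
+    #   ports.gw = 4002
+    #   ports.tws = 7497
+    #
+    # so coerce any such `dict` to the new `list` format to keep
+    # legacy `brokers.toml` files loading.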
+ if isinstance(try_ports, dict): + log.warning( + '`ib.ports` in `brokers.toml` should be a `list` NOT a `dict`' + ) + try_ports = list(try_ports.values()) _err = None + accounts_def = config.load_accounts(['ib']) + ports = try_ports if port is None else [port] + combos = list(itertools.product(hosts, ports)) + accounts_found: dict[str, Client] = {} # (re)load any and all clients that can be found # from connection details in ``brokers.toml``. for host, port in combos: sockaddr = (host, port) - client = _client_cache.get(sockaddr) - accounts_found: dict[str, Client] = {} - if ( - client and client.ib.isConnected() + sockaddr in _client_cache or sockaddr in _scan_ignore ): continue - try: - ib = NonShittyIB() + ib = NonShittyIB() - # XXX: not sure if we ever really need to increment the - # client id if teardown is sucessful. - client_id = 6116 + for i in range(connect_retries): + try: + await ib.connectAsync( + host, + port, + clientId=client_id, - await ib.connectAsync( - host, - port, - clientId=client_id, + # this timeout is sensative on windows and will + # fail without a good "timeout error" so be + # careful. + timeout=connect_timeout, + ) + break - # this timeout is sensative on windows and will - # fail without a good "timeout error" so be - # careful. - timeout=connect_timeout, - ) + except ( + ConnectionRefusedError, - # create and cache client - client = Client(ib) + # TODO: if trying to scan for remote api clients + # pretty sure we need to catch this, though it + # definitely needs a shorter timeout since it hangs + # for like 5s.. + asyncio.exceptions.TimeoutError, + OSError, + ) as ce: + _err = ce - # Pre-collect all accounts available for this - # connection and map account names to this client - # instance. - pps = ib.positions() - if pps: - for pp in pps: - accounts_found[ - accounts_def.inverse[pp.account] - ] = client + if i > 8: + # cache logic to avoid rescanning if we already have all + # clients loaded. + _scan_ignore.add(sockaddr) + raise - # if there are accounts without positions we should still - # register them for this client - for value in ib.accountValues(): - acct_number = value.account + log.warning( + f'Failed to connect on {port} for {i} time, retrying...') - entry = accounts_def.inverse.get(acct_number) - if not entry: - raise ValueError( - 'No section in brokers.toml for account:' - f' {acct_number}\n' - f'Please add entry to continue using this API client' - ) + # create and cache client + client = Client(ib) - # surjection of account names to operating clients. - if acct_number not in accounts_found: - accounts_found[entry] = client + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + accounts_found[ + accounts_def.inverse[pp.account] + ] = client - log.info( - f'Loaded accounts for client @ {host}:{port}\n' - f'{pformat(accounts_found)}' - ) + # if there are accounts without positions we should still + # register them for this client + for value in ib.accountValues(): + acct_number = value.account - # update all actor-global caches - log.info(f"Caching client for {(host, port)}") - _client_cache[(host, port)] = client + entry = accounts_def.inverse.get(acct_number) + if not entry: + raise ValueError( + 'No section in brokers.toml for account:' + f' {acct_number}\n' + f'Please add entry to continue using this API client' + ) - # we_connected.append((host, port, client)) + # surjection of account names to operating clients. 
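+            # (i.e. every account name resolves to exactly one
+            # client, but multiple names may share a client -
+            # e.g. a live and a paper account on one gateway)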
+ if acct_number not in accounts_found: + accounts_found[entry] = client - # TODO: don't do it this way, get a gud to_asyncio - # context / .start() system goin.. - def pop_and_discon(): - log.info(f'Disconnecting client {client}') - client.ib.disconnect() - _client_cache.pop((host, port), None) + log.info( + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' + ) - # NOTE: the above callback **CAN'T FAIL** or shm won't get - # torn down correctly ... - tractor._actor._lifetime_stack.callback(pop_and_discon) + # update all actor-global caches + log.info(f"Caching client for {sockaddr}") + _client_cache[sockaddr] = client - # XXX: why aren't we just updating this directy above - # instead of using the intermediary `accounts_found`? - _accounts2clients.update(accounts_found) - - except ( - ConnectionRefusedError, - - # TODO: if trying to scan for remote api clients - # pretty sure we need to catch this, though it - # definitely needs a shorter timeout since it hangs - # for like 5s.. - asyncio.exceptions.TimeoutError, - OSError, - ) as ce: - _err = ce - log.warning(f'Failed to connect on {port}') - - # cache logic to avoid rescanning if we already have all - # clients loaded. - _scan_ignore.add(sockaddr) + # XXX: why aren't we just updating this directy above + # instead of using the intermediary `accounts_found`? + _accounts2clients.update(accounts_found) + # if we have no clients after the scan loop then error out. if not _client_cache: raise ConnectionError( 'No ib APIs could be found scanning @:\n' @@ -1063,79 +1061,121 @@ async def load_aio_clients( 'Check your `brokers.toml` and/or network' ) from _err - # retreive first loaded client - clients = list(_client_cache.values()) - if clients: - client = clients[0] - - yield client, _client_cache, _accounts2clients - - # TODO: this in a way that works xD - # finally: - # pass - # # async with trio.CancelScope(shield=True): - # for host, port, client in we_connected: - # client.ib.disconnect() - # _client_cache.pop((host, port)) - # raise + try: + yield _accounts2clients + finally: + # TODO: for re-scans we'll want to not teardown clients which + # are up and stable right? + for acct, client in _accounts2clients.items(): + log.info(f'Disconnecting {acct}@{client}') + client.ib.disconnect() + _client_cache.pop((host, port)) -async def _aio_run_client_method( - meth: str, - to_trio=None, - from_trio=None, - client=None, - **kwargs, +async def load_clients_for_trio( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, + ) -> None: + ''' + Pure async mngr proxy to ``load_aio_clients()``. - async with load_aio_clients() as ( - _client, - clients, - accts2clients, + This is a bootstrap entrypoing to call from + a ``tractor.to_asyncio.open_channel_from()``. + + ''' + global _accounts2clients + + if _accounts2clients: + to_trio.send_nowait(_accounts2clients) + await asyncio.sleep(float('inf')) + + else: + async with load_aio_clients() as accts2clients: + to_trio.send_nowait(accts2clients) + + # TODO: maybe a sync event to wait on instead? + await asyncio.sleep(float('inf')) + + +_proxies: dict[str, MethodProxy] = {} + + +@acm +async def open_client_proxies() -> tuple[ + dict[str, MethodProxy], + dict[str, Client], +]: + async with ( + tractor.trionics.maybe_open_context( + # acm_func=open_client_proxies, + acm_func=tractor.to_asyncio.open_channel_from, + kwargs={'target': load_clients_for_trio}, + + # lock around current actor task access + # TODO: maybe this should be the default in tractor? 
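+            # (keying on the current actor's uid means each actor
+            # builds its own client set once, while all tasks
+            # inside that actor share the cached result)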
+ key=tractor.current_actor().uid, + + ) as (cache_hit, (clients, from_aio)), + + AsyncExitStack() as stack ): - client = client or _client - async_meth = getattr(client, meth) + if cache_hit: + log.info(f'Re-using cached clients: {clients}') - # handle streaming methods - args = tuple(inspect.getfullargspec(async_meth).args) - if to_trio and 'to_trio' in args: - kwargs['to_trio'] = to_trio + for acct_name, client in clients.items(): + proxy = await stack.enter_async_context( + open_client_proxy(client), + ) + _proxies[acct_name] = proxy - log.runtime(f'Running {meth}({kwargs})') - return await async_meth(**kwargs) + yield _proxies, clients -async def _trio_run_client_method( - method: str, - client: Optional[Client] = None, - **kwargs, +def get_preferred_data_client( + clients: dict[str, Client], -) -> None: +) -> tuple[str, Client]: ''' - Asyncio entry point to run tasks against the ``ib_insync`` api. + Load and return the (first found) `Client` instance that is + preferred and should be used for data by iterating, in priority + order, the ``ib.prefer_data_account: list[str]`` account names in + the users ``brokers.toml`` file. ''' - ca = tractor.current_actor() - assert ca.is_infected_aio() + conf = get_config() + data_accounts = conf['prefer_data_account'] - # if the method is an *async gen* stream for it - # meth = getattr(Client, method) + for name in data_accounts: + client = clients.get(f'ib.{name}') + if client: + return name, client + else: + raise ValueError( + 'No preferred data client could be found:\n' + f'{data_accounts}' + ) - # args = tuple(inspect.getfullargspec(meth).args) - # if inspect.isasyncgenfunction(meth) or ( - # # if the method is an *async func* but manually - # # streams back results, make sure to also stream it - # 'to_trio' in args - # ): - # kwargs['_treat_as_stream'] = True +@acm +async def open_data_client() -> MethodProxy: + ''' + Open the first found preferred "data client" as defined in the + user's ``brokers.toml`` in the ``ib.prefer_data_account`` variable + and deliver that client wrapped in a ``MethodProxy``. - return await to_asyncio.run_task( - _aio_run_client_method, - meth=method, - client=client, - **kwargs - ) + ''' + async with ( + open_client_proxies() as (proxies, clients), + ): + account_name, client = get_preferred_data_client(clients) + proxy = proxies.get(f'ib.{account_name}') + if not proxy: + raise ValueError( + f'No preferred data client could be found for {account_name}!' + ) + + yield proxy class MethodProxy: @@ -1144,10 +1184,12 @@ class MethodProxy: self, chan: to_asyncio.LinkedTaskChannel, event_table: dict[str, trio.Event], + asyncio_ns: SimpleNamespace, ) -> None: self.chan = chan self.event_table = event_table + self._aio_ns = asyncio_ns async def _run_method( self, @@ -1213,61 +1255,64 @@ class MethodProxy: async def open_aio_client_method_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, + client: Client, event_consumers: dict[str, trio.Event], ) -> None: - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): - to_trio.send_nowait(client) + to_trio.send_nowait(client) - # TODO: separate channel for error handling? - client.inline_errors(to_trio) + # TODO: separate channel for error handling? 
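+    # (results and errors currently share this one channel - the
+    # relay loop below packs them as `{'result': ...}` vs
+    # `{'exception': ...}` msgs for the trio side to unpack)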
+ client.inline_errors(to_trio) - # relay all method requests to ``asyncio``-side client and - # deliver back results - while not to_trio._closed: - msg = await from_trio.get() - if msg is None: - print('asyncio PROXY-RELAY SHUTDOWN') - break + # relay all method requests to ``asyncio``-side client and deliver + # back results + while not to_trio._closed: + msg = await from_trio.get() + if msg is None: + print('asyncio PROXY-RELAY SHUTDOWN') + break - meth_name, kwargs = msg - meth = getattr(client, meth_name) + meth_name, kwargs = msg + meth = getattr(client, meth_name) - try: - resp = await meth(**kwargs) - # echo the msg back - to_trio.send_nowait({'result': resp}) + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - except ( - RequestError, + except ( + RequestError, - # TODO: relay all errors to trio? - # BaseException, - ) as err: - to_trio.send_nowait({'exception': err}) + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'exception': err}) @acm -async def open_client_proxy() -> MethodProxy: +async def open_client_proxy( + client: Client, + +) -> MethodProxy: - # try: event_table = {} async with ( to_asyncio.open_channel_from( open_aio_client_method_relay, + client=client, event_consumers=event_table, ) as (first, chan), trio.open_nursery() as relay_n, ): assert isinstance(first, Client) - proxy = MethodProxy(chan, event_table) + proxy = MethodProxy( + chan, + event_table, + asyncio_ns=first, + ) # mock all remote methods on ib ``Client``. for name, method in inspect.getmembers( @@ -1318,7 +1363,7 @@ async def get_client( ''' # TODO: the IPC via portal relay layer for when this current # actor isn't in aio mode. - async with open_client_proxy() as proxy: + async with open_data_client() as proxy: yield proxy @@ -1463,10 +1508,15 @@ async def get_bars( for _ in range(10): try: - bars, bars_array = await proxy.bars( + out = await proxy.bars( fqsn=fqsn, end_dt=end_dt, ) + if out: + bars, bars_array = out + + else: + await tractor.breakpoint() if bars_array is None: raise SymbolNotFound(fqsn) @@ -1529,29 +1579,69 @@ async def get_bars( hist_ev = proxy.status_event( 'HMDS data farm connection is OK:ushmds' ) - # live_ev = proxy.status_event( - # # 'Market data farm connection is OK:usfuture' - # 'Market data farm connection is OK:usfarm' + + # XXX: other event messages we might want to try and + # wait for but i wasn't able to get any of this + # reliable.. + # reconnect_start = proxy.status_event( + # 'Market data farm is connecting:usfuture' + # ) + # live_ev = proxy.status_event( + # 'Market data farm connection is OK:usfuture' # ) - # TODO: some kinda resp here that indicates success - # otherwise retry? - await data_reset_hack() - # TODO: a while loop here if we timeout? - for name, ev in [ - ('history', hist_ev), - # ('live', live_ev), - ]: - # with trio.move_on_after(22) as cs: - await ev.wait() - log.info(f"{name} DATA RESET") + # try to wait on the reset event(s) to arrive, a timeout + # will trigger a retry up to 6 times (for now). + tries: int = 2 + timeout: float = 10 - # if cs.cancelled_caught: - # log.warning("reset hack failed on first try?") - # await tractor.breakpoint() + # try 3 time with a data reset then fail over to + # a connection reset. 
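+                    # (NB: as written `range(1, tries)` with
+                    # `tries = 2` gives exactly one data-reset
+                    # attempt; on timeout the for-else below
+                    # escalates to a full connection reset)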
+ for i in range(1, tries): - fails += 1 - continue + log.warning('Sending DATA RESET request') + await data_reset_hack(reset_type='data') + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + break + + if cs.cancelled_caught: + fails += 1 + log.warning( + f'Data reset {name} timeout, retrying {i}.' + ) + + continue + else: + + log.warning('Sending CONNECTION RESET') + await data_reset_hack(reset_type='connection') + + with trio.move_on_after(timeout) as cs: + for name, ev in [ + # TODO: not sure if waiting on other events + # is all that useful here or not. in theory + # you could wait on one of the ones above + # first to verify the reset request was + # sent? + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + + if cs.cancelled_caught: + fails += 1 + log.warning('Data CONNECTION RESET timeout!?') else: raise @@ -1566,8 +1656,12 @@ async def open_history_client( symbol: str, ) -> tuple[Callable, int]: + ''' + History retreival endpoint - delivers a historical frame callble + that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. - async with open_client_proxy() as proxy: + ''' + async with open_data_client() as proxy: async def get_hist( end_dt: Optional[datetime] = None, @@ -1579,7 +1673,7 @@ async def open_history_client( # TODO: add logic here to handle tradable hours and only grab # valid bars in the range - if out == (None, None): + if out is None: # could be trying to retreive bars over weekend log.error(f"Can't grab bars starting at {end_dt}!?!?") raise NoData( @@ -1631,8 +1725,7 @@ async def backfill_bars( with trio.CancelScope() as cs: - # async with open_history_client(fqsn) as proxy: - async with open_client_proxy() as proxy: + async with open_data_client() as proxy: out, fails = await get_bars(proxy, fqsn) @@ -1729,16 +1822,16 @@ async def _setup_quote_stream( ''' Stream a ticker using the std L1 api. + This task is ``asyncio``-side and must be called from + ``tractor.to_asyncio.open_channel_from()``. + ''' global _quote_streams to_trio.send_nowait(None) - async with load_aio_clients() as ( - client, - clients, - accts2clients, - ): + async with load_aio_clients() as accts2clients: + caccount_name, client = get_preferred_data_client(accts2clients) contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -1828,6 +1921,7 @@ async def open_aio_quote_stream( _setup_quote_stream, symbol=symbol, contract=contract, + ) as (first, from_aio): # cache feed for later consumers @@ -1858,122 +1952,132 @@ async def stream_quotes( sym = symbols[0] log.info(f'request for real-time quotes: {sym}') - con, first_ticker, details = await _trio_run_client_method( - method='get_sym_details', - symbol=sym, - ) - first_quote = normalize(first_ticker) - # print(f'first quote: {first_quote}') + async with open_data_client() as proxy: - def mk_init_msgs() -> dict[str, dict]: - # pass back some symbol info like min_tick, trading_hours, etc. 
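+        # (every `proxy.<meth>()` call below relays through the
+        # `MethodProxy` mem-chan to the `asyncio`-side client)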
- syminfo = asdict(details) - syminfo.update(syminfo['contract']) + con, first_ticker, details = await proxy.get_sym_details(symbol=sym) + first_quote = normalize(first_ticker) + # print(f'first quote: {first_quote}') - # nested dataclass we probably don't need and that won't IPC serialize - syminfo.pop('secIdList') + def mk_init_msgs() -> dict[str, dict]: + ''' + Collect a bunch of meta-data useful for feed startup and + pack in a `dict`-msg. - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] + ''' + # pass back some symbol info like min_tick, trading_hours, etc. + syminfo = asdict(details) + syminfo.update(syminfo['contract']) - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - min_tick = 0.01 if atype == 'stock' else 0 + # nested dataclass we probably don't need and that won't IPC + # serialize + syminfo.pop('secIdList') - syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) + # TODO: more consistent field translation + atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - # for "traditional" assets, volume is normally discreet, not a float - syminfo['lot_tick_size'] = 0.0 + # for stocks it seems TWS reports too small a tick size + # such that you can't submit orders with that granularity? + min_tick = 0.01 if atype == 'stock' else 0 + + syminfo['price_tick_size'] = max(syminfo['minTick'], min_tick) + + # for "traditional" assets, volume is normally discreet, not + # a float + syminfo['lot_tick_size'] = 0.0 + + ibclient = proxy._aio_ns.ib.client + host, port = ibclient.host, ibclient.port + + # TODO: for loop through all symbols passed in + init_msgs = { + # pass back token, and bool, signalling if we're the writer + # and that history has been written + sym: { + 'symbol_info': syminfo, + 'fqsn': first_quote['fqsn'], + }, + 'status': { + 'data_ep': f'{host}:{port}', + }, - # TODO: for loop through all symbols passed in - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': syminfo, - 'fqsn': first_quote['fqsn'], } - } - return init_msgs + return init_msgs - init_msgs = mk_init_msgs() + init_msgs = mk_init_msgs() - # TODO: we should instead spawn a task that waits on a feed to start - # and let it wait indefinitely..instead of this hard coded stuff. - with trio.move_on_after(1): - contract, first_ticker, details = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) + # TODO: we should instead spawn a task that waits on a feed to start + # and let it wait indefinitely..instead of this hard coded stuff. + with trio.move_on_after(1): + contract, first_ticker, details = await proxy.get_quote(symbol=sym) - # it might be outside regular trading hours so see if we can at - # least grab history. - if isnan(first_ticker.last): - task_status.started((init_msgs, first_quote)) + # it might be outside regular trading hours so see if we can at + # least grab history. + if isnan(first_ticker.last): + task_status.started((init_msgs, first_quote)) - # it's not really live but this will unblock - # the brokerd feed task to tell the ui to update? - feed_is_live.set() - - # block and let data history backfill code run. - await trio.sleep_forever() - return # we never expect feed to come up? 
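+            # (a NaN last price usually just means the market is
+            # closed; we still report startup so history backfill
+            # and the UI can proceed without live ticks)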
- - async with open_aio_quote_stream( - symbol=sym, - contract=con, - ) as stream: - - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] - - task_status.started((init_msgs, first_quote)) - - async with aclosing(stream): - if type(first_ticker.contract) not in ( - ibis.Commodity, - ibis.Forex - ): - # wait for real volume on feed (trading might be closed) - while True: - ticker = await stream.receive() - - # for a real volume contract we rait for the first - # "real" trade to take place - if ( - # not calc_price - # and not ticker.rtTime - not ticker.rtTime - ): - # spin consuming tickers until we get a real - # market datum - log.debug(f"New unsent ticker: {ticker}") - continue - else: - log.debug("Received first real volume tick") - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is truly stateful trash) - ticker.ticks = [] - - # XXX: this works because we don't use - # ``aclosing()`` above? - break - - quote = normalize(ticker) - log.debug(f"First ticker received {quote}") - - # tell caller quotes are now coming in live + # it's not really live but this will unblock + # the brokerd feed task to tell the ui to update? feed_is_live.set() - # last = time.time() - async for ticker in stream: - quote = normalize(ticker) - await send_chan.send({quote['fqsn']: quote}) + # block and let data history backfill code run. + await trio.sleep_forever() + return # we never expect feed to come up? + + async with open_aio_quote_stream( + symbol=sym, + contract=con, + ) as stream: + + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + + task_status.started((init_msgs, first_quote)) + + async with aclosing(stream): + if type(first_ticker.contract) not in ( + ibis.Commodity, + ibis.Forex + ): + # wait for real volume on feed (trading might be closed) + while True: + ticker = await stream.receive() + + # for a real volume contract we rait for the first + # "real" trade to take place + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): + # spin consuming tickers until we get a real + # market datum + log.debug(f"New unsent ticker: {ticker}") + continue + else: + log.debug("Received first real volume tick") + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is truly stateful trash) + ticker.ticks = [] + + # XXX: this works because we don't use + # ``aclosing()`` above? 
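+                        # (the bare `break` leaves the mem-chan
+                        # itself open, so the `async for` loop
+                        # below keeps consuming live ticks)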
+ break + + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + + # tell caller quotes are now coming in live + feed_is_live.set() - # ugh, clear ticks since we've consumed them - ticker.ticks = [] # last = time.time() + async for ticker in stream: + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + # last = time.time() def pack_position( @@ -2123,11 +2227,16 @@ async def trades_dialogue( # deliver positions to subscriber before anything else all_positions = [] accounts = set() - clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] - async with trio.open_nursery() as nurse: - for account, client in _accounts2clients.items(): + async with ( + trio.open_nursery() as nurse, + open_client_proxies() as (proxies, aioclients), + ): + # for account, client in _accounts2clients.items(): + for account, proxy in proxies.items(): + + client = aioclients[account] async def open_stream( task_status: TaskStatus[ @@ -2149,7 +2258,7 @@ async def trades_dialogue( assert account in accounts_def accounts.add(account) - for client in _client_cache.values(): + for client in aioclients.values(): for pos in client.positions(): msg = pack_position(pos) @@ -2160,6 +2269,16 @@ async def trades_dialogue( all_positions.append(msg.dict()) + trades: list[dict] = [] + for proxy in proxies.values(): + trades.append(await proxy.trades()) + + log.info(f'Loaded {len(trades)} from this session') + # TODO: write trades to local ``trades.toml`` + # - use above per-session trades data and write to local file + # - get the "flex reports" working and pull historical data and + # also save locally. + await ctx.started(( all_positions, tuple(name for name in accounts_def if name in accounts), @@ -2352,95 +2471,96 @@ async def open_symbol_search( ctx: tractor.Context, ) -> None: - # load all symbols locally for fast search + + # TODO: load user defined symbol set locally for fast search? await ctx.started({}) - async with ctx.open_stream() as stream: - - last = time.time() - - async for pattern in stream: - log.debug(f'received {pattern}') - now = time.time() - - assert pattern, 'IB can not accept blank search pattern' - - # throttle search requests to no faster then 1Hz - diff = now - last - if diff < 1.0: - log.debug('throttle sleeping') - await trio.sleep(diff) - try: - pattern = stream.receive_nowait() - except trio.WouldBlock: - pass - - if not pattern or pattern.isspace(): - log.warning('empty pattern received, skipping..') - - # TODO: *BUG* if nothing is returned here the client - # side will cache a null set result and not showing - # anything to the use on re-searches when this query - # timed out. We probably need a special "timeout" msg - # or something... - - # XXX: this unblocks the far end search task which may - # hold up a multi-search nursery block - await stream.send({}) - - continue - - log.debug(f'searching for {pattern}') + async with open_data_client() as proxy: + async with ctx.open_stream() as stream: last = time.time() - # async batch search using api stocks endpoint and module - # defined adhoc symbol set. 
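+                # (the remote API search and the local fuzzy match
+                # over the adhoc futes set effectively run
+                # concurrently; both result sets are merged into
+                # `matches` below)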
- stock_results = [] + async for pattern in stream: + log.debug(f'received {pattern}') + now = time.time() - async def stash_results(target: Awaitable[list]): - stock_results.extend(await target) + assert pattern, 'IB can not accept blank search pattern' - async with trio.open_nursery() as sn: - sn.start_soon( - stash_results, - _trio_run_client_method( - method='search_symbols', - pattern=pattern, - upto=5, + # throttle search requests to no faster then 1Hz + diff = now - last + if diff < 1.0: + log.debug('throttle sleeping') + await trio.sleep(diff) + try: + pattern = stream.receive_nowait() + except trio.WouldBlock: + pass + + if not pattern or pattern.isspace(): + log.warning('empty pattern received, skipping..') + + # TODO: *BUG* if nothing is returned here the client + # side will cache a null set result and not showing + # anything to the use on re-searches when this query + # timed out. We probably need a special "timeout" msg + # or something... + + # XXX: this unblocks the far end search task which may + # hold up a multi-search nursery block + await stream.send({}) + + continue + + log.debug(f'searching for {pattern}') + + last = time.time() + + # async batch search using api stocks endpoint and module + # defined adhoc symbol set. + stock_results = [] + + async def stash_results(target: Awaitable[list]): + stock_results.extend(await target) + + async with trio.open_nursery() as sn: + sn.start_soon( + stash_results, + proxy.search_symbols( + pattern=pattern, + upto=5, + ), ) - ) - # trigger async request - await trio.sleep(0) + # trigger async request + await trio.sleep(0) - # match against our ad-hoc set immediately - adhoc_matches = fuzzy.extractBests( + # match against our ad-hoc set immediately + adhoc_matches = fuzzy.extractBests( + pattern, + list(_adhoc_futes_set), + score_cutoff=90, + ) + log.info(f'fuzzy matched adhocs: {adhoc_matches}') + adhoc_match_results = {} + if adhoc_matches: + # TODO: do we need to pull contract details? + adhoc_match_results = {i[0]: {} for i in adhoc_matches} + + log.debug(f'fuzzy matching stocks {stock_results}') + stock_matches = fuzzy.extractBests( pattern, - list(_adhoc_futes_set), - score_cutoff=90, + stock_results, + score_cutoff=50, ) - log.info(f'fuzzy matched adhocs: {adhoc_matches}') - adhoc_match_results = {} - if adhoc_matches: - # TODO: do we need to pull contract details? - adhoc_match_results = {i[0]: {} for i in adhoc_matches} - log.debug(f'fuzzy matching stocks {stock_results}') - stock_matches = fuzzy.extractBests( - pattern, - stock_results, - score_cutoff=50, - ) + matches = adhoc_match_results | { + item[0]: {} for item in stock_matches + } + # TODO: we used to deliver contract details + # {item[2]: item[0] for item in stock_matches} - matches = adhoc_match_results | { - item[0]: {} for item in stock_matches - } - # TODO: we used to deliver contract details - # {item[2]: item[0] for item in stock_matches} - - log.debug(f"sending matches: {matches.keys()}") - await stream.send(matches) + log.debug(f"sending matches: {matches.keys()}") + await stream.send(matches) async def data_reset_hack( @@ -2462,83 +2582,35 @@ async def data_reset_hack( - integration with ``ib-gw`` run in docker + Xorg? 
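     NOTE: the replacement approach below drives the gateway UI
     over VNC (port 3003, as exposed by
     ``dockering/ib/run_x11_vnc.sh``), sending the same ctrl-alt-f
     (data) / ctrl-alt-r (connection) combos the old ``xdotool``
     version emitted.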
''' - # TODO: try out this lib instead, seems to be the most modern - # and usess the underlying lib: - # https://github.com/rshk/python-libxdo - # TODO: seems to be a few libs for python but not sure - # if they support all the sub commands we need, order of - # most recent commit history: - # https://github.com/rr-/pyxdotool - # https://github.com/ShaneHutter/pyxdotool - # https://github.com/cphyc/pyxdotool + async def vnc_click_hack( + reset_type: str = 'data' + ) -> None: + ''' + Reset the data or netowork connection for the VNC attached + ib gateway using magic combos. - try: - import i3ipc - except ImportError: - return False - log.warning('IB data hack no-supported on ur platformz') + ''' + key = {'data': 'f', 'connection': 'r'}[reset_type] - i3 = i3ipc.Connection() - t = i3.get_tree() + import asyncvnc - orig_win_id = t.find_focused().window + async with asyncvnc.connect( + 'localhost', + port=3003, + # password='ibcansmbz', + ) as client: - # for tws - win_names: list[str] = [ - 'Interactive Brokers', # tws running in i3 - 'IB Gateway', # gw running in i3 - # 'IB', # gw running in i3 (newer version?) - ] + # move to middle of screen + # 640x1800 + client.mouse.move( + x=500, + y=500, + ) + client.mouse.click() + client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked - combos: dict[str, str] = { - # only required if we need a connection reset. - 'connection': ('ctrl+alt+r', 12), + await tractor.to_asyncio.run_task(vnc_click_hack) - # data feed reset. - 'data': ('ctrl+alt+f', 6) - } - - for name in win_names: - results = t.find_titled(name) - print(f'results for {name}: {results}') - if results: - con = results[0] - print(f'Resetting data feed for {name}') - win_id = str(con.window) - w, h = con.rect.width, con.rect.height - - # TODO: only run the reconnect (2nd) kc on a detected - # disconnect? - key_combo, timeout = combos[reset_type] - # for key_combo, timeout in [ - # # only required if we need a connection reset. - # # ('ctrl+alt+r', 12), - # # data feed reset. - # ('ctrl+alt+f', 6) - # ]: - await trio.run_process([ - 'xdotool', - 'windowactivate', '--sync', win_id, - - # move mouse to bottom left of window (where there should - # be nothing to click). - 'mousemove_relative', '--sync', str(w-4), str(h-4), - - # NOTE: we may need to stick a `--retry 3` in here.. 
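+    # NOTE: `asyncvnc` is an `asyncio` lib, hence the
+    # `tractor.to_asyncio.run_task()` shim to drive it from this
+    # (trio-side) task.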
- 'click', '--window', win_id, - '--repeat', '3', '1', - - # hackzorzes - 'key', key_combo, - # ], - # timeout=timeout, - ]) - - # re-activate and focus original window - await trio.run_process([ - 'xdotool', - 'windowactivate', '--sync', str(orig_win_id), - 'click', '--window', str(orig_win_id), '1', - ]) + # we don't really need the ``xdotool`` approach any more B) return True diff --git a/piker/data/feed.py b/piker/data/feed.py index e77052bf..605349e9 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -46,6 +46,7 @@ import numpy as np from ..brokers import get_brokermod from .._cacheables import maybe_open_context +from ..calc import humanize from ..log import get_logger, get_console_log from .._daemon import ( maybe_spawn_brokerd, @@ -1183,10 +1184,10 @@ class Feed: shm: ShmArray mod: ModuleType first_quotes: dict # symbol names to first quote dicts - _portal: tractor.Portal - stream: trio.abc.ReceiveChannel[dict[str, Any]] + status: dict[str, Any] + throttle_rate: Optional[int] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None @@ -1327,9 +1328,24 @@ async def open_feed( first_quotes=first_quotes, stream=stream, _portal=portal, + status={}, throttle_rate=tick_throttle, ) + # fill out "status info" that the UI can show + host, port = feed.portal.channel.raddr + if host == '127.0.0.1': + host = 'localhost' + + feed.status.update({ + 'actor_name': feed.portal.channel.uid[0], + 'host': host, + 'port': port, + 'shm': f'{humanize(feed.shm._shm.size)}', + 'throttle_rate': feed.throttle_rate, + }) + feed.status.update(init_msg.pop('status', {})) + for sym, data in init_msg.items(): si = data['symbol_info'] fqsn = data['fqsn'] + f'.{brokername}' diff --git a/piker/ui/_feedstatus.py b/piker/ui/_feedstatus.py new file mode 100644 index 00000000..1c9eb772 --- /dev/null +++ b/piker/ui/_feedstatus.py @@ -0,0 +1,83 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Feed status and controls widget(s) for embedding in a UI-pane. + +""" + +from __future__ import annotations +from textwrap import dedent +from typing import TYPE_CHECKING + +# from PyQt5.QtCore import Qt + +from ._style import _font, _font_small +# from ..calc import humanize +from ._label import FormatLabel + +if TYPE_CHECKING: + from ._chart import ChartPlotWidget + from ..data.feed import Feed + from ._forms import FieldsForm + + +def mk_feed_label( + form: FieldsForm, + feed: Feed, + chart: ChartPlotWidget, + +) -> FormatLabel: + ''' + Generate a label from feed meta-data to be displayed + in a UI sidepane. + + TODO: eventually buttons for changing settings over + a feed control protocol. 
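+    The label fields are filled from the ``feed.status`` dict
+    which gets populated over in ``piker.data.feed.open_feed()``.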
+ + ''' + status = feed.status + assert status + + msg = dedent(""" + actor: **{actor_name}**\n + |_ @**{host}:{port}**\n + """) + + for key, val in status.items(): + if key in ('host', 'port', 'actor_name'): + continue + msg += f'\n|_ {key}: **{{{key}}}**\n' + + feed_label = FormatLabel( + fmt_str=msg, + # |_ streams: **{symbols}**\n + font=_font.font, + font_size=_font_small.px_size, + font_color='default_lightest', + ) + + # form.vbox.setAlignment(feed_label, Qt.AlignBottom) + # form.vbox.setAlignment(Qt.AlignBottom) + _ = chart.height() - ( + form.height() + + form.fill_bar.height() + # feed_label.height() + ) + + feed_label.format(**feed.status) + + return feed_label diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index 3b33e032..c6d09594 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -750,12 +750,12 @@ def mk_order_pane_layout( parent=parent, fields_schema={ 'account': { - 'label': '**account**:', + 'label': '**accnt**:', 'type': 'select', 'default_value': ['paper'], }, 'size_unit': { - 'label': '**allocate**:', + 'label': '**alloc**:', 'type': 'select', 'default_value': [ '$ size', diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 6316f116..3e230b71 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -30,6 +30,7 @@ import uuid from pydantic import BaseModel import tractor import trio +from PyQt5.QtCore import Qt from .. import config from ..clearing._client import open_ems, OrderBook @@ -37,6 +38,7 @@ from ..clearing._allocate import ( mk_allocator, Position, ) +from ._style import _font from ..data._source import Symbol from ..data.feed import Feed from ..log import get_logger @@ -46,7 +48,8 @@ from ._position import ( PositionTracker, SettingsPane, ) -from ._label import FormatLabel +from ._forms import FieldsForm +# from ._label import FormatLabel from ._window import MultiStatus from ..clearing._messages import Order, BrokerdPosition from ._forms import open_form_input_handling @@ -639,63 +642,21 @@ async def open_order_mode( pp_tracker.hide_info() # setup order mode sidepane widgets - form = chart.sidepane - vbox = form.vbox - - from textwrap import dedent - - from PyQt5.QtCore import Qt - - from ._style import _font, _font_small - from ..calc import humanize - - feed_label = FormatLabel( - fmt_str=dedent(""" - actor: **{actor_name}**\n - |_ @**{host}:{port}**\n - |_ throttle_hz: **{throttle_rate}**\n - |_ streams: **{symbols}**\n - |_ shm: **{shm}**\n - """), - font=_font.font, - font_size=_font_small.px_size, - font_color='default_lightest', - ) - - form.feed_label = feed_label - - # add feed info label to top - vbox.insertWidget( - 0, - feed_label, - alignment=Qt.AlignBottom, - ) - # vbox.setAlignment(feed_label, Qt.AlignBottom) - # vbox.setAlignment(Qt.AlignBottom) - _ = chart.height() - ( - form.height() + - form.fill_bar.height() - # feed_label.height() - ) - vbox.setSpacing( + form: FieldsForm = chart.sidepane + form.vbox.setSpacing( int((1 + 5/8)*_font.px_size) ) - # fill in brokerd feed info - host, port = feed.portal.channel.raddr - if host == '127.0.0.1': - host = 'localhost' - mpshm = feed.shm._shm - shmstr = f'{humanize(mpshm.size)}' - form.feed_label.format( - actor_name=feed.portal.channel.uid[0], - host=host, - port=port, - symbols=len(feed.symbols), - shm=shmstr, - throttle_rate=feed.throttle_rate, + from ._feedstatus import mk_feed_label + + feed_label = mk_feed_label( + form, + feed, + chart, ) + # XXX: we set this because? 
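+    # (best guess: stashing the ref on the form lets other code
+    # re-`format()` the label when the feed's status changes)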
+ form.feed_label = feed_label order_pane = SettingsPane( form=form, # XXX: ugh, so hideous... @@ -706,6 +667,11 @@ async def open_order_mode( ) order_pane.set_accounts(list(trackers.keys())) + form.vbox.addWidget( + feed_label, + alignment=Qt.AlignBottom, + ) + # update pp icons for name, tracker in trackers.items(): order_pane.update_account_icons({name: tracker.live_pp}) diff --git a/requirements.txt b/requirements.txt index 78255d32..64dd78c1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -15,3 +15,7 @@ # ``trimeter`` for asysnc history fetching -e git+https://github.com/python-trio/trimeter.git@master#egg=trimeter + + +# ``asyncvnc`` for sending interactions to ib-gw inside docker +-e git+https://github.com/pikers/asyncvnc.git@vid_passthrough#egg=asyncvnc