diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py
index 03125b20..6fcf11f7 100644
--- a/piker/brokers/_util.py
+++ b/piker/brokers/_util.py
@@ -39,7 +39,9 @@ class NoData(BrokerError):
 def resproc(
     resp: asks.response_objects.Response,
     log: logging.Logger,
-    return_json: bool = True
+    return_json: bool = True,
+    log_resp: bool = False,
+
 ) -> asks.response_objects.Response:
     """Process response and return its json content.
 
@@ -52,7 +54,8 @@ def resproc(
     except json.decoder.JSONDecodeError:
         log.exception(f"Failed to process {resp}:\n{resp.text}")
         raise BrokerError(resp.text)
-    else:
+
+    if log_resp:
         log.debug(f"Received json contents:\n{colorize_json(json)}")
 
     return json if return_json else resp
diff --git a/piker/brokers/core.py b/piker/brokers/core.py
index b16f46fe..af5da3a1 100644
--- a/piker/brokers/core.py
+++ b/piker/brokers/core.py
@@ -142,15 +142,23 @@ async def symbol_search(
     brokermods: list[ModuleType],
     pattern: str,
     **kwargs,
+
 ) -> Dict[str, Dict[str, Dict[str, Any]]]:
-    """Return symbol info from broker.
-    """
+    '''
+    Return symbol info from broker.
+
+    '''
     results = []
 
-    async def search_backend(brokername: str) -> None:
+    async def search_backend(
+        brokermod: ModuleType
+    ) -> None:
+
+        brokername: str = brokermod.name
 
         async with maybe_spawn_brokerd(
-            brokername,
+            brokermod.name,
+            infect_asyncio=getattr(brokermod, '_infect_asyncio', False),
 
         ) as portal:
 
             results.append((
diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 3431dfd6..51724e5a 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -23,14 +23,15 @@ built on it) and thus actor aware API calls must be spawned with
 
 """
 from contextlib import asynccontextmanager as acm
-from dataclasses import asdict
+from dataclasses import asdict, astuple
 from datetime import datetime
 from functools import partial
 import itertools
 from math import isnan
 from typing import (
-    Any, Optional,
+    Any, Callable, Optional,
     AsyncIterator, Awaitable,
+    Union,
 )
 import asyncio
 from pprint import pformat
@@ -40,9 +41,11 @@ import platform
 from random import randint
 import time
 
+
 import trio
 from trio_typing import TaskStatus
 import tractor
+from tractor import to_asyncio
 from async_generator import aclosing
 from ib_insync.wrapper import RequestError
 from ib_insync.contract import Contract, ContractDetails, Option
@@ -58,8 +61,7 @@ import numpy as np
 
 from .. import config
 from ..log import get_logger, get_console_log
-from .._daemon import maybe_spawn_brokerd
-from ..data._source import from_df
+from ..data._source import base_ohlc_dtype
 from ..data._sharedmem import ShmArray
 from ._util import SymbolNotFound, NoData
 from ..clearing._messages import (
@@ -110,13 +112,18 @@ _show_wap_in_history: bool = False
 # accepting patterns before the kb has settled more then
 # a quarter second).
 _search_conf = {
-    'pause_period': 6/16,
+    'pause_period': 6 / 16,
 }
 
 
+# annotation to let backend agnostic code
+# know if ``brokerd`` should be spawned with
+# ``tractor``'s aio mode.
+_infect_asyncio: bool = True
+
+
 # overrides to sidestep pretty questionable design decisions in
 # ``ib_insync``:
-
 class NonShittyWrapper(Wrapper):
     def tcpDataArrived(self):
         """Override time stamps to be floats for now.
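For context, a minimal sketch of how a backend might drive the new ``log_resp``
flag on ``resproc()`` (the endpoint URL and ``fetch_md()`` helper below are
hypothetical, not part of this patch):

    import logging

    import asks

    from piker.brokers._util import resproc

    log = logging.getLogger(__name__)

    async def fetch_md(symbol: str) -> dict:
        # hypothetical REST endpoint; real backends compose their own urls
        resp = await asks.get(f'https://api.example.com/md/{symbol}')

        # decode the json payload, raising ``BrokerError`` on bad input;
        # only enable ``log_resp`` when debugging since the colorized
        # dump is costly on hot paths.
        return resproc(resp, log, return_json=True, log_resp=True)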
@@ -171,6 +178,13 @@ _adhoc_cmdty_data_map = {
     'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}),
 }
 
+_futes_venues = (
+    'GLOBEX',
+    'NYMEX',
+    'CME',
+    'CMECRYPTO',
+)
+
 
 _adhoc_futes_set = {
 
     # equities
@@ -216,12 +230,35 @@ _exch_skip_list = {
 _enters = 0
 
 
+def bars_to_np(bars: list) -> np.ndarray:
+    '''
+    Convert a "bars list thing" (``BarDataList`` type from ``ib_insync``)
+    into a numpy struct array.
+
+    '''
+    # TODO: maybe rewrite this faster with ``numba``
+    np_ready = []
+    for bardata in bars:
+        ts = bardata.date.timestamp()
+        t = astuple(bardata)[:7]
+        np_ready.append((ts, ) + t[1:7])
+
+    nparr = np.array(
+        np_ready,
+        dtype=base_ohlc_dtype,
+    )
+    assert nparr['time'][0] == bars[0].date.timestamp()
+    assert nparr['time'][-1] == bars[-1].date.timestamp()
+    return nparr
+
+
 class Client:
-    """IB wrapped for our broker backend API.
+    '''
+    IB wrapped for our broker backend API.
 
     Note: this client requires running inside an ``asyncio`` loop.
 
-    """
+    '''
     _contracts: dict[str, Contract] = {}
 
     def __init__(
@@ -240,31 +277,35 @@ class Client:
 
     async def bars(
         self,
-        symbol: str,
+        fqsn: str,
+
         # EST in ISO 8601 format is required... below is EPOCH
-        start_dt: str = "1970-01-01T00:00:00.000000-05:00",
-        end_dt: str = "",
+        start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
+        end_dt: Union[datetime, str] = "",
 
         sample_period_s: str = 1,  # ohlc sample period
         period_count: int = int(2e3),  # <- max per 1s sample query
 
-        is_paid_feed: bool = False,  # placeholder
     ) -> list[dict[str, Any]]:
-        """Retreive OHLCV bars for a symbol over a range to the present.
-        """
+        '''
+        Retrieve OHLCV bars for an fqsn over a range to the present.
+
+        '''
         bars_kwargs = {'whatToShow': 'TRADES'}
 
         global _enters
-        print(f'ENTER BARS {_enters}')
+        # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}')
+        print(f'REQUESTING BARS {_enters} @ end={end_dt}')
         _enters += 1
 
-        contract = await self.find_contract(symbol)
+        contract = await self.find_contract(fqsn)
         bars_kwargs.update(getattr(contract, 'bars_kwargs', {}))
 
         # _min = min(2000*100, count)
         bars = await self.ib.reqHistoricalDataAsync(
             contract,
             endDateTime=end_dt,
+            formatDate=2,
 
             # time history length values format:
             # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)``
@@ -295,12 +336,10 @@ class Client:
         )
         if not bars:
             # TODO: raise underlying error here
-            raise ValueError(f"No bars retreived for {symbol}?")
+            raise ValueError(f"No bars retrieved for {fqsn}?")
 
-        # TODO: rewrite this faster with ``numba``
-        # convert to pandas dataframe:
-        df = ibis.util.df(bars)
-        return bars, from_df(df)
+        nparr = bars_to_np(bars)
+        return bars, nparr
 
     async def con_deats(
         self,
@@ -316,59 +355,52 @@ class Client:
         # batch request all details
         results = await asyncio.gather(*futs)
 
-        # XXX: if there is more then one entry in the details list
+        # one set per future result
         details = {}
         for details_set in results:
 
+            # XXX: if there is more than one entry in the details list
             # then the contract is so called "ambiguous".
             for d in details_set:
                 con = d.contract
-                unique_sym = f'{con.symbol}.{con.primaryExchange}'
-                as_dict = asdict(d)
+
+                key = '.'.join([
+                    con.symbol,
+                    con.primaryExchange or con.exchange,
+                ])
+                expiry = con.lastTradeDateOrContractMonth
+                if expiry:
+                    key += f'.{expiry}'
+
                 # nested dataclass we probably don't need and that
-                # won't IPC serialize
+                # won't IPC serialize..
+                d.secIdList = ''
 
-                details[unique_sym] = as_dict
+                details[key] = d
 
         return details
 
     async def search_stocks(
         self,
         pattern: str,
-
-        get_details: bool = False,
-        # how many contracts to search "up to"
-        upto: int = 3,
+        upto: int = 3,  # how many contracts to search "up to"
 
     ) -> dict[str, ContractDetails]:
-        """Search for stocks matching provided ``str`` pattern.
+        '''
+        Search for stocks matching provided ``str`` pattern.
 
         Return a dictionary of ``upto`` entries worth of contract details.
-        """
+
+        '''
         descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
 
-        if descriptions is not None:
-            descrs = descriptions[:upto]
-
-            if get_details:
-                return await self.con_deats([d.contract for d in descrs])
-
-            else:
-                results = {}
-                for d in descrs:
-                    con = d.contract
-                    # sometimes there's a weird extra suffix returned
-                    # from search?
-                    exch = con.primaryExchange.rsplit('.')[0]
-                    unique_sym = f'{con.symbol}.{exch}'
-                    results[unique_sym] = {}
-
-                return results
-        else:
+        if descriptions is None:
             return {}
 
+        # limit
+        descrs = descriptions[:upto]
+        return await self.con_deats([d.contract for d in descrs])
+
     async def search_symbols(
         self,
         pattern: str,
@@ -380,26 +412,72 @@ class Client:
 
         # TODO add search though our adhoc-locally defined symbol set
         # for futes/cmdtys/
-        return await self.search_stocks(pattern, upto, get_details=True)
+        results = await self.search_stocks(
+            pattern,
+            upto=upto,
+        )
 
-    async def get_cont_fute(
+        for key, deats in results.copy().items():
+
+            tract = deats.contract
+            sym = tract.symbol
+            sectype = tract.secType
+
+            if sectype == 'IND':
+                results[f'{sym}.IND'] = tract
+                results.pop(key)
+                exch = tract.exchange
+
+                if exch in _futes_venues:
+                    # try get all possible contracts for symbol as per,
+                    # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut
+                    con = ibis.Future(
+                        symbol=sym,
+                        exchange=exch,
+                    )
+                    try:
+                        all_deats = await self.con_deats([con])
+                        results |= all_deats
+
+                    except RequestError as err:
+                        log.warning(err.message)
+
+        return results
+
+    async def get_fute(
         self,
         symbol: str,
         exchange: str,
-    ) -> Contract:
-        """Get an unqualifed contract for the current "continous" future.
-        """
-        contcon = ibis.ContFuture(symbol, exchange=exchange)
+        expiry: str = '',
+        front: bool = False,
+
+    ) -> Contract:
+        '''
+        Get an unqualified contract, either the current "front"
+        (continuous) future or the one matching ``expiry``.
+
+        '''
         # it's the "front" contract returned here
-        frontcon = (await self.ib.qualifyContractsAsync(contcon))[0]
-        return ibis.Future(conId=frontcon.conId)
+        if front:
+            con = (await self.ib.qualifyContractsAsync(
+                ibis.ContFuture(symbol, exchange=exchange)
+            ))[0]
+        else:
+            con = (await self.ib.qualifyContractsAsync(
+                ibis.Future(
+                    symbol,
+                    exchange=exchange,
+                    lastTradeDateOrContractMonth=expiry,
+                )
+            ))[0]
+
+        return con
 
     async def find_contract(
         self,
-        symbol,
+        pattern: str,
         currency: str = 'USD',
         **kwargs,
+
     ) -> Contract:
 
         # TODO: we can't use this currently because
 
         # XXX UPDATE: we can probably do the tick/trades scraping
         # inside our eventkit handler instead to bypass this entirely?
+        if 'ib' in pattern:
+            from ..data._source import unpack_fqsn
+            broker, symbol, expiry = unpack_fqsn(pattern)
+        else:
+            symbol = pattern
+
         # try:
         #     # give the cache a go
         #     return self._contracts[symbol]
         # except KeyError:
         #     log.debug(f'Looking up contract for {symbol}')
+        expiry: str = ''
+        if symbol.count('.') > 1:
+            symbol, _, expiry = symbol.rpartition('.')
 
         # use heuristics to figure out contract "type"
         try:
             sym, exch = symbol.upper().rsplit('.', maxsplit=1)
         except ValueError:
             # likely there's an embedded `.` for a forex pair
             breakpoint()
 
+        qualify: bool = True
+
         # futes
-        if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'):
-            con = await self.get_cont_fute(symbol=sym, exchange=exch)
+        if exch in _futes_venues:
+            if expiry:
+                # get the contract for the requested expiry
+                contract = await self.get_fute(
+                    symbol=sym,
+                    exchange=exch,
+                    expiry=expiry,
+                )
+
+            else:
+                # get the "front" contract
+                contract = await self.get_fute(
+                    symbol=sym,
+                    exchange=exch,
+                    front=True,
+                )
+
+            qualify = False
 
         elif exch in ('FOREX'):
             currency = ''
@@ -468,12 +573,21 @@ class Client:
         )
         try:
             exch = 'SMART' if not exch else exch
-            contract = (await self.ib.qualifyContractsAsync(con))[0]
+            if qualify:
+                contract = (await self.ib.qualifyContractsAsync(con))[0]
+            else:
+                assert contract
 
         except IndexError:
             raise ValueError(f"No contract could be found {con}")
 
-        self._contracts[symbol] = contract
+        self._contracts[pattern] = contract
+
+        # add an additional entry with expiry suffix if available
+        conexp = contract.lastTradeDateOrContractMonth
+        if conexp:
+            self._contracts[pattern + f'.{conexp}'] = contract
+
         return contract
 
     async def get_head_time(
@@ -516,12 +630,23 @@ class Client:
 
         '''
         contract, ticker, details = await self.get_sym_details(symbol)
+        ready = ticker.updateEvent
 
         # ensure a last price gets filled in before we deliver quote
         for _ in range(100):
             if isnan(ticker.last):
-                await asyncio.sleep(0.01)
-                log.warning(f'Quote for {symbol} timed out: market is closed?')
-                ticker = await ticker.updateEvent
+
+                done, pending = await asyncio.wait(
+                    [ready],
+                    timeout=0.1,
+                )
+                if ready in done:
+                    break
+                else:
+                    log.warning(
+                        f'Quote for {symbol} timed out: market is closed?'
+                    )
+
             else:
                 log.info(f'Got first quote for {symbol}')
                 break
@@ -562,23 +687,28 @@ class Client:
             # against non-known prices.
             raise RuntimeError("Can not order {symbol}, no live feed?")
 
-        trade = self.ib.placeOrder(
-            contract,
-            Order(
-                orderId=reqid or 0,  # stupid api devs..
-                action=action.upper(),  # BUY/SELL
-                # lookup the literal account number by name here.
-                account=account,
-                orderType='LMT',
-                lmtPrice=price,
-                totalQuantity=size,
-                outsideRth=True,
 
-                optOutSmartRouting=True,
-                routeMarketableToBbo=True,
-                designatedLocation='SMART',
-            ),
-        )
+        try:
+            trade = self.ib.placeOrder(
+                contract,
+                Order(
+                    orderId=reqid or 0,  # stupid api devs..
+                    action=action.upper(),  # BUY/SELL
+                    # lookup the literal account number by name here.
+                    account=account,
+                    orderType='LMT',
+                    lmtPrice=price,
+                    totalQuantity=size,
+                    outsideRth=True,
 
+                    optOutSmartRouting=True,
+                    routeMarketableToBbo=True,
+                    designatedLocation='SMART',
+                ),
+            )
+        except AssertionError:  # errrg insync..
+            log.warning(f'order for {reqid} already complete?')
+            # will trigger an error in ems request handler task.
+            return None
 
         # ib doesn't support setting your own id outside
         # their own weird client int counting ids..
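To make the new fqsn-style cache keying concrete, a rough usage sketch against
an already-connected ``Client`` (the symbol, venue and expiry values below are
hypothetical):

    async def lookup_demo(client: Client) -> None:
        # a futures pattern with an expiry suffix routes through
        # ``get_fute(..., expiry=...)``; without the suffix the "front"
        # (continuous) contract is qualified instead.
        mes = await client.find_contract('mes.globex.20220318')

        # the cache is keyed by the requested pattern (and additionally
        # by an expiry-suffixed key when ib reports one).
        assert client._contracts['mes.globex.20220318'] is mes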
@@ -601,25 +731,38 @@ class Client: def inline_errors( self, to_trio: trio.abc.SendChannel, - ) -> None: - # connect error msgs + ) -> None: + ''' + Setup error relay to the provided ``trio`` mem chan such that + trio tasks can retreive and parse ``asyncio``-side API request + errors. + + ''' def push_err( reqId: int, errorCode: int, errorString: str, contract: Contract, + ) -> None: log.error(errorString) + reason = errorString + + if reqId == -1: + # it's a general event? + key = 'event' + else: + key = 'error' try: to_trio.send_nowait(( - 'error', + key, # error "object" {'reqid': reqId, - 'reason': errorString, + 'reason': reason, 'contract': contract} )) except trio.BrokenResourceError: @@ -799,7 +942,7 @@ async def load_aio_clients( # try: # TODO: support multiple clients allowing for execution on # multiple accounts (including a paper instance running on the - # same machine) and switching between accounts in the EMs + # same machine) and switching between accounts in the ems. _err = None @@ -812,8 +955,8 @@ async def load_aio_clients( accounts_found: dict[str, Client] = {} if ( - client and client.ib.isConnected() or - sockaddr in _scan_ignore + client and client.ib.isConnected() + or sockaddr in _scan_ignore ): continue @@ -822,7 +965,7 @@ async def load_aio_clients( # XXX: not sure if we ever really need to increment the # client id if teardown is sucessful. - client_id = 616 + client_id = 6116 await ib.connectAsync( host, @@ -898,13 +1041,13 @@ async def load_aio_clients( # cache logic to avoid rescanning if we already have all # clients loaded. _scan_ignore.add(sockaddr) - else: - if not _client_cache: - raise ConnectionError( - 'No ib APIs could be found scanning @:\n' - f'{pformat(combos)}\n' - 'Check your `brokers.toml` and/or network' - ) from _err + + if not _client_cache: + raise ConnectionError( + 'No ib APIs could be found scanning @:\n' + f'{pformat(combos)}\n' + 'Check your `brokers.toml` and/or network' + ) from _err # retreive first loaded client clients = list(_client_cache.values()) @@ -973,7 +1116,7 @@ async def _trio_run_client_method( # ): # kwargs['_treat_as_stream'] = True - return await tractor.to_asyncio.run_task( + return await to_asyncio.run_task( _aio_run_client_method, meth=method, client=client, @@ -981,60 +1124,188 @@ async def _trio_run_client_method( ) -class _MethodProxy: +class MethodProxy: + def __init__( self, - portal: tractor.Portal + chan: to_asyncio.LinkedTaskChannel, + event_table: dict[str, trio.Event], + ) -> None: - self._portal = portal + self.chan = chan + self.event_table = event_table async def _run_method( self, *, meth: str = None, **kwargs + ) -> Any: - return await self._portal.run( - _trio_run_client_method, - method=meth, - **kwargs - ) + ''' + Make a ``Client`` method call by requesting through the + ``tractor.to_asyncio`` layer. 
+
+        '''
+        chan = self.chan
+        # send through method + ``kwargs: dict`` as pair
+        await chan.send((meth, kwargs))
+
+        while not chan.closed():
+            msg = await chan.receive()
+            # print(f'NEXT MSG: {msg}')
+
+            # TODO: py3.10 ``match:`` syntax B)
+            if 'result' in msg:
+                res = msg.get('result')
+                return res
+
+            elif 'exception' in msg:
+                err = msg.get('exception')
+                raise err
+
+            elif 'error' in msg:
+                etype, emsg = msg
+                log.warning(f'IB error relay: {emsg}')
+                continue
+
+            else:
+                log.warning(f'UNKNOWN IB MSG: {msg}')
+
+    def status_event(
+        self,
+        pattern: str,
+
+    ) -> Union[dict[str, Any], trio.Event]:
+
+        ev = self.event_table.get(pattern)
+
+        if not ev or ev.is_set():
+            # print(f'inserting new data reset event item')
+            ev = self.event_table[pattern] = trio.Event()
+
+        return ev
+
+    async def wait_for_data_reset(self) -> None:
+        '''
+        Send hacker hot keys to ib program and wait
+        for the event that declares the data feeds to be
+        back up before unblocking.
+
+        '''
+        ...
 
 
-def get_client_proxy(
+async def open_aio_client_method_relay(
+    from_trio: asyncio.Queue,
+    to_trio: trio.abc.SendChannel,
+    event_consumers: dict[str, trio.Event],
 
-    portal: tractor.Portal,
-    target=Client,
+) -> None:
 
-) -> _MethodProxy:
-
-    proxy = _MethodProxy(portal)
-
-    # mock all remote methods
-    for name, method in inspect.getmembers(
-        target, predicate=inspect.isfunction
+    async with load_aio_clients() as (
+        client,
+        clients,
+        accts2clients,
     ):
-        if '_' == name[0]:
-            continue
-        setattr(proxy, name, partial(proxy._run_method, meth=name))
+        to_trio.send_nowait(client)
 
-    return proxy
+        # TODO: separate channel for error handling?
+        client.inline_errors(to_trio)
+
+        # relay all method requests to ``asyncio``-side client and
+        # deliver back results
+        while not to_trio._closed:
+            msg = await from_trio.get()
+            if msg is None:
+                print('asyncio PROXY-RELAY SHUTDOWN')
+                break
+
+            meth_name, kwargs = msg
+            meth = getattr(client, meth_name)
+
+            try:
+                resp = await meth(**kwargs)
+                # echo the msg back
+                to_trio.send_nowait({'result': resp})
+
+            except (
+                RequestError,
+
+                # TODO: relay all errors to trio?
+                # BaseException,
+            ) as err:
+                to_trio.send_nowait({'exception': err})
+
+
+@acm
+async def open_client_proxy() -> MethodProxy:
+
+    # try:
+    event_table = {}
+
+    async with (
+        to_asyncio.open_channel_from(
+            open_aio_client_method_relay,
+            event_consumers=event_table,
+        ) as (first, chan),
+        trio.open_nursery() as relay_n,
+    ):
+
+        assert isinstance(first, Client)
+        proxy = MethodProxy(chan, event_table)
+
+        # mock all remote methods on ib ``Client``.
+        for name, method in inspect.getmembers(
+            Client, predicate=inspect.isfunction
+        ):
+            if '_' == name[0]:
+                continue
+            setattr(proxy, name, partial(proxy._run_method, meth=name))
+
+        async def relay_events():
+
+            async with chan.subscribe() as msg_stream:
+
+                async for msg in msg_stream:
+                    if 'event' not in msg:
+                        continue
+
+                    # wake up any system event waiters.
+                    etype, status_msg = msg
+                    reason = status_msg['reason']
+
+                    ev = proxy.event_table.pop(reason, None)
+
+                    if ev and ev.statistics().tasks_waiting:
+                        log.info(f'Relaying ib status message: {msg}')
+                        ev.set()
+
+                    continue
+
+        relay_n.start_soon(relay_events)
+
+        yield proxy
+
+        # terminate asyncio side task
+        await chan.send(None)
 
 
 @acm
 async def get_client(
     **kwargs,
+
 ) -> Client:
-    """Init the ``ib_insync`` client in another actor and return
+    '''
+    Init the ``ib_insync`` client in this (``asyncio``-infected) actor and return
     a method proxy to it.
- """ - async with maybe_spawn_brokerd( - brokername='ib', - infect_asyncio=True, - **kwargs - ) as portal: - proxy_client = get_client_proxy(portal) - yield proxy_client + + ''' + # TODO: the IPC via portal relay layer for when this current + # actor isn't in aio mode. + async with open_client_proxy() as proxy: + yield proxy # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -1062,11 +1333,40 @@ tick_types = { } +# TODO: cython/mypyc/numba this! def normalize( ticker: Ticker, calc_price: bool = False ) -> dict: + + # should be real volume for this contract by default + calc_price = False + + # check for special contract types + con = ticker.contract + if type(con) in ( + ibis.Commodity, + ibis.Forex, + ): + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + # no real volume on this tract + calc_price = True + + else: + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + + # append a `.` to the returned symbol + # key for derivatives that normally is the expiry + # date key. + expiry = con.lastTradeDateOrContractMonth + if expiry: + suffix += f'.{expiry}' + # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: @@ -1095,6 +1395,12 @@ def normalize( # serialize for transport data = asdict(ticker) + # generate fqsn with possible specialized suffix + # for derivatives, note the lowercase. + data['symbol'] = data['fqsn'] = '.'.join( + (con.symbol, suffix) + ).lower() + # convert named tuples to dicts for transport tbts = data.get('tickByTicks') if tbts: @@ -1113,82 +1419,163 @@ def normalize( return data +_pacing: str = ( + 'Historical Market Data Service error ' + 'message:Historical data request pacing violation' +) + + async def get_bars( - sym: str, - end_dt: str = "", + proxy: MethodProxy, + fqsn: str, + + # blank to start which tells ib to look up the latest datum + end_dt: str = '', ) -> (dict, np.ndarray): + ''' + Retrieve historical data from a ``trio``-side task using + a ``MethoProxy``. - _err: Optional[Exception] = None + ''' + import pendulum fails = 0 - for _ in range(2): - try: + bars: Optional[list] = None + first_dt: datetime = None + last_dt: datetime = None - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, + if end_dt: + last_dt = pendulum.from_timestamp(end_dt.timestamp()) + + for _ in range(10): + try: + bars, bars_array = await proxy.bars( + fqsn=fqsn, end_dt=end_dt, ) if bars_array is None: - raise SymbolNotFound(sym) + raise SymbolNotFound(fqsn) - next_dt = bars[0].date + first_dt = pendulum.from_timestamp( + bars[0].date.timestamp()) - return (bars, bars_array, next_dt), fails + last_dt = pendulum.from_timestamp( + bars[-1].date.timestamp()) + + time = bars_array['time'] + assert time[-1] == last_dt.timestamp() + assert time[0] == first_dt.timestamp() + log.info(f'bars retreived for dts {first_dt}:{last_dt}') + + return (bars, bars_array, first_dt, last_dt), fails except RequestError as err: - _err = err + msg = err.message + # why do we always need to rebind this? + # _err = err - # TODO: retreive underlying ``ib_insync`` error? - if err.code == 162: + if 'No market data permissions for' in msg: + # TODO: signalling for no permissions searches + raise NoData(f'Symbol: {fqsn}') + break - # TODO: so this error is normally raised (it seems) if - # we try to retrieve history for a time range for which - # there is none. 
-                # there is none. in that case we should not only report
-                # the "empty range" but also do a iteration on the time
-                # step for ``next_dt`` to see if we can pull older
-                # history.
-                if 'HMDS query returned no data' in err.message:
-                    # means we hit some kind of historical "empty space"
-                    # and further requests will need to decrement the
-                    # start time dt in order to not receive a further
-                    # error?
-                    # OLDER: seem to always cause throttling despite low rps
-
-                    # raise err
-                    break
-
-                elif 'No market data permissions for' in err.message:
-
-                    # TODO: signalling for no permissions searches
-                    raise NoData(f'Symbol: {sym}')
-                    break
-
-                else:
-                    log.exception(
-                        "Data query rate reached: Press `ctrl-alt-f`"
-                        "in TWS"
-                    )
-                    print(_err)
-
-                    # TODO: should probably create some alert on screen
-                    # and then somehow get that to trigger an event here
-                    # that restarts/resumes this task?
-                    await tractor.breakpoint()
-                    fails += 1
-                    continue
+            elif (
+                err.code == 162
+                and 'HMDS query returned no data' in err.message
+            ):
+                # try to decrement start point and look further back
+                end_dt = last_dt = last_dt.subtract(seconds=2000)
+                log.warning(
+                    f'No data found ending @ {end_dt}\n'
+                    f'Starting another request for {end_dt}'
+                )
+
+                continue
+
+            elif _pacing in msg:
+
+                log.warning(
+                    'History throttle rate reached!\n'
+                    'Resetting farms with `ctrl-alt-f` hack\n'
+                )
+                # TODO: we might have to put a task lock around this
+                # method..
+                hist_ev = proxy.status_event(
+                    'HMDS data farm connection is OK:ushmds'
+                )
+                # live_ev = proxy.status_event(
+                #     # 'Market data farm connection is OK:usfuture'
+                #     'Market data farm connection is OK:usfarm'
+                # )
+                # TODO: some kinda resp here that indicates success
+                # otherwise retry?
+                await data_reset_hack()
+
+                # TODO: a while loop here if we timeout?
+                for name, ev in [
+                    ('history', hist_ev),
+                    # ('live', live_ev),
+                ]:
+                    with trio.move_on_after(22) as cs:
+                        await ev.wait()
+                        log.info(f"{name} DATA RESET")
+
+                    if cs.cancelled_caught:
+                        log.warning("reset hack failed on first try?")
+                        # await tractor.breakpoint()
+
+                fails += 1
+                continue
+
+            else:
+                raise
 
     return None, None
     # else:  # throttle wasn't fixed so error out immediately
     #     raise _err
 
 
+@acm
+async def open_history_client(
+    symbol: str,
+
+) -> tuple[Callable, int]:
+
+    async with open_client_proxy() as proxy:
+
+        async def get_hist(
+            end_dt: str,
+            start_dt: str = '',
+
+        ) -> tuple[np.ndarray, str]:
+
+            out, fails = await get_bars(proxy, symbol, end_dt=end_dt)
+
+            # TODO: add logic here to handle tradable hours and only grab
+            # valid bars in the range
+            if out is None:
+                # could be trying to retrieve bars over weekend
+                log.error(f"Can't grab bars starting at {end_dt}!?!?")
+                raise NoData(f'{end_dt}')
+
+            bars, bars_array, first_dt, last_dt = out
+
+            # volume cleaning since there's -ve entries,
+            # wood luv to know what crookery that is..
+            vlm = bars_array['volume']
+            vlm[vlm < 0] = 0
+
+            return bars_array, first_dt, last_dt
+
+        yield get_hist
+
+
 async def backfill_bars(
-    sym: str,
+    fqsn: str,
     shm: ShmArray,  # type: ignore # noqa
 
     # TODO: we want to avoid overrunning the underlying shm array buffer
@@ -1208,56 +1595,66 @@ async def backfill_bars(
     https://github.com/pikers/piker/issues/128
 
     '''
-    if platform.system() == 'Windows':
-        log.warning(
-            'Decreasing history query count to 4 since, windows...')
-        count = 4
-
-    out, fails = await get_bars(sym)
-
-    if out is None:
-        raise RuntimeError("Could not pull currrent history?!")
-
-    (first_bars, bars_array, next_dt) = out
-    vlm = bars_array['volume']
-    vlm[vlm < 0] = 0
-
-    # write historical data to buffer
-    shm.push(bars_array)
+    # last_dt1 = None
+    last_dt = None
 
     with trio.CancelScope() as cs:
-        task_status.started(cs)
 
-        i = 0
-        while i < count:
+        # async with open_history_client(fqsn) as proxy:
+        async with open_client_proxy() as proxy:
 
-            out, fails = await get_bars(sym, end_dt=next_dt)
+            if platform.system() == 'Windows':
+                log.warning(
+                    'Decreasing history query count to 4 since windows...')
+                count = 4
 
-            if fails is None or fails > 1:
-                break
+            out, fails = await get_bars(proxy, fqsn)
 
-            if out == (None, None):
-                # could be trying to retreive bars over weekend
-                # TODO: add logic here to handle tradable hours and only grab
-                # valid bars in the range
-                log.error(f"Can't grab bars starting at {next_dt}!?!?")
-                continue
+            if out is None:
+                raise RuntimeError("Could not pull current history?!")
 
-            bars, bars_array, next_dt = out
-
-            # volume cleaning since there's -ve entries,
-            # wood luv to know what crookery that is..
+            (first_bars, bars_array, first_dt, last_dt) = out
             vlm = bars_array['volume']
             vlm[vlm < 0] = 0
 
-            # TODO we should probably dig into forums to see what peeps
-            # think this data "means" and then use it as an indicator of
-            # sorts? dinkus has mentioned that $vlms for the day dont'
-            # match other platforms nor the summary stat tws shows in
-            # the monitor - it's probably worth investigating.
+            last_dt = first_dt
 
-            shm.push(bars_array, prepend=True)
-            i += 1
+            # write historical data to buffer
+            shm.push(bars_array)
+
+            task_status.started(cs)
+
+            i = 0
+            while i < count:
+
+                out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
+
+                if out is None:
+                    # could be trying to retrieve bars over weekend
+                    # TODO: add logic here to handle tradable hours and
+                    # only grab valid bars in the range
+                    log.error(f"Can't grab bars starting at {first_dt}!?!?")
+
+                    # XXX: get_bars() should internally decrement dt by
+                    # 2k seconds and try again.
+                    continue
+
+                (first_bars, bars_array, first_dt, last_dt) = out
+                # last_dt1 = last_dt
+                # last_dt = first_dt
+
+                # volume cleaning since there's -ve entries,
+                # wood luv to know what crookery that is..
+                vlm = bars_array['volume']
+                vlm[vlm < 0] = 0
+
+                # TODO we should probably dig into forums to see what peeps
+                # think this data "means" and then use it as an indicator of
+                # sorts? dinkus has mentioned that $vlms for the day don't
+                # match other platforms nor the summary stat tws shows in
+                # the monitor - it's probably worth investigating.
+
+                shm.push(bars_array, prepend=True)
+                i += 1
 
 
 asset_type_map = {
@@ -1303,8 +1700,10 @@ async def _setup_quote_stream(
     contract: Optional[Contract] = None,
 
 ) -> trio.abc.ReceiveChannel:
-    """Stream a ticker using the std L1 api.
-    """
+    '''
+    Stream a ticker using the std L1 api.
+
+    '''
     global _quote_streams
 
     to_trio.send_nowait(None)
@@ -1392,7 +1791,10 @@ async def open_aio_quote_stream(
 
     if from_aio:
 
         # if we already have a cached feed deliver a rx side clone to consumer
-        async with broadcast_receiver(from_aio) as from_aio:
+        async with broadcast_receiver(
+            from_aio,
+            2**6,
+        ) as from_aio:
             yield from_aio
             return
 
@@ -1428,18 +1830,13 @@ async def stream_quotes(
 
     '''
     # TODO: support multiple subscriptions
     sym = symbols[0]
-    details: Optional[dict] = None
 
+    log.info(f'request for real-time quotes: {sym}')
 
-    contract, first_ticker, details = await _trio_run_client_method(
+    con, first_ticker, details = await _trio_run_client_method(
         method='get_sym_details',
         symbol=sym,
     )
-
-    with trio.move_on_after(1):
-        contract, first_ticker, details = await _trio_run_client_method(
-            method='get_quote',
-            symbol=sym,
-        )
+    first_quote = normalize(first_ticker)
 
     def mk_init_msgs() -> dict[str, dict]:
         # pass back some symbol info like min_tick, trading_hours, etc.
@@ -1467,46 +1864,25 @@ async def stream_quotes(
             # and that history has been written
             sym: {
                 'symbol_info': syminfo,
+                'fqsn': first_quote['fqsn'],
             }
         }
         return init_msgs
 
     init_msgs = mk_init_msgs()
 
-    con = first_ticker.contract
-
-    # should be real volume for this contract by default
-    calc_price = False
-
-    # check for special contract types
-    if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
-
-        suffix = con.primaryExchange
-        if not suffix:
-            suffix = con.exchange
-
-    else:
-        # commodities and forex don't have an exchange name and
-        # no real volume so we have to calculate the price
-        suffix = con.secType
-        # no real volume on this tract
-        calc_price = True
-
-    quote = normalize(first_ticker, calc_price=calc_price)
-    con = quote['contract']
-    topic = '.'.join((con['symbol'], suffix)).lower()
-    quote['symbol'] = topic
-
-    # for compat with upcoming fqsn based derivs search
-    init_msgs[sym]['fqsn'] = topic
-
-    # pass first quote asap
-    first_quote = quote
+    # TODO: we should instead spawn a task that waits on a feed to start
+    # and let it wait indefinitely.. instead of this hard coded stuff.
+    with trio.move_on_after(1):
+        contract, first_ticker, details = await _trio_run_client_method(
+            method='get_quote',
+            symbol=sym,
+        )
 
     # it might be outside regular trading hours so see if we can at
     # least grab history.
     if isnan(first_ticker.last):
-        task_status.started((init_msgs, first_quote))
+
+        task_status.started((init_msgs, first_quote))
 
         # it's not really live but this will unblock
         # the brokerd feed task to tell the ui to update?
 
         return  # we never expect feed to come up?
     async with open_aio_quote_stream(
-        symbol=sym, contract=contract
+        symbol=sym,
+        contract=con,
     ) as stream:
 
         # ugh, clear ticks since we've consumed them
         # (ahem, ib_insync is stateful trash)
         first_ticker.ticks = []
 
-        log.debug(f"First ticker received {quote}")
-
-        task_status.started((init_msgs, first_quote))
+        task_status.started((init_msgs, first_quote))
 
         async with aclosing(stream):
-            if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex):
-                # suffix = 'exchange'
-                # calc_price = False # should be real volume for contract
-
+            if type(first_ticker.contract) not in (
+                ibis.Commodity,
+                ibis.Forex
+            ):
                 # wait for real volume on feed (trading might be closed)
                 while True:
-
                     ticker = await stream.receive()
 
                     # for a real volume contract we rait for the first
                     # "real" trade to take place
-                    if not calc_price and not ticker.rtTime:
+                    if (
+                        # not calc_price
+                        # and not ticker.rtTime
+                        not ticker.rtTime
+                    ):
                         # spin consuming tickers until we get a real
                         # market datum
                         log.debug(f"New unsent ticker: {ticker}")
@@ -1555,21 +1933,16 @@ async def stream_quotes(
                         # ``aclosing()`` above?
                         break
 
+            quote = normalize(ticker)
+            log.debug(f"First ticker received {quote}")
+
             # tell caller quotes are now coming in live
             feed_is_live.set()
 
             # last = time.time()
             async for ticker in stream:
-                # print(f'ticker rate: {1/(time.time() - last)}')
-
-                # print(ticker.vwap)
-                quote = normalize(
-                    ticker,
-                    calc_price=calc_price
-                )
-
-                quote['symbol'] = topic
-                await send_chan.send({topic: quote})
+                quote = normalize(ticker)
+                await send_chan.send({quote['fqsn']: quote})
 
                 # ugh, clear ticks since we've consumed them
                 ticker.ticks = []
@@ -1587,11 +1960,11 @@ def pack_position(
         symbol = con.localSymbol.replace(' ', '')
 
     else:
+        # TODO: lookup fqsn even for derivs.
         symbol = con.symbol.lower()
 
     exch = (con.primaryExchange or con.exchange).lower()
     symkey = '.'.join((symbol, exch))
-
     if not exch:
         # attempt to lookup the symbol from our
         # hacked set..
@@ -1600,7 +1973,11 @@ def pack_position(
                 symkey = sym
                 break
 
-    # TODO: options contracts into a sane format..
+    expiry = con.lastTradeDateOrContractMonth
+    if expiry:
+        symkey += f'.{expiry}'
+
+    # TODO: options contracts into a sane format..
 
     return BrokerdPosition(
         broker='ib',
@@ -1673,6 +2050,12 @@ async def handle_order_requests(
                 # counter - collision prone..)
                 reqid=order.reqid,
             )
+            if reqid is None:
+                await ems_order_stream.send(BrokerdError(
+                    oid=request_msg['oid'],
+                    symbol=request_msg['symbol'],
+                    reason='Order already active?',
+                ).dict())
+                continue
 
             # deliver ack that order has been submitted to broker routing
             await ems_order_stream.send(
@@ -1901,18 +2284,26 @@ async def deliver_trade_events(
                 msg = pack_position(item)
                 msg.account = accounts_def.inverse[msg.account]
 
-            if getattr(msg, 'reqid', 0) < -1:
+            elif event_name == 'event':
 
-                # it's a trade event generated by TWS usage.
-                log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
+                # it's either a general system status event or an external
+                # trade event?
+                log.info(f"TWS system status: \n{pformat(item)}")
 
-                msg.reqid = 'tws-' + str(-1 * msg.reqid)
+                # TODO: support this again but needs parsing at the callback
+                # level...
+                # reqid = item.get('reqid', 0)
+                # if getattr(msg, 'reqid', 0) < -1:
+                #     log.info(f"TWS triggered trade\n{pformat(msg.dict())}")
+
+                continue
+
+                # msg.reqid = 'tws-' + str(-1 * reqid)
 
                 # mark msg as from "external system"
                 # TODO: probably something better then this.. and start
                # considering multiplayer/group trades tracking
-                msg.broker_details['external_src'] = 'tws'
-                continue
+                # msg.broker_details['external_src'] = 'tws'
 
             # XXX: we always serialize to a dict for msgpack
             # translations, ideally we can move to an msgspec (or other)
@@ -1979,7 +2370,7 @@ async def open_symbol_search(
                     sn.start_soon(
                         stash_results,
                         _trio_run_client_method(
-                            method='search_stocks',
+                            method='search_symbols',
                             pattern=pattern,
                             upto=5,
                         )
@@ -2015,3 +2406,104 @@ async def open_symbol_search(
 
             log.debug(f"sending matches: {matches.keys()}")
             await stream.send(matches)
+
+
+async def data_reset_hack(
+    reset_type: str = 'data',
+
+) -> bool:
+    '''
+    Run key combos for resetting data feeds and yield back to caller
+    when complete.
+
+    This is a linux-only hack around:
+
+    https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations
+
+    TODOs:
+        - a return type that hopefully determines if the hack was
+          successful.
+        - other OS support?
+        - integration with ``ib-gw`` run in docker + Xorg?
+
+    '''
+    # TODO: try out this lib instead, seems to be the most modern
+    # and uses the underlying lib:
+    # https://github.com/rshk/python-libxdo
+
+    # TODO: seems to be a few libs for python but not sure
+    # if they support all the sub commands we need, order of
+    # most recent commit history:
+    # https://github.com/rr-/pyxdotool
+    # https://github.com/ShaneHutter/pyxdotool
+    # https://github.com/cphyc/pyxdotool
+
+    try:
+        import i3ipc
+    except ImportError:
+        log.warning('IB data hack not supported on ur platformz')
+        return False
+
+    i3 = i3ipc.Connection()
+    t = i3.get_tree()
+
+    orig_win_id = t.find_focused().window
+
+    # for tws
+    win_names: list[str] = [
+        'Interactive Brokers',  # tws running in i3
+        'IB Gateway',  # gw running in i3
+        # 'IB',    # gw running in i3 (newer version?)
+    ]
+
+    combos: dict[str, str] = {
+        # only required if we need a connection reset.
+        'connection': ('ctrl+alt+r', 12),
+
+        # data feed reset.
+        'data': ('ctrl+alt+f', 6)
+    }
+
+    for name in win_names:
+        results = t.find_titled(name)
+        print(f'results for {name}: {results}')
+        if results:
+            con = results[0]
+            print(f'Resetting data feed for {name}')
+            win_id = str(con.window)
+            w, h = con.rect.width, con.rect.height
+
+            # TODO: only run the reconnect (2nd) kc on a detected
+            # disconnect?
+            key_combo, timeout = combos[reset_type]
+            # for key_combo, timeout in [
+            #     # only required if we need a connection reset.
+            #     # ('ctrl+alt+r', 12),
+            #     # data feed reset.
+            #     ('ctrl+alt+f', 6)
+            # ]:
+            await trio.run_process([
+                'xdotool',
+                'windowactivate', '--sync', win_id,
+
+                # move mouse to bottom left of window (where there should
+                # be nothing to click).
+                'mousemove_relative', '--sync', str(w-4), str(h-4),
+
+                # NOTE: we may need to stick a `--retry 3` in here..
+                'click', '--window', win_id,
+                '--repeat', '3', '1',
+
+                # hackzorzes
+                'key', key_combo,
+                # ],
+                # timeout=timeout,
+            ])
+
+    # re-activate and focus original window
+    await trio.run_process([
+        'xdotool',
+        'windowactivate', '--sync', str(orig_win_id),
+        'click', '--window', str(orig_win_id), '1',
+    ])
+    return True
diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index 103fa6d5..6bc69eb4 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -19,7 +19,6 @@ NumPy compatible shared memory buffers for real-time IPC streaming.
""" from __future__ import annotations -from dataclasses import dataclass, asdict from sys import byteorder from typing import Optional from multiprocessing.shared_memory import SharedMemory, _USE_POSIX @@ -30,7 +29,7 @@ if _USE_POSIX: import tractor import numpy as np -from pydantic import BaseModel, validator +from pydantic import BaseModel from ..log import get_logger from ._source import base_iohlc_dtype @@ -39,6 +38,14 @@ from ._source import base_iohlc_dtype log = get_logger(__name__) +# how much is probably dependent on lifestyle +_secs_in_day = int(60 * 60 * 24) +# we try for 3 times but only on a run-every-other-day kinda week. +_default_size = 10 * _secs_in_day +# where to start the new data append index +_rt_buffer_start = int(9*_secs_in_day) + + # Tell the "resource tracker" thing to fuck off. class ManTracker(mantracker.ResourceTracker): def register(self, name, rtype): @@ -152,7 +159,8 @@ def _make_token( class ShmArray: - """A shared memory ``numpy`` (compatible) array API. + ''' + A shared memory ``numpy`` (compatible) array API. An underlying shared memory buffer is allocated based on a user specified ``numpy.ndarray``. This fixed size array @@ -162,7 +170,7 @@ class ShmArray: ``SharedInt`` interfaces) values such that multiple processes can interact with the same array using a synchronized-index. - """ + ''' def __init__( self, shmarr: np.ndarray, @@ -209,7 +217,8 @@ class ShmArray: @property def array(self) -> np.ndarray: - '''Return an up-to-date ``np.ndarray`` view of the + ''' + Return an up-to-date ``np.ndarray`` view of the so-far-written data to the underlying shm buffer. ''' @@ -231,26 +240,34 @@ class ShmArray: def last( self, length: int = 1, + ) -> np.ndarray: + ''' + Return the last ``length``'s worth of ("row") entries from the + array. + + ''' return self.array[-length:] def push( self, data: np.ndarray, + field_map: Optional[dict[str, str]] = None, prepend: bool = False, start: Optional[int] = None, ) -> int: - '''Ring buffer like "push" to append data + ''' + Ring buffer like "push" to append data into the buffer and return updated "last" index. NB: no actual ring logic yet to give a "loop around" on overflow condition, lel. + ''' - self._post_init = True length = len(data) - index = start or self._last.value + index = start if start is not None else self._last.value if prepend: index = self._first.value - length @@ -263,10 +280,15 @@ class ShmArray: end = index + length - fields = self._write_fields + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields try: - self._array[fields][index:end] = data[fields][:] + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] # NOTE: there was a race here between updating # the first and last indices and when the next reader @@ -281,9 +303,13 @@ class ShmArray: else: self._last.value = end + self._post_init = True return end except ValueError as err: + if field_map: + raise + # should raise if diff detected self.diff_err_fields(data) raise err @@ -336,12 +362,6 @@ class ShmArray: ... -# how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 24) -# we try for 3 times but only on a run-every-other-day kinda week. -_default_size = 3 * _secs_in_day - - def open_shm_array( key: Optional[str] = None, @@ -392,7 +412,24 @@ def open_shm_array( ) ) - last.value = first.value = int(_secs_in_day) + # start the "real-time" updated section after 3-days worth of 1s + # sampled OHLC. 
+    # sampled OHLC. This allows appending up to a days worth from
+    # tick/quote feeds before having to flush to a (tsdb) storage
+    # backend, and looks something like,
+    #   -------------------------
+    #   |              |        i
+    #   _________________________
+    #   <-------------> <------->
+    #       history     real-time
+    #
+    # Once fully "prepended", the history section will leave
+    # ``ShmArray._first.value`` at ``0`` and the yet-to-be written
+    # real-time section will start at ``ShmArray.index: int``.
+
+    # this sets the index to 9/10 of the length of the buffer
+    # leaving a "days worth of second samples" for the real-time
+    # section.
+    last.value = first.value = _rt_buffer_start
 
     shmarr = ShmArray(
         array,
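As a usage sketch of the reworked buffer layout and the new ``field_map``
support in ``ShmArray.push()`` (the shm key and source column names below are
hypothetical; destination field names assume the ``base_iohlc_dtype`` OHLC
fields ``time``/``open``/``high``/``low``/``close``/``volume``):

    import numpy as np

    from piker.data._sharedmem import open_shm_array

    # allocates a default-sized (10 days of 1s samples) buffer with
    # both read/write indices starting at ``_rt_buffer_start``.
    shm = open_shm_array(key='example.fqsn.ohlc')

    # source data whose column names differ from the buffer's fields
    # can be remapped on write via ``field_map`` (source -> dest).
    src = np.array(
        [(1640995200.0, 4766.0, 4778.5, 4758.25, 4771.0, 1234.0)],
        dtype=[
            ('ts', '<f8'), ('o', '<f8'), ('h', '<f8'),
            ('l', '<f8'), ('c', '<f8'), ('v', '<f8'),
        ],
    )
    fm = {
        'ts': 'time',
        'o': 'open', 'h': 'high',
        'l': 'low', 'c': 'close',
        'v': 'volume',
    }
    # appends at the real-time index and advances ``._last``
    shm.push(src, field_map=fm)

    # history back-fills instead write in front of ``._first``
    shm.push(src, field_map=fm, prepend=True)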