From d2d3286fb8248b19082ede76fad480d87d809086 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Feb 2022 15:20:13 -0500 Subject: [PATCH 01/21] Use `asyncio` in `Client.get_quote()` --- piker/brokers/ib.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 3431dfd6..f8ad5826 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -516,12 +516,24 @@ class Client: ''' contract, ticker, details = await self.get_sym_details(symbol) + ready = ticker.updateEvent + # ensure a last price gets filled in before we deliver quote for _ in range(100): if isnan(ticker.last): - await asyncio.sleep(0.01) - log.warning(f'Quote for {symbol} timed out: market is closed?') - ticker = await ticker.updateEvent + + done, pending = await asyncio.wait( + [ready], + timeout=0.1, + ) + if ready in done: + break + else: + log.warning( + f'Quote for {symbol} timed out: market is closed?' + ) + + # ticker = await ticker.updateEvent else: log.info(f'Got first quote for {symbol}') break From d32c26c5d7a08bff24fe2f35d1b3ffe415b24ecf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 07:40:29 -0500 Subject: [PATCH 02/21] Add flag to avoid logging json to console --- piker/brokers/_util.py | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/piker/brokers/_util.py b/piker/brokers/_util.py index 03125b20..6fcf11f7 100644 --- a/piker/brokers/_util.py +++ b/piker/brokers/_util.py @@ -39,7 +39,9 @@ class NoData(BrokerError): def resproc( resp: asks.response_objects.Response, log: logging.Logger, - return_json: bool = True + return_json: bool = True, + log_resp: bool = False, + ) -> asks.response_objects.Response: """Process response and return its json content. @@ -52,7 +54,8 @@ def resproc( except json.decoder.JSONDecodeError: log.exception(f"Failed to process {resp}:\n{resp.text}") raise BrokerError(resp.text) - else: + + if log_resp: log.debug(f"Received json contents:\n{colorize_json(json)}") return json if return_json else resp From 7936dcafbf2fe7326f364085c33bdbbe2bbe2c50 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 28 Feb 2022 08:12:40 -0500 Subject: [PATCH 03/21] Make linux timeout the same --- piker/brokers/ib.py | 20 +++++++++----------- 1 file changed, 9 insertions(+), 11 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index f8ad5826..0b06fe68 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -533,7 +533,6 @@ class Client: f'Quote for {symbol} timed out: market is closed?' ) - # ticker = await ticker.updateEvent else: log.info(f'Got first quote for {symbol}') break @@ -811,7 +810,7 @@ async def load_aio_clients( # try: # TODO: support multiple clients allowing for execution on # multiple accounts (including a paper instance running on the - # same machine) and switching between accounts in the EMs + # same machine) and switching between accounts in the ems. _err = None @@ -834,7 +833,7 @@ async def load_aio_clients( # XXX: not sure if we ever really need to increment the # client id if teardown is sucessful. - client_id = 616 + client_id = 6116 await ib.connectAsync( host, @@ -910,13 +909,13 @@ async def load_aio_clients( # cache logic to avoid rescanning if we already have all # clients loaded. 
_scan_ignore.add(sockaddr) - else: - if not _client_cache: - raise ConnectionError( - 'No ib APIs could be found scanning @:\n' - f'{pformat(combos)}\n' - 'Check your `brokers.toml` and/or network' - ) from _err + + if not _client_cache: + raise ConnectionError( + 'No ib APIs could be found scanning @:\n' + f'{pformat(combos)}\n' + 'Check your `brokers.toml` and/or network' + ) from _err # retreive first loaded client clients = list(_client_cache.values()) @@ -1440,7 +1439,6 @@ async def stream_quotes( ''' # TODO: support multiple subscriptions sym = symbols[0] - details: Optional[dict] = None contract, first_ticker, details = await _trio_run_client_method( method='get_sym_details', From b26b66cc66cd6a92123619c2410fef52321dd011 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Mar 2022 07:02:37 -0500 Subject: [PATCH 04/21] Add context-styled `asyncio` client proxy for ib This adds a new client manager-factory: `open_client_proxy()` which uses the newer `tractor.to_asyncio.open_channel_from()` (and thus the inter-loop-task-channel style) a `aio_client_method_relay()` and a re-implemented `MethodProxy` wrapper to allow transparently calling `asyncio` client methods from `trio` tasks. Use this proxy in the history backfiller task and add a new (prototype) `open_history_client()` which will be used in the new storage management layer. Drop `get_client()` which was the portal wrapping equivalent of the same proxy but with a one-task-per-call approach. Oh, and `Client.bars()` can take `datetime`, so let's use it B) --- piker/brokers/ib.py | 292 +++++++++++++++++++++++++++++++------------- 1 file changed, 204 insertions(+), 88 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 0b06fe68..cbb8f406 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -29,8 +29,9 @@ from functools import partial import itertools from math import isnan from typing import ( - Any, Optional, + Any, Callable, Optional, AsyncIterator, Awaitable, + Union, ) import asyncio from pprint import pformat @@ -43,6 +44,7 @@ import time import trio from trio_typing import TaskStatus import tractor +from tractor import to_asyncio from async_generator import aclosing from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails, Option @@ -58,7 +60,7 @@ import numpy as np from .. import config from ..log import get_logger, get_console_log -from .._daemon import maybe_spawn_brokerd +# from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData @@ -217,11 +219,12 @@ _enters = 0 class Client: - """IB wrapped for our broker backend API. + ''' + IB wrapped for our broker backend API. Note: this client requires running inside an ``asyncio`` loop. - """ + ''' _contracts: dict[str, Contract] = {} def __init__( @@ -242,20 +245,22 @@ class Client: self, symbol: str, # EST in ISO 8601 format is required... below is EPOCH - start_dt: str = "1970-01-01T00:00:00.000000-05:00", - end_dt: str = "", + start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", + end_dt: Union[datetime, str ] = "", sample_period_s: str = 1, # ohlc sample period period_count: int = int(2e3), # <- max per 1s sample query is_paid_feed: bool = False, # placeholder ) -> list[dict[str, Any]]: - """Retreive OHLCV bars for a symbol over a range to the present. - """ + ''' + Retreive OHLCV bars for a symbol over a range to the present. 
+ + ''' bars_kwargs = {'whatToShow': 'TRADES'} global _enters - print(f'ENTER BARS {_enters}') + print(f'ENTER BARS {_enters} @ end={end_dt}') _enters += 1 contract = await self.find_contract(symbol) @@ -984,7 +989,7 @@ async def _trio_run_client_method( # ): # kwargs['_treat_as_stream'] = True - return await tractor.to_asyncio.run_task( + return await to_asyncio.run_task( _aio_run_client_method, meth=method, client=client, @@ -992,60 +997,119 @@ async def _trio_run_client_method( ) -class _MethodProxy: +class MethodProxy: + def __init__( self, - portal: tractor.Portal + chan: to_asyncio.LinkedTaskChannel, + ) -> None: - self._portal = portal + self.chan = chan async def _run_method( self, *, meth: str = None, **kwargs + ) -> Any: - return await self._portal.run( - _trio_run_client_method, - method=meth, - **kwargs - ) + ''' + Make a ``Client`` method call by requesting through the + ``tractor.to_asyncio`` layer. + + ''' + chan = self.chan + # send through method + ``kwargs: dict`` as pair + await chan.send((meth, kwargs)) + return await chan.receive() -def get_client_proxy( +async def open_aio_client_method_relay( + from_trio: asyncio.Queue, + to_trio: trio.abc.SendChannel, - portal: tractor.Portal, - target=Client, +) -> None: -) -> _MethodProxy: - - proxy = _MethodProxy(portal) - - # mock all remote methods - for name, method in inspect.getmembers( - target, predicate=inspect.isfunction + async with load_aio_clients() as ( + client, + clients, + accts2clients, ): - if '_' == name[0]: - continue - setattr(proxy, name, partial(proxy._run_method, meth=name)) + to_trio.send_nowait(client) - return proxy + # relay all method requests to ``asyncio``-side client and + # deliver back results + while True: + msg = await from_trio.get() + meth_name, kwargs = msg + + meth = getattr(client, meth_name) + resp = await meth(**kwargs) + + # echo the msg back + to_trio.send_nowait(resp) @acm -async def get_client( - **kwargs, -) -> Client: - """Init the ``ib_insync`` client in another actor and return - a method proxy to it. - """ - async with maybe_spawn_brokerd( - brokername='ib', - infect_asyncio=True, - **kwargs - ) as portal: - proxy_client = get_client_proxy(portal) - yield proxy_client +async def open_client_proxy() -> MethodProxy: + + try: + async with to_asyncio.open_channel_from( + open_aio_client_method_relay, + ) as (first, chan): + + assert isinstance(first, Client) + proxy = MethodProxy(chan) + + # mock all remote methods on ib ``Client``. + for name, method in inspect.getmembers( + Client, predicate=inspect.isfunction + ): + if '_' == name[0]: + continue + setattr(proxy, name, partial(proxy._run_method, meth=name)) + + yield proxy + + except RequestError as err: + code, msg = err.code, err.message + + # TODO: retreive underlying ``ib_insync`` error? + if ( + code == 162 and ( + 'HMDS query returned no data' in msg + or 'No market data permissions for' in msg + ) + ): + # these cases should not cause a task crash + log.warning(msg) + + else: + raise + + +# @acm +# async def get_client( +# **kwargs, + +# ) -> Client: +# ''' +# Init the ``ib_insync`` client in another actor and return +# a method proxy to it. + +# ''' +# async with ( +# maybe_spawn_brokerd( +# brokername='ib', +# infect_asyncio=True, +# **kwargs +# ) as portal, +# ): +# assert 0 + # TODO: the IPC via portal relay layer for when this current + # actor isn't in aio mode. 
+ # open_client_proxy() as proxy, + # yield proxy # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -1126,27 +1190,32 @@ def normalize( async def get_bars( + proxy: MethodProxy, sym: str, end_dt: str = "", ) -> (dict, np.ndarray): + ''' + Retrieve historical data from a ``trio``-side task using + a ``MethoProxy``. + ''' _err: Optional[Exception] = None - fails = 0 + bars: Optional[list] = None + for _ in range(2): try: - bars, bars_array = await _trio_run_client_method( - method='bars', + bars, bars_array = await proxy.bars( symbol=sym, end_dt=end_dt, ) - if bars_array is None: raise SymbolNotFound(sym) next_dt = bars[0].date + print(f'ib datetime {next_dt}') return (bars, bars_array, next_dt), fails @@ -1169,8 +1238,16 @@ async def get_bars( # error? # OLDER: seem to always cause throttling despite low rps - # raise err - break + # TODO: if there is not bars returned from the first + # query we need to manually calculate the next step + # back and convert to an expected datetime format. + # if not bars: + # raise + + # try to decrement start point and look further back + next_dt = bars[0].date + print(f'ib datetime {next_dt}') + continue elif 'No market data permissions for' in err.message: @@ -1197,6 +1274,41 @@ async def get_bars( # raise _err +@acm +async def open_history_client( + symbol: str, + +) -> tuple[Callable, int]: + + async with open_client_proxy() as proxy: + + async def get_hist( + end_dt: str, + start_dt: str = '', + + ) -> tuple[np.ndarray, str]: + + out, fails = await get_bars(proxy, symbol, end_dt=end_dt) + + # TODO: add logic here to handle tradable hours and only grab + # valid bars in the range + if out == (None, None): + # could be trying to retreive bars over weekend + log.error(f"Can't grab bars starting at {end_dt}!?!?") + raise NoData(f'{end_dt}') + + bars, bars_array, next_dt = out + + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. 
+ vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + + return bars_array, next_dt + + yield get_hist + + async def backfill_bars( sym: str, @@ -1219,56 +1331,60 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 ''' - if platform.system() == 'Windows': - log.warning( - 'Decreasing history query count to 4 since, windows...') - count = 4 + # async with open_history_client(sym) as proxy: + async with open_client_proxy() as proxy: - out, fails = await get_bars(sym) + if platform.system() == 'Windows': + log.warning( + 'Decreasing history query count to 4 since, windows...') + count = 4 - if out is None: - raise RuntimeError("Could not pull currrent history?!") + out, fails = await get_bars(proxy, sym) - (first_bars, bars_array, next_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 + if out is None: + raise RuntimeError("Could not pull currrent history?!") - # write historical data to buffer - shm.push(bars_array) + (first_bars, bars_array, next_dt) = out + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 - with trio.CancelScope() as cs: + # write historical data to buffer + shm.push(bars_array) - task_status.started(cs) + with trio.CancelScope() as cs: - i = 0 - while i < count: + task_status.started(cs) - out, fails = await get_bars(sym, end_dt=next_dt) + i = 0 + while i < count: - if fails is None or fails > 1: - break + out, fails = await get_bars(proxy, sym, end_dt=next_dt) - if out == (None, None): - # could be trying to retreive bars over weekend - # TODO: add logic here to handle tradable hours and only grab - # valid bars in the range - log.error(f"Can't grab bars starting at {next_dt}!?!?") - continue + if fails is None or fails > 1: + break - bars, bars_array, next_dt = out + if out == (None, None): + # could be trying to retreive bars over weekend + # TODO: add logic here to handle tradable hours and + # only grab valid bars in the range + log.error(f"Can't grab bars starting at {next_dt}!?!?") + continue - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - # TODO we should probably dig into forums to see what peeps - # think this data "means" and then use it as an indicator of - # sorts? dinkus has mentioned that $vlms for the day dont' - # match other platforms nor the summary stat tws shows in - # the monitor - it's probably worth investigating. + bars, bars_array, next_dt = out - shm.push(bars_array, prepend=True) - i += 1 + # volume cleaning since there's -ve entries, + # wood luv to know what crookery that is.. + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 + + # TODO we should probably dig into forums to see what peeps + # think this data "means" and then use it as an indicator of + # sorts? dinkus has mentioned that $vlms for the day dont' + # match other platforms nor the summary stat tws shows in + # the monitor - it's probably worth investigating. 
+ + shm.push(bars_array, prepend=True) + i += 1 asset_type_map = { From 937406534c02044477b03408ac64eff7517c1fb1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Mar 2022 09:03:44 -0400 Subject: [PATCH 05/21] Maybe spawn `brokerd` in `asyncio` mode if declared in backend mod --- piker/brokers/core.py | 16 ++++++++++++---- 1 file changed, 12 insertions(+), 4 deletions(-) diff --git a/piker/brokers/core.py b/piker/brokers/core.py index b16f46fe..af5da3a1 100644 --- a/piker/brokers/core.py +++ b/piker/brokers/core.py @@ -142,15 +142,23 @@ async def symbol_search( brokermods: list[ModuleType], pattern: str, **kwargs, + ) -> Dict[str, Dict[str, Dict[str, Any]]]: - """Return symbol info from broker. - """ + ''' + Return symbol info from broker. + + ''' results = [] - async def search_backend(brokername: str) -> None: + async def search_backend( + brokermod: ModuleType + ) -> None: + + brokername: str = mod.name async with maybe_spawn_brokerd( - brokername, + mod.name, + infect_asyncio=getattr(mod, '_infect_asyncio', False), ) as portal: results.append(( From 1e433ca4f4b1ff9228b93ee43c283f99c6b1cffa Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 09:25:39 -0400 Subject: [PATCH 06/21] Support "expiry" suffixes for derivatives with ib To start we only have futes working but this allows both searching and loading multiple expiries of the same instrument by specifying different expiries with a `.` suffix in the symbol key (eg. `mnq.globex.20220617`). This also paves the way for options contracts which will need something similar plus a strike property. This change set also required a patch to `ib_insync` to allow retrieving multiple "ambiguous" contracts from the `IB.reqContractDetailsAcync()` method, see https://github.com/erdewit/ib_insync/pull/454 for further discussion since the approach here might change. This patch also includes a lot of serious reworking of some `trio`-`asyncio` integration to use the newer `tractor.to_asyncio.open_channel_from()` api and use it (with a relay task) to open a persistent connection with an in-actor `ib_insync` `Client` mostly for history requests. Deats, - annot the module with a `_infect_asyncio: bool` for `tractor` spawning - add a futes venu list - support ambiguous futes contracts lookups so that all expiries will show in search - support both continuous and specific expiry fute contract qualification - allow searching with "fqsn" keys - don't crash on "data not found" errors in history requests - move all quotes msg "topic-key" generation (which should now be a broker-specific fqsn) and per-contract quote processing into `normalize()` - set the fqsn key in the symbol info init msg - use `open_client_proxy()` in bars backfiller endpoint - include expiry suffix in position update keys --- piker/brokers/ib.py | 407 ++++++++++++++++++++++++++++---------------- 1 file changed, 261 insertions(+), 146 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index cbb8f406..ec163fbb 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -60,7 +60,6 @@ import numpy as np from .. import config from ..log import get_logger, get_console_log -# from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData @@ -112,13 +111,18 @@ _show_wap_in_history: bool = False # accepting patterns before the kb has settled more then # a quarter second). 
_search_conf = { - 'pause_period': 6/16, + 'pause_period': 6 / 16, } +# annotation to let backend agnostic code +# know if ``brokerd`` should be spawned with +# ``tractor``'s aio mode. +_infect_asyncio: bool = True + + # overrides to sidestep pretty questionable design decisions in # ``ib_insync``: - class NonShittyWrapper(Wrapper): def tcpDataArrived(self): """Override time stamps to be floats for now. @@ -173,6 +177,13 @@ _adhoc_cmdty_data_map = { 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } +_futes_venues = ( + 'GLOBEX', + 'NYMEX', + 'CME', + 'CMECRYPTO', +) + _adhoc_futes_set = { # equities @@ -243,10 +254,10 @@ class Client: async def bars( self, - symbol: str, + fqsn: str, # EST in ISO 8601 format is required... below is EPOCH start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", - end_dt: Union[datetime, str ] = "", + end_dt: Union[datetime, str] = "", sample_period_s: str = 1, # ohlc sample period period_count: int = int(2e3), # <- max per 1s sample query @@ -254,7 +265,7 @@ class Client: is_paid_feed: bool = False, # placeholder ) -> list[dict[str, Any]]: ''' - Retreive OHLCV bars for a symbol over a range to the present. + Retreive OHLCV bars for a fqsn over a range to the present. ''' bars_kwargs = {'whatToShow': 'TRADES'} @@ -263,7 +274,7 @@ class Client: print(f'ENTER BARS {_enters} @ end={end_dt}') _enters += 1 - contract = await self.find_contract(symbol) + contract = await self.find_contract(fqsn) bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) # _min = min(2000*100, count) @@ -300,7 +311,7 @@ class Client: ) if not bars: # TODO: raise underlying error here - raise ValueError(f"No bars retreived for {symbol}?") + raise ValueError(f"No bars retreived for {fqsn}?") # TODO: rewrite this faster with ``numba`` # convert to pandas dataframe: @@ -342,23 +353,24 @@ class Client: async def search_stocks( self, pattern: str, - get_details: bool = False, - # how many contracts to search "up to" - upto: int = 3, + upto: int = 3, # how many contracts to search "up to" ) -> dict[str, ContractDetails]: - """Search for stocks matching provided ``str`` pattern. + ''' + Search for stocks matching provided ``str`` pattern. Return a dictionary of ``upto`` entries worth of contract details. - """ + + ''' descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) if descriptions is not None: descrs = descriptions[:upto] if get_details: - return await self.con_deats([d.contract for d in descrs]) + deats = await self.con_deats([d.contract for d in descrs]) + return deats else: results = {} @@ -368,6 +380,10 @@ class Client: # from search? 
exch = con.primaryExchange.rsplit('.')[0] unique_sym = f'{con.symbol}.{exch}' + expiry = con.lastTradeDateOrContractMonth + if expiry: + unique_sym += f'{expiry}' + results[unique_sym] = {} return results @@ -385,26 +401,75 @@ class Client: # TODO add search though our adhoc-locally defined symbol set # for futes/cmdtys/ - return await self.search_stocks(pattern, upto, get_details=True) + results = await self.search_stocks( + pattern, + upto=upto, + get_details=True, + ) - async def get_cont_fute( + for key, contracts in results.copy().items(): + tract = contracts['contract'] + sym = tract['symbol'] + + sectype = tract['secType'] + if sectype == 'IND': + results[f'{sym}.IND'] = tract + results.pop(key) + exch = tract['exchange'] + + if exch in _futes_venues: + # try get all possible contracts for symbol as per, + # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut + con = Contract( + 'FUT+CONTFUT', + symbol=sym, + exchange=exch, + ) + possibles = await self.ib.qualifyContractsAsync(con) + for i, condict in enumerate(sorted( + map(asdict, possibles), + # sort by expiry + key=lambda con: con['lastTradeDateOrContractMonth'], + )): + expiry = condict['lastTradeDateOrContractMonth'] + results[f'{sym}.{exch}.{expiry}'] = condict + + return results + + async def get_fute( self, symbol: str, exchange: str, - ) -> Contract: - """Get an unqualifed contract for the current "continous" future. - """ - contcon = ibis.ContFuture(symbol, exchange=exchange) + expiry: str = '', + front: bool = False, + ) -> Contract: + ''' + Get an unqualifed contract for the current "continous" future. + + ''' # it's the "front" contract returned here - frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] - return ibis.Future(conId=frontcon.conId) + if front: + con = (await self.ib.qualifyContractsAsync( + ibis.ContFuture(symbol, exchange=exchange) + ))[0] + else: + con = (await self.ib.qualifyContractsAsync( + ibis.Future( + symbol, + exchange=exchange, + lastTradeDateOrContractMonth=expiry, + ) + ))[0] + + return con async def find_contract( self, - symbol, + pattern: str, currency: str = 'USD', **kwargs, + ) -> Contract: # TODO: we can't use this currently because @@ -418,11 +483,20 @@ class Client: # XXX UPDATE: we can probably do the tick/trades scraping # inside our eventkit handler instead to bypass this entirely? 
+ if 'ib' in pattern: + from ..data._source import uncons_fqsn + broker, symbol, expiry = uncons_fqsn(pattern) + else: + symbol = pattern + # try: # # give the cache a go # return self._contracts[symbol] # except KeyError: # log.debug(f'Looking up contract for {symbol}') + expiry: str = '' + if symbol.count('.') > 1: + symbol, _, expiry = symbol.rpartition('.') # use heuristics to figure out contract "type" try: @@ -431,9 +505,27 @@ class Client: # likely there's an embedded `.` for a forex pair breakpoint() + qualify: bool = True + # futes - if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): - con = await self.get_cont_fute(symbol=sym, exchange=exch) + if exch in _futes_venues: + if expiry: + # get the "front" contract + contract = await self.get_fute( + symbol=sym, + exchange=exch, + expiry=expiry, + ) + + else: + # get the "front" contract + contract = await self.get_fute( + symbol=sym, + exchange=exch, + front=True, + ) + + qualify = False elif exch in ('FOREX'): currency = '' @@ -473,12 +565,15 @@ class Client: ) try: exch = 'SMART' if not exch else exch - contract = (await self.ib.qualifyContractsAsync(con))[0] + if qualify: + contract = (await self.ib.qualifyContractsAsync(con))[0] + else: + assert contract except IndexError: raise ValueError(f"No contract could be found {con}") - self._contracts[symbol] = contract + self._contracts[pattern] = contract return contract async def get_head_time( @@ -828,8 +923,8 @@ async def load_aio_clients( accounts_found: dict[str, Client] = {} if ( - client and client.ib.isConnected() or - sockaddr in _scan_ignore + client and client.ib.isConnected() + or sockaddr in _scan_ignore ): continue @@ -1039,8 +1134,12 @@ async def open_aio_client_method_relay( # relay all method requests to ``asyncio``-side client and # deliver back results - while True: + while not to_trio._closed: msg = await from_trio.get() + if msg is None: + print('asyncio PROXY-RELAY SHUTDOWN') + break + meth_name, kwargs = msg meth = getattr(client, meth_name) @@ -1071,45 +1170,47 @@ async def open_client_proxy() -> MethodProxy: yield proxy - except RequestError as err: - code, msg = err.code, err.message + # terminate asyncio side task + await chan.send(None) - # TODO: retreive underlying ``ib_insync`` error? - if ( - code == 162 and ( - 'HMDS query returned no data' in msg - or 'No market data permissions for' in msg - ) - ): - # these cases should not cause a task crash - log.warning(msg) + except ( + RequestError, + BaseException, + )as err: + code = getattr(err, 'code', None) + if code: + msg = err.message + await tractor.breakpoint() + + # TODO: retreive underlying ``ib_insync`` error? + if ( + code == 162 and ( + 'HMDS query returned no data' in msg + or 'No market data permissions for' in msg + ) + or code == 200 + ): + # these cases should not cause a task crash + log.warning(msg) else: raise -# @acm -# async def get_client( -# **kwargs, +@acm +async def get_client( + **kwargs, -# ) -> Client: -# ''' -# Init the ``ib_insync`` client in another actor and return -# a method proxy to it. +) -> Client: + ''' + Init the ``ib_insync`` client in another actor and return + a method proxy to it. -# ''' -# async with ( -# maybe_spawn_brokerd( -# brokername='ib', -# infect_asyncio=True, -# **kwargs -# ) as portal, -# ): -# assert 0 - # TODO: the IPC via portal relay layer for when this current - # actor isn't in aio mode. - # open_client_proxy() as proxy, - # yield proxy + ''' + # TODO: the IPC via portal relay layer for when this current + # actor isn't in aio mode. 
+ async with open_client_proxy() as proxy: + yield proxy # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -1137,11 +1238,40 @@ tick_types = { } +# TODO: cython/mypyc/numba this! def normalize( ticker: Ticker, calc_price: bool = False ) -> dict: + + # should be real volume for this contract by default + calc_price = False + + # check for special contract types + con = ticker.contract + if type(con) in ( + ibis.Commodity, + ibis.Forex, + ): + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = con.secType + # no real volume on this tract + calc_price = True + + else: + suffix = con.primaryExchange + if not suffix: + suffix = con.exchange + + # append a `.` to the returned symbol + # key for derivatives that normally is the expiry + # date key. + expiry = con.lastTradeDateOrContractMonth + if expiry: + suffix += f'.{expiry}' + # convert named tuples to dicts so we send usable keys new_ticks = [] for tick in ticker.ticks: @@ -1170,6 +1300,12 @@ def normalize( # serialize for transport data = asdict(ticker) + # generate fqsn with possible specialized suffix + # for derivatives. + data['symbol'] = data['fqsn'] = '.'.join( + (con.symbol, suffix) + ).lower() + # convert named tuples to dicts for transport tbts = data.get('tickByTicks') if tbts: @@ -1191,7 +1327,7 @@ def normalize( async def get_bars( proxy: MethodProxy, - sym: str, + fqsn: str, end_dt: str = "", ) -> (dict, np.ndarray): @@ -1204,15 +1340,15 @@ async def get_bars( fails = 0 bars: Optional[list] = None - for _ in range(2): + for _ in range(3): try: bars, bars_array = await proxy.bars( - symbol=sym, + fqsn=fqsn, end_dt=end_dt, ) if bars_array is None: - raise SymbolNotFound(sym) + raise SymbolNotFound(fqsn) next_dt = bars[0].date print(f'ib datetime {next_dt}') @@ -1252,7 +1388,7 @@ async def get_bars( elif 'No market data permissions for' in err.message: # TODO: signalling for no permissions searches - raise NoData(f'Symbol: {sym}') + raise NoData(f'Symbol: {fqsn}') break else: @@ -1311,7 +1447,7 @@ async def open_history_client( async def backfill_bars( - sym: str, + fqsn: str, shm: ShmArray, # type: ignore # noqa # TODO: we want to avoid overrunning the underlying shm array buffer @@ -1331,34 +1467,34 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 ''' - # async with open_history_client(sym) as proxy: - async with open_client_proxy() as proxy: + with trio.CancelScope() as cs: - if platform.system() == 'Windows': - log.warning( - 'Decreasing history query count to 4 since, windows...') - count = 4 + # async with open_history_client(fqsn) as proxy: + async with open_client_proxy() as proxy: - out, fails = await get_bars(proxy, sym) + if platform.system() == 'Windows': + log.warning( + 'Decreasing history query count to 4 since, windows...') + count = 4 - if out is None: - raise RuntimeError("Could not pull currrent history?!") + out, fails = await get_bars(proxy, fqsn) - (first_bars, bars_array, next_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 + if out is None: + raise RuntimeError("Could not pull currrent history?!") - # write historical data to buffer - shm.push(bars_array) + (first_bars, bars_array, next_dt) = out + vlm = bars_array['volume'] + vlm[vlm < 0] = 0 - with trio.CancelScope() as cs: + # write historical data to buffer + shm.push(bars_array) task_status.started(cs) i = 0 while i < count: - out, fails = await get_bars(proxy, sym, end_dt=next_dt) + out, fails = await get_bars(proxy, fqsn, 
end_dt=next_dt) if fails is None or fails > 1: break @@ -1430,8 +1566,10 @@ async def _setup_quote_stream( contract: Optional[Contract] = None, ) -> trio.abc.ReceiveChannel: - """Stream a ticker using the std L1 api. - """ + ''' + Stream a ticker using the std L1 api. + + ''' global _quote_streams to_trio.send_nowait(None) @@ -1519,7 +1657,10 @@ async def open_aio_quote_stream( if from_aio: # if we already have a cached feed deliver a rx side clone to consumer - async with broadcast_receiver(from_aio) as from_aio: + async with broadcast_receiver( + from_aio, + 2**6, + ) as from_aio: yield from_aio return @@ -1555,17 +1696,13 @@ async def stream_quotes( ''' # TODO: support multiple subscriptions sym = symbols[0] + log.info(f'request for real-time quotes: {sym}') - contract, first_ticker, details = await _trio_run_client_method( + con, first_ticker, details = await _trio_run_client_method( method='get_sym_details', symbol=sym, ) - - with trio.move_on_after(1): - contract, first_ticker, details = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) + first_quote = normalize(first_ticker) def mk_init_msgs() -> dict[str, dict]: # pass back some symbol info like min_tick, trading_hours, etc. @@ -1593,46 +1730,23 @@ async def stream_quotes( # and that history has been written sym: { 'symbol_info': syminfo, + 'fqsn': first_quote['fqsn'], } } return init_msgs init_msgs = mk_init_msgs() - con = first_ticker.contract - - # should be real volume for this contract by default - calc_price = False - - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - - suffix = con.primaryExchange - if not suffix: - suffix = con.exchange - - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = con.secType - # no real volume on this tract - calc_price = True - - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], suffix)).lower() - quote['symbol'] = topic - - # for compat with upcoming fqsn based derivs search - init_msgs[sym]['fqsn'] = topic - - # pass first quote asap - first_quote = quote + with trio.move_on_after(1): + contract, first_ticker, details = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) # it might be outside regular trading hours so see if we can at # least grab history. if isnan(first_ticker.last): - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, first_quote)) # it's not really live but this will unblock # the brokerd feed task to tell the ui to update? @@ -1643,30 +1757,32 @@ async def stream_quotes( return # we never expect feed to come up? 
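+    # NOTE: the last price was real so the market is likely open:
+    # connect the (possibly cached) live quote stream and relay
+    # normalized ticks onward.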
async with open_aio_quote_stream( - symbol=sym, contract=contract + symbol=sym, + contract=con, ) as stream: # ugh, clear ticks since we've consumed them # (ahem, ib_insync is stateful trash) first_ticker.ticks = [] - log.debug(f"First ticker received {quote}") - - task_status.started((init_msgs, first_quote)) + task_status.started((init_msgs, first_quote)) async with aclosing(stream): - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - # suffix = 'exchange' - # calc_price = False # should be real volume for contract - + if type(first_ticker.contract) not in ( + ibis.Commodity, + ibis.Forex + ): # wait for real volume on feed (trading might be closed) while True: - ticker = await stream.receive() # for a real volume contract we rait for the first # "real" trade to take place - if not calc_price and not ticker.rtTime: + if ( + # not calc_price + # and not ticker.rtTime + not ticker.rtTime + ): # spin consuming tickers until we get a real # market datum log.debug(f"New unsent ticker: {ticker}") @@ -1681,21 +1797,16 @@ async def stream_quotes( # ``aclosing()`` above? break + quote = normalize(ticker) + log.debug(f"First ticker received {quote}") + # tell caller quotes are now coming in live feed_is_live.set() # last = time.time() async for ticker in stream: - # print(f'ticker rate: {1/(time.time() - last)}') - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price - ) - - quote['symbol'] = topic - await send_chan.send({topic: quote}) + quote = normalize(ticker) + await send_chan.send({quote['fqsn']: quote}) # ugh, clear ticks since we've consumed them ticker.ticks = [] @@ -1713,11 +1824,11 @@ def pack_position( symbol = con.localSymbol.replace(' ', '') else: + # TODO: lookup fqsn even for derivs. symbol = con.symbol.lower() exch = (con.primaryExchange or con.exchange).lower() symkey = '.'.join((symbol, exch)) - if not exch: # attempt to lookup the symbol from our # hacked set.. @@ -1726,7 +1837,11 @@ def pack_position( symkey = sym break - # TODO: options contracts into a sane format.. + expiry = con.lastTradeDateOrContractMonth + if expiry: + symkey += f'.{expiry}' + + # TODO: options contracts into a sane format.. return BrokerdPosition( broker='ib', @@ -2105,7 +2220,7 @@ async def open_symbol_search( sn.start_soon( stash_results, _trio_run_client_method( - method='search_stocks', + method='search_symbols', pattern=pattern, upto=5, ) From 957686a9fe5bb39dfe4f21e761c9d701d2aa0b1c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 17:53:21 -0400 Subject: [PATCH 07/21] Comment exception debug in ib request error block --- piker/brokers/ib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index ec163fbb..38d9f491 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1175,12 +1175,12 @@ async def open_client_proxy() -> MethodProxy: except ( RequestError, - BaseException, + # BaseException, )as err: code = getattr(err, 'code', None) if code: msg = err.message - await tractor.breakpoint() + # await tractor.breakpoint() # TODO: retreive underlying ``ib_insync`` error? 
if ( From 8395a1fcfeaefa713a149d53964dfbd795b798a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 19 Mar 2022 14:27:41 -0400 Subject: [PATCH 08/21] IB: Comment on lowercase for the fqsn key --- piker/brokers/ib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 38d9f491..3d792a9f 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1301,7 +1301,7 @@ def normalize( data = asdict(ticker) # generate fqsn with possible specialized suffix - # for derivatives. + # for derivatives, note the lowercase. data['symbol'] = data['fqsn'] = '.'.join( (con.symbol, suffix) ).lower() From 3e125625b1691321b915fafeea545be218717d40 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 22 Mar 2022 13:14:22 -0400 Subject: [PATCH 09/21] Attempt to better handle history throttles using flag --- piker/brokers/ib.py | 40 ++++++++++++++++++++++++---------------- 1 file changed, 24 insertions(+), 16 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 3d792a9f..9a77f5fb 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1340,20 +1340,25 @@ async def get_bars( fails = 0 bars: Optional[list] = None - for _ in range(3): + async def get(): + + bars, bars_array = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + ) + if bars_array is None: + raise SymbolNotFound(fqsn) + + next_dt = bars[0].date + log.info(f'ib datetime {next_dt}') + + return (bars, bars_array, next_dt), fails + + in_throttle: bool = False + + for _ in range(10): try: - - bars, bars_array = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - ) - if bars_array is None: - raise SymbolNotFound(fqsn) - - next_dt = bars[0].date - print(f'ib datetime {next_dt}') - - return (bars, bars_array, next_dt), fails + return await get() except RequestError as err: _err = err @@ -1382,7 +1387,7 @@ async def get_bars( # try to decrement start point and look further back next_dt = bars[0].date - print(f'ib datetime {next_dt}') + log.info(f'ib datetime {next_dt}') continue elif 'No market data permissions for' in err.message: @@ -1401,7 +1406,10 @@ async def get_bars( # TODO: should probably create some alert on screen # and then somehow get that to trigger an event here # that restarts/resumes this task? - await tractor.breakpoint() + if not in_throttle: + await tractor.breakpoint() + + in_throttle = True fails += 1 continue @@ -1455,7 +1463,7 @@ async def backfill_bars( # on that until we have the `marketstore` daemon in place in which # case the shm size will be driven by user config and available sys # memory. - count: int = 16, + count: int = 22, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, From 62d073dc18247fb641e48bba084bf78cd7d51766 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Mar 2022 16:06:52 -0400 Subject: [PATCH 10/21] More IB repairs.. Make the throttle error propagate through to `trio` again by adding `dict`-msg support between the two loops such that errors can be re-raised on the `trio` side. This is all integrated into the `MethoProxy` and accompanying result relay task. Further fix a longer standing issue where sometimes the `ib_insync` order entry method will raise a weird assertion error because it detects some internal order-id state issue.. Just ignore those and make relay back an error to the ems in such cases. Add a bunch of notes for todos surrounding data feed reset hackery. 
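For reference, a minimal loop-local sketch of that dict-msg protocol
(plain ``asyncio`` queues stand in for the ``tractor.to_asyncio``
channel; the ``Echo`` client and queue plumbing are illustrative only,
not code from this patch):

    import asyncio

    async def relay(reqs, resps, client) -> None:
        # serve (method-name, kwargs) pairs until a ``None`` sentinel
        while (msg := await reqs.get()) is not None:
            meth, kwargs = msg
            try:
                resps.put_nowait(
                    {'result': await getattr(client, meth)(**kwargs)})
            except Exception as err:
                # ship the error object over instead of crashing
                # the relay task
                resps.put_nowait({'error': err})

    async def call(reqs, resps, meth: str, **kwargs):
        # proxy side: send the request pair, decode the response dict
        await reqs.put((meth, kwargs))
        msg = await resps.get()
        if 'result' in msg:
            return msg['result']
        raise msg['error']

    class Echo:
        async def hello(self, name: str) -> str:
            return f'hello {name}'

    async def main() -> None:
        reqs, resps = asyncio.Queue(), asyncio.Queue()
        task = asyncio.create_task(relay(reqs, resps, Echo()))
        assert await call(reqs, resps, 'hello', name='ib') == 'hello ib'
        await reqs.put(None)  # graceful shutdown
        await task

    asyncio.run(main())

The key bit is that the relay task never crashes on a client-side
error; the exception instance itself is shipped across so the calling
side can re-raise it with its own context.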
--- piker/brokers/ib.py | 90 ++++++++++++++++++++++++++++++++++----------- 1 file changed, 69 insertions(+), 21 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 9a77f5fb..8fda1dec 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -673,23 +673,28 @@ class Client: # against non-known prices. raise RuntimeError("Can not order {symbol}, no live feed?") - trade = self.ib.placeOrder( - contract, - Order( - orderId=reqid or 0, # stupid api devs.. - action=action.upper(), # BUY/SELL - # lookup the literal account number by name here. - account=account, - orderType='LMT', - lmtPrice=price, - totalQuantity=size, - outsideRth=True, + try: + trade = self.ib.placeOrder( + contract, + Order( + orderId=reqid or 0, # stupid api devs.. + action=action.upper(), # BUY/SELL + # lookup the literal account number by name here. + account=account, + orderType='LMT', + lmtPrice=price, + totalQuantity=size, + outsideRth=True, - optOutSmartRouting=True, - routeMarketableToBbo=True, - designatedLocation='SMART', - ), - ) + optOutSmartRouting=True, + routeMarketableToBbo=True, + designatedLocation='SMART', + ), + ) + except AssertionError: # errrg insync.. + log.warning(f'order for {reqid} already complete?') + # will trigger an error in ems request handler task. + return None # ib doesn't support setting your own id outside # their own weird client int counting ids.. @@ -1116,7 +1121,25 @@ class MethodProxy: chan = self.chan # send through method + ``kwargs: dict`` as pair await chan.send((meth, kwargs)) - return await chan.receive() + msg = await chan.receive() + res = msg.get('result') + if res: + return res + + err = msg.get('error') + if not err: + raise ValueError(f'Received unexpected asyncio msg {msg}') + + raise err + + async def wait_for_data_reset(self) -> None: + ''' + Send hacker hot keys to ib program and wait + for the event that declares the data feeds to be + back up before unblocking. + + ''' + ... async def open_aio_client_method_relay( @@ -1132,6 +1155,9 @@ async def open_aio_client_method_relay( ): to_trio.send_nowait(client) + # TODO: separate channel for error handling? + # client.inline_errors(to_trio) + # relay all method requests to ``asyncio``-side client and # deliver back results while not to_trio._closed: @@ -1143,10 +1169,18 @@ async def open_aio_client_method_relay( meth_name, kwargs = msg meth = getattr(client, meth_name) - resp = await meth(**kwargs) + try: + resp = await meth(**kwargs) + # echo the msg back + to_trio.send_nowait({'result': resp}) - # echo the msg back - to_trio.send_nowait(resp) + except ( + RequestError, + + # TODO: relay all errors to trio? + # BaseException, + ) as err: + to_trio.send_nowait({'error': err}) @acm @@ -1361,6 +1395,7 @@ async def get_bars( return await get() except RequestError as err: + # why do we always need to rebind this? _err = err # TODO: retreive underlying ``ib_insync`` error? @@ -1409,6 +1444,10 @@ async def get_bars( if not in_throttle: await tractor.breakpoint() + # TODO: wait on data con reset event + # then begin backfilling again. + # await proxy.wait_for_data() + in_throttle = True fails += 1 continue @@ -1463,6 +1502,7 @@ async def backfill_bars( # on that until we have the `marketstore` daemon in place in which # case the shm size will be driven by user config and available sys # memory. 
+ # count: int = 120, count: int = 22, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1745,7 +1785,9 @@ async def stream_quotes( init_msgs = mk_init_msgs() - with trio.move_on_after(1): + # TODO: we should instead spawn a task that waits on a feed to start + # and let it wait indefinitely..instead of this hard coded stuff. + with trio.move_on_after(6): contract, first_ticker, details = await _trio_run_client_method( method='get_quote', symbol=sym, @@ -1922,6 +1964,12 @@ async def handle_order_requests( # counter - collision prone..) reqid=order.reqid, ) + if reqid is None: + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason='Order already active?', + ).dict()) # deliver ack that order has been submitted to broker routing await ems_order_stream.send( From 874374af06a8901a1dafd6b4b97794ee0800a406 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 29 Mar 2022 10:36:40 -0400 Subject: [PATCH 11/21] Drop `pandas` use in ib backend for history Found an issue (that was predictably brushed aside XD) where the `ib_insync.util.df()` helper was changing the timestamps on bars data to be way off (probably a `pandas.Timestamp` timezone thing?). Anyway, dropped all that (which will hopefully let us drop `pandas` as a hard dep) and added a buncha timestamp checking as well as start/end datetime return values using `pendulum` so that consumer code can know which "slice" is output. Also added some WIP code to work around "no history found" request errors where instead now we try to increment backward another 200 seconds - not sure if this actually correct yet. --- piker/brokers/ib.py | 186 +++++++++++++++++++++++++------------------- 1 file changed, 105 insertions(+), 81 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 8fda1dec..cb525a13 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -23,7 +23,7 @@ built on it) and thus actor aware API calls must be spawned with """ from contextlib import asynccontextmanager as acm -from dataclasses import asdict +from dataclasses import asdict, astuple from datetime import datetime from functools import partial import itertools @@ -41,6 +41,7 @@ import platform from random import randint import time + import trio from trio_typing import TaskStatus import tractor @@ -60,7 +61,7 @@ import numpy as np from .. import config from ..log import get_logger, get_console_log -from ..data._source import from_df +from ..data._source import base_ohlc_dtype from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData from ..clearing._messages import ( @@ -229,6 +230,28 @@ _exch_skip_list = { _enters = 0 +def bars_to_np(bars: list) -> np.ndarray: + ''' + Convert a "bars list thing" (``BarsList`` type from ibis) + into a numpy struct array. + + ''' + # TODO: maybe rewrite this faster with ``numba`` + np_ready = [] + for bardata in bars: + ts = bardata.date.timestamp() + t = astuple(bardata)[:7] + np_ready.append((ts, ) + t[1:7]) + + nparr = np.array( + np_ready, + dtype=base_ohlc_dtype, + ) + assert nparr['time'][0] == bars[0].date.timestamp() + assert nparr['time'][-1] == bars[-1].date.timestamp() + return nparr + + class Client: ''' IB wrapped for our broker backend API. @@ -255,6 +278,7 @@ class Client: async def bars( self, fqsn: str, + # EST in ISO 8601 format is required... 
below is EPOCH start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00", end_dt: Union[datetime, str] = "", @@ -262,7 +286,6 @@ class Client: sample_period_s: str = 1, # ohlc sample period period_count: int = int(2e3), # <- max per 1s sample query - is_paid_feed: bool = False, # placeholder ) -> list[dict[str, Any]]: ''' Retreive OHLCV bars for a fqsn over a range to the present. @@ -313,10 +336,8 @@ class Client: # TODO: raise underlying error here raise ValueError(f"No bars retreived for {fqsn}?") - # TODO: rewrite this faster with ``numba`` - # convert to pandas dataframe: - df = ibis.util.df(bars) - return bars, from_df(df) + nparr = bars_to_np(bars) + return bars, nparr async def con_deats( self, @@ -1214,7 +1235,6 @@ async def open_client_proxy() -> MethodProxy: code = getattr(err, 'code', None) if code: msg = err.message - # await tractor.breakpoint() # TODO: retreive underlying ``ib_insync`` error? if ( @@ -1362,7 +1382,9 @@ async def get_bars( proxy: MethodProxy, fqsn: str, - end_dt: str = "", + + # blank to start which tells ib to look up the latest datum + end_dt: str = '', ) -> (dict, np.ndarray): ''' @@ -1370,87 +1392,83 @@ async def get_bars( a ``MethoProxy``. ''' - _err: Optional[Exception] = None + import pendulum + fails = 0 bars: Optional[list] = None - - async def get(): - - bars, bars_array = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - ) - if bars_array is None: - raise SymbolNotFound(fqsn) - - next_dt = bars[0].date - log.info(f'ib datetime {next_dt}') - - return (bars, bars_array, next_dt), fails - in_throttle: bool = False + first_dt: datetime = None + last_dt: datetime = None + + if end_dt: + last_dt = pendulum.from_timestamp(end_dt.timestamp()) for _ in range(10): try: - return await get() + bars, bars_array = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + ) + + if bars_array is None: + raise SymbolNotFound(fqsn) + + first_dt = pendulum.from_timestamp( + bars[0].date.timestamp()) + + last_dt = pendulum.from_timestamp( + bars[-1].date.timestamp()) + + time = bars_array['time'] + assert time[-1] == last_dt.timestamp() + assert time[0] == first_dt.timestamp() + log.info(f'bars retreived for dts {first_dt}:{last_dt}') + + return (bars, bars_array, first_dt, last_dt), fails except RequestError as err: + msg = err.message # why do we always need to rebind this? - _err = err + # _err = err - # TODO: retreive underlying ``ib_insync`` error? - if err.code == 162: + if 'No market data permissions for' in msg: + # TODO: signalling for no permissions searches + raise NoData(f'Symbol: {fqsn}') + break - # TODO: so this error is normally raised (it seems) if - # we try to retrieve history for a time range for which - # there is none. in that case we should not only report - # the "empty range" but also do a iteration on the time - # step for ``next_dt`` to see if we can pull older - # history. - if 'HMDS query returned no data' in err.message: - # means we hit some kind of historical "empty space" - # and further requests will need to decrement the - # start time dt in order to not receive a further - # error? 
- # OLDER: seem to always cause throttling despite low rps + elif ( + err.code == 162 + and 'HMDS query returned no data' in err.message + ): + # try to decrement start point and look further back + end_dt = last_dt = last_dt.subtract(seconds=2000) + log.warning( + f'No data found ending @ {end_dt}\n' + f'Starting another request for {end_dt}' + ) - # TODO: if there is not bars returned from the first - # query we need to manually calculate the next step - # back and convert to an expected datetime format. - # if not bars: - # raise + continue - # try to decrement start point and look further back - next_dt = bars[0].date - log.info(f'ib datetime {next_dt}') - continue + else: + log.exception( + "Data query rate reached: Press `ctrl-alt-f`" + "in TWS" + ) - elif 'No market data permissions for' in err.message: + # TODO: should probably create some alert on screen + # and then somehow get that to trigger an event here + # that restarts/resumes this task? + if not in_throttle: + await tractor.breakpoint() - # TODO: signalling for no permissions searches - raise NoData(f'Symbol: {fqsn}') - break + # TODO: wait on data con reset event + # then begin backfilling again. + # await proxy.wait_for_data() - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f`" - "in TWS" - ) - print(_err) + in_throttle = True + fails += 1 + continue - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - if not in_throttle: - await tractor.breakpoint() - - # TODO: wait on data con reset event - # then begin backfilling again. - # await proxy.wait_for_data() - - in_throttle = True - fails += 1 - continue return None, None # else: # throttle wasn't fixed so error out immediately @@ -1480,14 +1498,14 @@ async def open_history_client( log.error(f"Can't grab bars starting at {end_dt}!?!?") raise NoData(f'{end_dt}') - bars, bars_array, next_dt = out + bars, bars_array, first_dt, last_dt = out # volume cleaning since there's -ve entries, # wood luv to know what crookery that is.. vlm = bars_array['volume'] vlm[vlm < 0] = 0 - return bars_array, next_dt + return bars_array, first_dt, last_dt yield get_hist @@ -1503,7 +1521,7 @@ async def backfill_bars( # case the shm size will be driven by user config and available sys # memory. 
# count: int = 120, - count: int = 22, + count: int = 36, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1515,6 +1533,9 @@ async def backfill_bars( https://github.com/pikers/piker/issues/128 ''' + # last_dt1 = None + last_dt = None + with trio.CancelScope() as cs: # async with open_history_client(fqsn) as proxy: @@ -1530,9 +1551,10 @@ async def backfill_bars( if out is None: raise RuntimeError("Could not pull currrent history?!") - (first_bars, bars_array, next_dt) = out + (first_bars, bars_array, first_dt, last_dt) = out vlm = bars_array['volume'] vlm[vlm < 0] = 0 + last_dt = first_dt # write historical data to buffer shm.push(bars_array) @@ -1542,7 +1564,7 @@ async def backfill_bars( i = 0 while i < count: - out, fails = await get_bars(proxy, fqsn, end_dt=next_dt) + out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) if fails is None or fails > 1: break @@ -1551,10 +1573,12 @@ async def backfill_bars( # could be trying to retreive bars over weekend # TODO: add logic here to handle tradable hours and # only grab valid bars in the range - log.error(f"Can't grab bars starting at {next_dt}!?!?") + log.error(f"Can't grab bars starting at {first_dt}!?!?") continue - bars, bars_array, next_dt = out + (first_bars, bars_array, first_dt, last_dt) = out + # last_dt1 = last_dt + # last_dt = first_dt # volume cleaning since there's -ve entries, # wood luv to know what crookery that is.. @@ -1787,7 +1811,7 @@ async def stream_quotes( # TODO: we should instead spawn a task that waits on a feed to start # and let it wait indefinitely..instead of this hard coded stuff. - with trio.move_on_after(6): + with trio.move_on_after(1): contract, first_ticker, details = await _trio_run_client_method( method='get_quote', symbol=sym, From b579d4b1f59fb97861a6d9c0a397ac0c0be2173a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 30 Mar 2022 13:49:19 -0400 Subject: [PATCH 12/21] Get ib data feed hackzorz workin ib has a throttle limit for "hft" bars but contained in here is some hackery using ``xdotool`` to reset data farms auto-magically B) This copies the working script into the ib backend mod as a routine and now uses `trio.run_process()` and calls into it from the `get_bars()` history retriever and then waits for "data re-established" events to be received from the client before making more history queries. TL;DR summary of changes: - relay ib's "system status" events (like for data farm statuses) as a new "event" msg that can be processed by registers of `Client.inline_errors()` (though we should probably make a new method for this). - add `MethodProxy.status_event()` which allows a proxy user to register for a particular "system event" (as mentioned above), which puts a `trio.Event` entry in a small table can be set by an relay task if there are any detected waiters. - start a "msg relay task" when opening the method proxy which does the event setting mentioned above in the background. - drop the request error handling around the proxy creation, doesn't seem necessary any more now that we have better error propagation from `asyncio`. - add event waiting logic around the data feed reset hackzorin. - change the order relay task to only log system events for now (though we need to do some better parsing/logic to get tws-external order updates to work again.. 
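A stripped down sketch of that waiter-table pattern (pure ``trio``;
the names mirror this patch but the channel plumbing and demo tasks
are illustrative, not the actual code):

    import trio

    _event_table: dict[str, trio.Event] = {}

    def status_event(pattern: str) -> trio.Event:
        # reuse the pending event for this pattern, or install a
        # fresh one if the last was already set/consumed
        ev = _event_table.get(pattern)
        if not ev or ev.is_set():
            ev = _event_table[pattern] = trio.Event()
        return ev

    async def relay_events(msgs: trio.MemoryReceiveChannel) -> None:
        # wake any task blocked on a pattern matching this status msg
        async for status in msgs:
            ev = _event_table.pop(status['reason'], None)
            if ev and ev.statistics().tasks_waiting:
                ev.set()

    async def wait_for(pattern: str) -> None:
        ev = status_event(pattern)
        with trio.move_on_after(22) as cs:  # timeout as per this patch
            await ev.wait()
        print('timed out?' if cs.cancelled_caught else 'data is back!')

    async def main() -> None:
        tx, rx = trio.open_memory_channel(8)
        async with trio.open_nursery() as n:
            n.start_soon(relay_events, rx)
            n.start_soon(
                wait_for, 'HMDS data farm connection is OK:ushmds')
            await trio.sleep(0.1)  # let the waiter block first
            await tx.send(
                {'reason': 'HMDS data farm connection is OK:ushmds'})
            tx.close()

    trio.run(main)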
--- piker/brokers/ib.py | 333 +++++++++++++++++++++++++++++++++----------- 1 file changed, 255 insertions(+), 78 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index cb525a13..65194ff5 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -294,7 +294,8 @@ class Client: bars_kwargs = {'whatToShow': 'TRADES'} global _enters - print(f'ENTER BARS {_enters} @ end={end_dt}') + # log.info(f'REQUESTING BARS {_enters} @ end={end_dt}') + print(f'REQUESTING BARS {_enters} @ end={end_dt}') _enters += 1 contract = await self.find_contract(fqsn) @@ -304,6 +305,7 @@ class Client: bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime=end_dt, + formatDate=2, # time history length values format: # ``durationStr=integer{SPACE}unit (S|D|W|M|Y)`` @@ -738,25 +740,38 @@ class Client: def inline_errors( self, to_trio: trio.abc.SendChannel, - ) -> None: - # connect error msgs + ) -> None: + ''' + Setup error relay to the provided ``trio`` mem chan such that + trio tasks can retreive and parse ``asyncio``-side API request + errors. + + ''' def push_err( reqId: int, errorCode: int, errorString: str, contract: Contract, + ) -> None: log.error(errorString) + reason = errorString + + if reqId == -1: + # it's a general event? + key = 'event' + else: + key = 'error' try: to_trio.send_nowait(( - 'error', + key, # error "object" {'reqid': reqId, - 'reason': errorString, + 'reason': reason, 'contract': contract} )) except trio.BrokenResourceError: @@ -1123,9 +1138,11 @@ class MethodProxy: def __init__( self, chan: to_asyncio.LinkedTaskChannel, + event_table: dict[str, trio.Event], ) -> None: self.chan = chan + self.event_table = event_table async def _run_method( self, @@ -1140,18 +1157,43 @@ class MethodProxy: ''' chan = self.chan - # send through method + ``kwargs: dict`` as pair await chan.send((meth, kwargs)) - msg = await chan.receive() - res = msg.get('result') - if res: - return res - err = msg.get('error') - if not err: - raise ValueError(f'Received unexpected asyncio msg {msg}') + while not chan.closed(): + # send through method + ``kwargs: dict`` as pair + msg = await chan.receive() + # print(f'NEXT MSG: {msg}') - raise err + # TODO: py3.10 ``match:`` syntax B) + if 'result' in msg: + res = msg.get('result') + return res + + elif 'exception' in msg: + err = msg.get('exception') + raise err + + elif 'error' in msg: + etype, emsg = msg + log.warning(f'IB error relay: {emsg}') + continue + + else: + log.warning(f'UNKNOWN IB MSG: {msg}') + + def status_event( + self, + pattern: str, + + ) -> Union[dict[str, Any], trio.Event]: + + ev = self.event_table.get(pattern) + + if not ev or ev.is_set(): + # print(f'inserting new data reset event item') + ev = self.event_table[pattern] = trio.Event() + + return ev async def wait_for_data_reset(self) -> None: ''' @@ -1166,6 +1208,7 @@ class MethodProxy: async def open_aio_client_method_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, + event_consumers: dict[str, trio.Event], ) -> None: @@ -1177,7 +1220,7 @@ async def open_aio_client_method_relay( to_trio.send_nowait(client) # TODO: separate channel for error handling? 
- # client.inline_errors(to_trio) + client.inline_errors(to_trio) # relay all method requests to ``asyncio``-side client and # deliver back results @@ -1188,8 +1231,8 @@ async def open_aio_client_method_relay( break meth_name, kwargs = msg - meth = getattr(client, meth_name) + try: resp = await meth(**kwargs) # echo the msg back @@ -1201,54 +1244,61 @@ async def open_aio_client_method_relay( # TODO: relay all errors to trio? # BaseException, ) as err: - to_trio.send_nowait({'error': err}) + to_trio.send_nowait({'exception': err}) @acm async def open_client_proxy() -> MethodProxy: - try: - async with to_asyncio.open_channel_from( + # try: + event_table = {} + + async with ( + to_asyncio.open_channel_from( open_aio_client_method_relay, - ) as (first, chan): + event_consumers=event_table, + ) as (first, chan), + trio.open_nursery() as relay_n, + ): - assert isinstance(first, Client) - proxy = MethodProxy(chan) + assert isinstance(first, Client) + proxy = MethodProxy(chan, event_table) + + # mock all remote methods on ib ``Client``. + for name, method in inspect.getmembers( + Client, predicate=inspect.isfunction + ): + if '_' == name[0]: + continue + setattr(proxy, name, partial(proxy._run_method, meth=name)) + + async def relay_events(): + + async with chan.subscribe() as msg_stream: + + async for msg in msg_stream: + if 'event' not in msg: + continue + + # if 'event' in msg: + # wake up any system event waiters. + etype, status_msg = msg + reason = status_msg['reason'] + + ev = proxy.event_table.pop(reason, None) + + if ev and ev.statistics().tasks_waiting: + log.info(f'Relaying ib status message: {msg}') + ev.set() - # mock all remote methods on ib ``Client``. - for name, method in inspect.getmembers( - Client, predicate=inspect.isfunction - ): - if '_' == name[0]: continue - setattr(proxy, name, partial(proxy._run_method, meth=name)) - yield proxy + relay_n.start_soon(relay_events) - # terminate asyncio side task - await chan.send(None) + yield proxy - except ( - RequestError, - # BaseException, - )as err: - code = getattr(err, 'code', None) - if code: - msg = err.message - - # TODO: retreive underlying ``ib_insync`` error? - if ( - code == 162 and ( - 'HMDS query returned no data' in msg - or 'No market data permissions for' in msg - ) - or code == 200 - ): - # these cases should not cause a task crash - log.warning(msg) - - else: - raise + # terminate asyncio side task + await chan.send(None) @acm @@ -1378,6 +1428,12 @@ def normalize( return data +_pacing: str = ( + 'Historical Market Data Service error ' + 'message:Historical data request pacing violation' +) + + async def get_bars( proxy: MethodProxy, @@ -1396,14 +1452,13 @@ async def get_bars( fails = 0 bars: Optional[list] = None - in_throttle: bool = False first_dt: datetime = None last_dt: datetime = None if end_dt: last_dt = pendulum.from_timestamp(end_dt.timestamp()) - for _ in range(10): + for _ in range(2): try: bars, bars_array = await proxy.bars( fqsn=fqsn, @@ -1449,26 +1504,43 @@ async def get_bars( continue - else: - log.exception( - "Data query rate reached: Press `ctrl-alt-f`" - "in TWS" + elif _pacing in msg: + + log.warning( + 'History throttle rate reached!\n' + 'Resetting farms with `ctrl-alt-f` hack\n' ) + # TODO: we might have to put a task lock around this + # method.. 
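# NOTE (aside, not part of this patch): ib surfaces a pacing violation
# as a ``RequestError`` whose message embeds the ``_pacing`` string
# defined above; matching on the message text rather than the numeric
# code matters since (as best we can tell) ib re-uses the same code for
# other, benign, historical-data errors such as empty-result queries.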
+ hist_ev = proxy.status_event( + 'HMDS data farm connection is OK:ushmds' + ) + # live_ev = proxy.status_event( + # # 'Market data farm connection is OK:usfuture' + # 'Market data farm connection is OK:usfarm' + # ) + # TODO: some kinda resp here that indicates success + # otherwise retry? + await data_reset_hack() - # TODO: should probably create some alert on screen - # and then somehow get that to trigger an event here - # that restarts/resumes this task? - if not in_throttle: - await tractor.breakpoint() + # TODO: a while loop here if we timeout? + for name, ev in [ + ('history', hist_ev), + # ('live', live_ev), + ]: + with trio.move_on_after(22) as cs: + await ev.wait() + log.info(f"{name} DATA RESET") - # TODO: wait on data con reset event - # then begin backfilling again. - # await proxy.wait_for_data() + if cs.cancelled_caught: + log.warning("reset hack failed on first try?") + # await tractor.breakpoint() - in_throttle = True fails += 1 continue + else: + raise return None, None # else: # throttle wasn't fixed so error out immediately @@ -1520,8 +1592,7 @@ async def backfill_bars( # on that until we have the `marketstore` daemon in place in which # case the shm size will be driven by user config and available sys # memory. - # count: int = 120, - count: int = 36, + count: int = 65, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -1566,9 +1637,6 @@ async def backfill_bars( out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) - if fails is None or fails > 1: - break - if out == (None, None): # could be trying to retreive bars over weekend # TODO: add logic here to handle tradable hours and @@ -2222,18 +2290,26 @@ async def deliver_trade_events( msg = pack_position(item) msg.account = accounts_def.inverse[msg.account] - if getattr(msg, 'reqid', 0) < -1: + elif event_name == 'event': - # it's a trade event generated by TWS usage. - log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + # it's either a general system status event or an external + # trade event? + log.info(f"TWS system status: \n{pformat(item)}") - msg.reqid = 'tws-' + str(-1 * msg.reqid) + # TODO: support this again but needs parsing at the callback + # level... + # reqid = item.get('reqid', 0) + # if getattr(msg, 'reqid', 0) < -1: + # log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + continue + + # msg.reqid = 'tws-' + str(-1 * reqid) # mark msg as from "external system" # TODO: probably something better then this.. and start # considering multiplayer/group trades tracking - msg.broker_details['external_src'] = 'tws' - continue + # msg.broker_details['external_src'] = 'tws' # XXX: we always serialize to a dict for msgpack # translations, ideally we can move to an msgspec (or other) @@ -2336,3 +2412,104 @@ async def open_symbol_search( log.debug(f"sending matches: {matches.keys()}") await stream.send(matches) + + +async def data_reset_hack( + reset_type: str = 'data', + +) -> None: + ''' + Run key combos for resetting data feeds and yield back to caller + when complete. + + This is a linux-only hack around: + + https://interactivebrokers.github.io/tws-api/historical_limitations.html#pacing_violations + + TODOs: + - a return type that hopefully determines if the hack was + successful. + - other OS support? + - integration with ``ib-gw`` run in docker + Xorg? 
+
+    '''
+    # TODO: try out this lib instead, seems to be the most modern
+    # and uses the underlying lib:
+    # https://github.com/rshk/python-libxdo
+
+    # TODO: seems to be a few libs for python but not sure
+    # if they support all the sub commands we need, order of
+    # most recent commit history:
+    # https://github.com/rr-/pyxdotool
+    # https://github.com/ShaneHutter/pyxdotool
+    # https://github.com/cphyc/pyxdotool
+
+    try:
+        import i3ipc
+    except ImportError:
+        log.warning('IB data hack not supported on ur platformz')
+        return False
+
+    i3 = i3ipc.Connection()
+    t = i3.get_tree()
+
+    orig_win_id = t.find_focused().window
+
+    # for tws
+    win_names: list[str] = [
+        'Interactive Brokers',  # tws running in i3
+        'IB Gateway',  # gw running in i3
+        # 'IB',  # gw running in i3 (newer version?)
+    ]
+
+    combos: dict[str, tuple[str, int]] = {
+        # only required if we need a connection reset.
+        'connection': ('ctrl+alt+r', 12),
+
+        # data feed reset.
+        'data': ('ctrl+alt+f', 6)
+    }
+
+    for name in win_names:
+        results = t.find_titled(name)
+        print(f'results for {name}: {results}')
+        if results:
+            con = results[0]
+            print(f'Resetting data feed for {name}')
+            win_id = str(con.window)
+            w, h = con.rect.width, con.rect.height
+
+            # TODO: only run the reconnect (2nd) kc on a detected
+            # disconnect?
+            key_combo, timeout = combos[reset_type]
+            # for key_combo, timeout in [
+            #     # only required if we need a connection reset.
+            #     # ('ctrl+alt+r', 12),
+            #     # data feed reset.
+            #     ('ctrl+alt+f', 6)
+            # ]:
+            await trio.run_process([
+                'xdotool',
+                'windowactivate', '--sync', win_id,
+
+                # move mouse to bottom left of window (where there should
+                # be nothing to click).
+                'mousemove_relative', '--sync', str(w-4), str(h-4),
+
+                # NOTE: we may need to stick a `--retry 3` in here..
+                'click', '--window', win_id,
+                '--repeat', '3', '1',
+
+                # hackzorzes
+                'key', key_combo,
+                # ],
+                # timeout=timeout,
+            ])
+
+            # re-activate and focus original window
+            await trio.run_process([
+                'xdotool',
+                'windowactivate', '--sync', str(orig_win_id),
+                'click', '--window', str(orig_win_id), '1',
+            ])
+    return True

From 66ea74c6d52dd368b62e6b4d50cbdcae1d938ef4 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 1 Apr 2022 13:49:17 -0400
Subject: [PATCH 13/21] Put back more bars iters in loop to handle no-data in
 range cases

---
 piker/brokers/ib.py | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 65194ff5..b05e77a3 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -1458,7 +1458,7 @@ async def get_bars(
     if end_dt:
         last_dt = pendulum.from_timestamp(end_dt.timestamp())
 
-    for _ in range(2):
+    for _ in range(10):
         try:
             bars, bars_array = await proxy.bars(
                 fqsn=fqsn,
@@ -1592,7 +1592,7 @@ async def backfill_bars(
     # on that until we have the `marketstore` daemon in place in which
     # case the shm size will be driven by user config and available sys
     # memory.
-    count: int = 65,
+    count: int = 59,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
@@ -1637,11 +1637,14 @@
 
             out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
 
-            if out == (None, None):
+            if out == None:
                 # could be trying to retreive bars over weekend
                 # TODO: add logic here to handle tradable hours and
                 # only grab valid bars in the range
                 log.error(f"Can't grab bars starting at {first_dt}!?!?")
+
+                # XXX: get_bars() should internally decrement dt by
+                # 2k seconds and try again.
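# NOTE: hypothetical sketch of the XXX idea above, not implemented by
# this patch. Inside `get_bars()` the retry could step the query window
# back before looping, eg. with the `pendulum` datetimes already in use
# there:
#
#   end_dt = end_dt.subtract(seconds=2000)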
continue (first_bars, bars_array, first_dt, last_dt) = out From 24fa1b8ff786534e1a6091db7b8c26b930730b8f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Mar 2022 07:25:45 -0500 Subject: [PATCH 14/21] Support an array field map to `ShmArray.push()`, start index 3days in --- piker/data/_sharedmem.py | 55 ++++++++++++++++++++++++++++++++-------- 1 file changed, 45 insertions(+), 10 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 103fa6d5..2ffa8465 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -152,7 +152,8 @@ def _make_token( class ShmArray: - """A shared memory ``numpy`` (compatible) array API. + ''' + A shared memory ``numpy`` (compatible) array API. An underlying shared memory buffer is allocated based on a user specified ``numpy.ndarray``. This fixed size array @@ -162,7 +163,7 @@ class ShmArray: ``SharedInt`` interfaces) values such that multiple processes can interact with the same array using a synchronized-index. - """ + ''' def __init__( self, shmarr: np.ndarray, @@ -209,7 +210,8 @@ class ShmArray: @property def array(self) -> np.ndarray: - '''Return an up-to-date ``np.ndarray`` view of the + ''' + Return an up-to-date ``np.ndarray`` view of the so-far-written data to the underlying shm buffer. ''' @@ -238,19 +240,21 @@ class ShmArray: self, data: np.ndarray, + field_map: Optional[dict[str, str]] = None, prepend: bool = False, start: Optional[int] = None, ) -> int: - '''Ring buffer like "push" to append data + ''' + Ring buffer like "push" to append data into the buffer and return updated "last" index. NB: no actual ring logic yet to give a "loop around" on overflow condition, lel. + ''' - self._post_init = True length = len(data) - index = start or self._last.value + index = start if start is not None else self._last.value if prepend: index = self._first.value - length @@ -263,10 +267,15 @@ class ShmArray: end = index + length - fields = self._write_fields + if field_map: + src_names, dst_names = zip(*field_map.items()) + else: + dst_names = src_names = self._write_fields try: - self._array[fields][index:end] = data[fields][:] + self._array[ + list(dst_names) + ][index:end] = data[list(src_names)][:] # NOTE: there was a race here between updating # the first and last indices and when the next reader @@ -281,9 +290,18 @@ class ShmArray: else: self._last.value = end + self._post_init = True return end except ValueError as err: + if field_map: + raise + # dsize = data.size + # if dsize > self._len: + # raise ValueError( + # f'Input data is size {dsize} > our shm buffer {self._len}' + # ) + # should raise if diff detected self.diff_err_fields(data) raise err @@ -339,7 +357,7 @@ class ShmArray: # how much is probably dependent on lifestyle _secs_in_day = int(60 * 60 * 24) # we try for 3 times but only on a run-every-other-day kinda week. -_default_size = 3 * _secs_in_day +_default_size = 4 * _secs_in_day def open_shm_array( @@ -392,7 +410,24 @@ def open_shm_array( ) ) - last.value = first.value = int(_secs_in_day) + # start the "real-time" updated section after 3-days worth of 1s + # sampled OHLC. 
+    # this allows appending up to a day's worth from
+    # tick/quote feeds before having to flush to a (tsdb) storage
+    # backend, and looks something like,
+    # -------------------------
+    # |              |        i
+    # _________________________
+    # <-------------> <------->
+    #  history         real-time
+    #
+    # Once fully "prepended", the history section will leave the
+    # ``ShmArray._start.value: int = 0`` and the yet-to-be written
+    # real-time section will start at ``ShmArray.index: int``.
+
+    # this sets the index to 3/4 of the length of the buffer
+    # leaving a "days worth of second samples" for the real-time
+    # section.
+    last.value = first.value = int(3*_secs_in_day)
 
     shmarr = ShmArray(
         array,

From 0178fcd26f17c43dc294dd165b6e5485018d2d2d Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 1 Apr 2022 13:45:33 -0400
Subject: [PATCH 15/21] Increase shm size to days of 1s steps

---
 piker/data/_sharedmem.py | 12 +++---------
 1 file changed, 3 insertions(+), 9 deletions(-)

diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index 2ffa8465..771cdd18 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -19,7 +19,6 @@ NumPy compatible shared memory buffers for real-time IPC streaming.
 
 """
 from __future__ import annotations
-from dataclasses import dataclass, asdict
 from sys import byteorder
 from typing import Optional
 from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
@@ -30,7 +29,7 @@ if _USE_POSIX:
 
 import tractor
 import numpy as np
-from pydantic import BaseModel, validator
+from pydantic import BaseModel
 
 from ..log import get_logger
 from ._source import base_iohlc_dtype
@@ -296,11 +295,6 @@ class ShmArray:
         except ValueError as err:
             if field_map:
                 raise
-            # dsize = data.size
-            # if dsize > self._len:
-            #     raise ValueError(
-            #         f'Input data is size {dsize} > our shm buffer {self._len}'
-            #     )
 
             # should raise if diff detected
             self.diff_err_fields(data)
             raise err
@@ -357,7 +351,7 @@
 # how much is probably dependent on lifestyle
 _secs_in_day = int(60 * 60 * 24)
 # we try for 3 times but only on a run-every-other-day kinda week.
-_default_size = 4 * _secs_in_day
+_default_size = 6 * _secs_in_day
 
 def open_shm_array(
@@ -427,7 +421,7 @@ def open_shm_array(
     # this sets the index to 3/4 of the length of the buffer
     # leaving a "days worth of second samples" for the real-time
     # section.
-    last.value = first.value = int(3*_secs_in_day)
+    last.value = first.value = int(5*_secs_in_day)
 
     shmarr = ShmArray(
         array,

From 00a7f2029275f33a3eba7bd34d6c3d3df98fd008 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 5 Apr 2022 09:23:54 -0400
Subject: [PATCH 16/21] Up the shm size to 10d of 1s ohlc

---
 piker/data/_sharedmem.py | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py
index 771cdd18..74707c6f 100644
--- a/piker/data/_sharedmem.py
+++ b/piker/data/_sharedmem.py
@@ -38,6 +38,14 @@ from ._source import base_iohlc_dtype
 
 log = get_logger(__name__)
 
+# how much is probably dependent on lifestyle
+_secs_in_day = int(60 * 60 * 24)
+# we try for 3 times but only on a run-every-other-day kinda week.
+_default_size = 10 * _secs_in_day
+# where to start the new data append index
+_rt_buffer_start = int(9*_secs_in_day)
+
 
 # Tell the "resource tracker" thing to fuck off.
 class ManTracker(mantracker.ResourceTracker):
     def register(self, name, rtype):
@@ -348,12 +356,6 @@ class ShmArray:
         ...
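# NOTE (aside): concretely, with the constants added above, 60*60*24
# == 86_400 1s bars per day, so ``_default_size = 10 * _secs_in_day``
# allocates 864_000 rows while ``_rt_buffer_start = int(9*_secs_in_day)``
# == 777_600 starts the append index 9 days in, leaving exactly one
# day's worth of rows (86_400) of real-time headroom before a (tsdb)
# flush would be needed.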
-# how much is probably dependent on lifestyle -_secs_in_day = int(60 * 60 * 24) -# we try for 3 times but only on a run-every-other-day kinda week. -_default_size = 6 * _secs_in_day - - def open_shm_array( key: Optional[str] = None, @@ -421,7 +423,7 @@ def open_shm_array( # this sets the index to 3/4 of the length of the buffer # leaving a "days worth of second samples" for the real-time # section. - last.value = first.value = int(5*_secs_in_day) + last.value = first.value = _rt_buffer_start shmarr = ShmArray( array, From d1f45b0883d0fa83d97aecac6cb9a69874c44d2f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 7 Apr 2022 14:20:40 -0400 Subject: [PATCH 17/21] Add `ShmArray.last()` docstr --- piker/data/_sharedmem.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 74707c6f..6bc69eb4 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -240,7 +240,13 @@ class ShmArray: def last( self, length: int = 1, + ) -> np.ndarray: + ''' + Return the last ``length``'s worth of ("row") entries from the + array. + + ''' return self.array[-length:] def push( From 80d70216f7c039aa3a0991e5809119db937cafe7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 8 Apr 2022 13:53:38 -0400 Subject: [PATCH 18/21] Drop back down ohlc bars request count to not trigger feed hack --- piker/brokers/ib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b05e77a3..4a24203b 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1592,7 +1592,7 @@ async def backfill_bars( # on that until we have the `marketstore` daemon in place in which # case the shm size will be driven by user config and available sys # memory. - count: int = 59, + count: int = 16, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, From 7586e20ab470e5bf27736b540e2c9f9af2b4c47d Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 11 Apr 2022 11:05:25 -0400 Subject: [PATCH 19/21] Use new unpacker helper name --- piker/brokers/ib.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 4a24203b..1a230ec9 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -507,8 +507,8 @@ class Client: # inside our eventkit handler instead to bypass this entirely? 
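# NOTE: illustrative assumption about the renamed helper's output; the
# unpacking on the next line implies an fqsn embeds the broker as its
# final dot-separated token, something like:
#
#   broker, symbol, expiry = unpack_fqsn('mnq.globex.20220617.ib')
#   # -> broker='ib', expiry='20220617'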
         if 'ib' in pattern:
-            from ..data._source import uncons_fqsn
-            broker, symbol, expiry = uncons_fqsn(pattern)
+            from ..data._source import unpack_fqsn
+            broker, symbol, expiry = unpack_fqsn(pattern)
 
         else:
             symbol = pattern

From 8b1c521ae90af38035fe03271cea589e7855d52c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 8 Apr 2022 14:59:53 -0400
Subject: [PATCH 20/21] Ignore symbol-not-found errors

---
 piker/brokers/ib.py | 19 +++++++++++--------
 1 file changed, 11 insertions(+), 8 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 1a230ec9..1d01907d 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -448,14 +448,17 @@ class Client:
                     symbol=sym,
                     exchange=exch,
                 )
-                possibles = await self.ib.qualifyContractsAsync(con)
-                for i, condict in enumerate(sorted(
-                    map(asdict, possibles),
-                    # sort by expiry
-                    key=lambda con: con['lastTradeDateOrContractMonth'],
-                )):
-                    expiry = condict['lastTradeDateOrContractMonth']
-                    results[f'{sym}.{exch}.{expiry}'] = condict
+                try:
+                    possibles = await self.ib.qualifyContractsAsync(con)
+                    for i, condict in enumerate(sorted(
+                        map(asdict, possibles),
+                        # sort by expiry
+                        key=lambda con: con['lastTradeDateOrContractMonth'],
+                    )):
+                        expiry = condict['lastTradeDateOrContractMonth']
+                        results[f'{sym}.{exch}.{expiry}'] = condict
+                except RequestError as err:
+                    log.warning(err.message)
 
         return results

From 4d23f6e4d7132e274d403397c8922a8e10c2e1a8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 11 Apr 2022 14:54:16 -0400
Subject: [PATCH 21/21] Drop need for `ib_insync.IB.qualifyContractsAsync()`
 mod

As per https://github.com/erdewit/ib_insync/pull/454 the more correct
way to do this is with `.reqContractDetailsAsync()` which we wrap with
`Client.con_deats()` and which works just as well. Further, drop all
the `dict`-ifying that was being done in that method and instead always
return `ContractDetails` objects in an fqsn-like, explicitly keyed
`dict`.

---
 piker/brokers/ib.py | 82 +++++++++++++++++++--------------------------
 1 file changed, 35 insertions(+), 47 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 1d01907d..51724e5a 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -355,28 +355,34 @@ class Client:
         # batch request all details
         results = await asyncio.gather(*futs)
 
-        # XXX: if there is more then one entry in the details list
+        # one set per future result
         details = {}
         for details_set in results:
+
+            # XXX: if there is more than one entry in the details list
+            # then the contract is so-called "ambiguous".
             for d in details_set:
                 con = d.contract
-                unique_sym = f'{con.symbol}.{con.primaryExchange}'
 
-                as_dict = asdict(d)
+                key = '.'.join([
+                    con.symbol,
+                    con.primaryExchange or con.exchange,
+                ])
+                expiry = con.lastTradeDateOrContractMonth
+                if expiry:
+                    key += f'.{expiry}'
+
                 # nested dataclass we probably don't need and that
-                # won't IPC serialize
+                # won't IPC serialize..
+ d.secIdList = '' - details[unique_sym] = as_dict + details[key] = d return details async def search_stocks( self, pattern: str, - get_details: bool = False, upto: int = 3, # how many contracts to search "up to" ) -> dict[str, ContractDetails]: @@ -388,31 +394,13 @@ class Client: ''' descriptions = await self.ib.reqMatchingSymbolsAsync(pattern) - if descriptions is not None: - descrs = descriptions[:upto] - - if get_details: - deats = await self.con_deats([d.contract for d in descrs]) - return deats - - else: - results = {} - for d in descrs: - con = d.contract - # sometimes there's a weird extra suffix returned - # from search? - exch = con.primaryExchange.rsplit('.')[0] - unique_sym = f'{con.symbol}.{exch}' - expiry = con.lastTradeDateOrContractMonth - if expiry: - unique_sym += f'{expiry}' - - results[unique_sym] = {} - - return results - else: + if descriptions is None: return {} + # limit + descrs = descriptions[:upto] + return await self.con_deats([d.contract for d in descrs]) + async def search_symbols( self, pattern: str, @@ -427,36 +415,30 @@ class Client: results = await self.search_stocks( pattern, upto=upto, - get_details=True, ) - for key, contracts in results.copy().items(): - tract = contracts['contract'] - sym = tract['symbol'] + for key, deats in results.copy().items(): + + tract = deats.contract + sym = tract.symbol + sectype = tract.secType - sectype = tract['secType'] if sectype == 'IND': results[f'{sym}.IND'] = tract results.pop(key) - exch = tract['exchange'] + exch = tract.exchange if exch in _futes_venues: # try get all possible contracts for symbol as per, # https://interactivebrokers.github.io/tws-api/basic_contracts.html#fut - con = Contract( - 'FUT+CONTFUT', + con = ibis.Future( symbol=sym, exchange=exch, ) try: - possibles = await self.ib.qualifyContractsAsync(con) - for i, condict in enumerate(sorted( - map(asdict, possibles), - # sort by expiry - key=lambda con: con['lastTradeDateOrContractMonth'], - )): - expiry = condict['lastTradeDateOrContractMonth'] - results[f'{sym}.{exch}.{expiry}'] = condict + all_deats = await self.con_deats([con]) + results |= all_deats + except RequestError as err: log.warning(err.message) @@ -600,6 +582,12 @@ class Client: raise ValueError(f"No contract could be found {con}") self._contracts[pattern] = contract + + # add an aditional entry with expiry suffix if available + conexp = contract.lastTradeDateOrContractMonth + if conexp: + self._contracts[pattern + f'.{conexp}'] = contract + return contract async def get_head_time( @@ -1640,7 +1628,7 @@ async def backfill_bars( out, fails = await get_bars(proxy, fqsn, end_dt=first_dt) - if out == None: + if out is None: # could be trying to retreive bars over weekend # TODO: add logic here to handle tradable hours and # only grab valid bars in the range
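As a closing usage sketch (the symbol, exchange and expiry here are
illustrative, not actual query output): every search path above now
funnels through `Client.con_deats()` and returns `ContractDetails`
values under fqsn-style `<symbol>.<exchange>[.<expiry>]` keys,

    deats = await client.search_symbols('mnq')
    # roughly -> {'MNQ.GLOBEX.20220617': ContractDetails(...), ...}

which also lines up with the expiry-suffixed contract aliases now
cached by `Client.find_contract()`.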