diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 71175b07..2f4cdb78 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -1164,6 +1164,12 @@ def norm_trade_records( exch = record['exchange'] lexch = record.get('listingExchange') + # NOTE: remove null values since `tomlkit` can't serialize + # them to file. + dnc = record.pop('deltaNeutralContract', False) + if dnc is not None: + record['deltaNeutralContract'] = dnc + suffix = lexch or exch symbol = record['symbol'] diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index eb9fc7aa..eacfca7b 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -58,13 +58,18 @@ from .api import ( open_client_proxies, get_preferred_data_client, Ticker, - RequestError, Contract, + RequestError, ) from ._util import data_reset_hack from piker._cacheables import ( async_lifo_cache, ) +from piker.accounting import ( + Asset, + MktPair, +) +from piker.data.validate import FeedInit # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -115,7 +120,9 @@ async def open_data_client() -> MethodProxy: @acm async def open_history_client( - fqsn: str, + fqme: str, + + # mkt: MktPair | None = None, ) -> tuple[Callable, int]: ''' @@ -134,6 +141,11 @@ async def open_history_client( async with open_data_client() as proxy: + # TODO: maybe strip the `MktPair.src: Asset` key here? + # see the comment below.. + # if mkt is not None: + # fqme: str = mkt.fqme.remove(mkt.src.name) + max_timeout: float = 2. mean: float = 0 count: int = 0 @@ -141,10 +153,10 @@ async def open_history_client( head_dt: None | datetime = None if ( # fx cons seem to not provide this endpoint? - 'idealpro' not in fqsn + 'idealpro' not in fqme ): try: - head_dt = await proxy.get_head_time(fqsn=fqsn) + head_dt = await proxy.get_head_time(fqsn=fqme) except RequestError: head_dt = None @@ -159,7 +171,7 @@ async def open_history_client( query_start = time.time() out, timedout = await get_bars( proxy, - fqsn, + fqme, timeframe, end_dt=end_dt, ) @@ -517,7 +529,9 @@ async def get_bars( return result, data_cs is not None -asset_type_map = { +# re-mapping to piker asset type names +# https://github.com/erdewit/ib_insync/blob/master/ib_insync/contract.py#L113 +_asset_type_map = { 'STK': 'stock', 'OPT': 'option', 'FUT': 'future', @@ -558,7 +572,7 @@ async def _setup_quote_stream( '294', # Trade rate / minute '295', # Vlm rate / minute ), - contract: Optional[Contract] = None, + contract: Contract | None = None, ) -> trio.abc.ReceiveChannel: ''' @@ -745,19 +759,19 @@ async def get_mkt_info( proxy: MethodProxy | None = None, -) -> tuple[MktPair, Pair]: +) -> tuple[MktPair, ibis.ContractDetails]: - # we don't need to split off any fqme broker part? + # XXX: we don't need to split off any fqme broker part? # bs_fqme, _, broker = fqme.partition('.') proxy: MethodProxy + get_details: bool = False if proxy is not None: client_ctx = nullcontext(proxy) else: client_ctx = open_data_client async with client_ctx as proxy: - try: ( con, # Contract @@ -767,27 +781,61 @@ async def get_mkt_info( log.exception(f'Proxy is ded {proxy._aio_ns}') raise - # pair: Pair = await client.exch_info(fqme.upper()) + # TODO: more consistent field translation + init_info: dict = {} + atype = _asset_type_map[con.secType] - # mkt = MktPair( - # dst=Asset( - # name=pair.baseAsset, - # atype='crypto', - # tx_tick=digits_to_dec(pair.baseAssetPrecision), - # ), - # src=Asset( - # name=pair.quoteAsset, - # atype='crypto', - # tx_tick=digits_to_dec(pair.quoteAssetPrecision), - # ), - # price_tick=pair.price_tick, - # size_tick=pair.size_tick, - # bs_mktid=pair.symbol, - # broker='binance', - # ) + venue = con.primaryExchange or con.exchange + price_tick: Decimal = Decimal(str(details.minTick)) - # return both - return con, details + if atype == 'stock': + # XXX: GRRRR they don't support fractional share sizes for + # stocks from the API?! + # if con.secType == 'STK': + size_tick = Decimal('1') + else: + size_tick: Decimal = Decimal(str(details.minSize).rstrip('0')) + # |-> TODO: there is also the Contract.sizeIncrement, bt wtf is it? + + # NOTE: this is duplicate from the .broker.norm_trade_records() + # routine, we should factor all this parsing somewhere.. + expiry_str = str(con.lastTradeDateOrContractMonth) + # if expiry: + # expiry_str: str = str(pendulum.parse( + # str(expiry).strip(' ') + # )) + + mkt = MktPair( + dst=Asset( + name=con.symbol.lower(), + atype=atype, + tx_tick=size_tick, + ), + + # TODO: currently we can't pass the fiat src asset because + # then we'll get a `MNQUSD` request for history data.. + # we need to figure out how we're going to handle this (later?) + # but likely we want all backends to eventually handle + # ``dst/src.venue.`` style? + # src=Asset( + # name=str(con.currency), + # atype='fiat', + # tx_tick=Decimal('0.01'), # right? + # ), + + price_tick=price_tick, + size_tick=size_tick, + + bs_mktid=str(con.conId), + venue=str(venue), + expiry=expiry_str, + broker='ib', + + # TODO: options contract info as str? + # contract_info= + ) + + return mkt, details async def stream_quotes( @@ -812,83 +860,36 @@ async def stream_quotes( sym = symbols[0] log.info(f'request for real-time quotes: {sym}') + init_msgs: list[FeedInit] = [] + proxy: MethodProxy + mkt: MktPair + details: ibis.ContractDetails async with open_data_client() as proxy: - con, details = await get_mkt_info(sym, proxy=proxy) + mkt, details = await get_mkt_info( + sym, + proxy=proxy, # passed to avoid implicit client load + ) - first_ticker = await proxy.get_quote(contract=con) - first_quote = normalize(first_ticker) + init_msg = FeedInit(mkt_info=mkt) + + has_vlm: bool = True + if mkt.dst.atype in { + 'forex', + 'index', + 'commodity', + }: + has_vlm = False + # tell sampler config that it shouldn't do vlm summing. + init_msg.shm_write_opts['sum_tick_vlm'] = False + + init_msgs.append(init_msg) + + con: Contract = details.contract + first_ticker: Ticker = await proxy.get_quote(contract=con) + first_quote: dict = normalize(first_ticker) log.runtime(f'FIRST QUOTE: {first_quote}') - def mk_init_msgs() -> dict[str, dict]: - ''' - Collect a bunch of meta-data useful for feed startup and - pack in a `dict`-msg. - - ''' - # pass back some symbol info like min_tick, trading_hours, etc. - con: Contract = details.contract - syminfo = asdict(details) - syminfo.update(syminfo['contract']) - - # nested dataclass we probably don't need and that won't IPC - # serialize - syminfo.pop('secIdList') - - # TODO: more consistent field translation - atype = syminfo['asset_type'] = asset_type_map[syminfo['secType']] - - if atype in { - 'forex', - 'index', - 'commodity', - }: - syminfo['no_vlm'] = True - - # XXX: pretty sure we don't need this any more right? - # for stocks it seems TWS reports too small a tick size - # such that you can't submit orders with that granularity? - # min_price_tick = Decimal('0.01') if atype == 'stock' else 0 - # price_tick = max(price_tick, min_tick) - - price_tick: Decimal = Decimal(str(syminfo['minTick'])) - size_tick: Decimal = Decimal(str(syminfo['minSize']).rstrip('0')) - - # XXX: GRRRR they don't support fractional share sizes for - # stocks from the API?! - if con.secType == 'STK': - size_tick = Decimal('1') - - syminfo['price_tick_size'] = price_tick - # NOTE: as you'd expect for "legacy" assets, the "volume - # precision" is normally discreet. - syminfo['lot_tick_size'] = size_tick - - # should be at top level right? - syminfo['bs_mktid'] = con.conId - - # ibclient = proxy._aio_ns.ib.client - # host, port = ibclient.host, ibclient.port - fqsn = first_quote['fqsn'] - - # TODO: for loop through all symbols passed in - init_msgs: dict[str, dict] = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': syminfo, - 'fqsn': fqsn, - 'bs_mktid': con.conId, - }, - # 'status': { - # 'data_ep': f'{host}:{port}', - # }, - - } - return init_msgs, syminfo - - init_msgs, syminfo = mk_init_msgs() - # TODO: we should instead spawn a task that waits on a feed to start # and let it wait indefinitely..instead of this hard coded stuff. with trio.move_on_after(1): @@ -954,13 +955,14 @@ async def stream_quotes( nurse.start_soon(reset_on_feed) async with aclosing(stream): - if syminfo.get('no_vlm', False): + # if syminfo.get('no_vlm', False): + if not has_vlm: # generally speaking these feeds don't # include vlm data. - atype = syminfo['asset_type'] + atype = mkt.dst.atype log.info( - f'No-vlm {sym}@{atype}, skipping quote poll' + f'No-vlm {mkt.fqme}@{atype}, skipping quote poll' ) else: