diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index a9d75d55..eb9fc7aa 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -19,7 +19,10 @@ Data feed endpoints pre-wrapped and ready for use with ``tractor``/``trio``. """ from __future__ import annotations import asyncio -from contextlib import asynccontextmanager as acm +from contextlib import ( + asynccontextmanager as acm, + nullcontext, +) from decimal import Decimal from dataclasses import asdict from datetime import datetime @@ -59,6 +62,9 @@ from .api import ( Contract, ) from ._util import data_reset_hack +from piker._cacheables import ( + async_lifo_cache, +) # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -733,41 +739,55 @@ def normalize( return data -# TODO! -# async def get_mkt_info( -# fqme: str, +@async_lifo_cache() +async def get_mkt_info( + fqme: str, -# _cache: dict[str, MktPair] = {} + proxy: MethodProxy | None = None, -# ) -> tuple[MktPair, Pair]: +) -> tuple[MktPair, Pair]: -# both = _cache.get(fqme) -# if both: -# return both + # we don't need to split off any fqme broker part? + # bs_fqme, _, broker = fqme.partition('.') -# proxy: MethodProxy -# async with open_data_client() as proxy: + proxy: MethodProxy + if proxy is not None: + client_ctx = nullcontext(proxy) + else: + client_ctx = open_data_client -# pair: Pair = await client.exch_info(fqme.upper()) -# mkt = MktPair( -# dst=Asset( -# name=pair.baseAsset, -# atype='crypto', -# tx_tick=digits_to_dec(pair.baseAssetPrecision), -# ), -# src=Asset( -# name=pair.quoteAsset, -# atype='crypto', -# tx_tick=digits_to_dec(pair.quoteAssetPrecision), -# ), -# price_tick=pair.price_tick, -# size_tick=pair.size_tick, -# bs_mktid=pair.symbol, -# broker='binance', -# ) -# both = mkt, pair -# _cache[fqme] = both -# return both + async with client_ctx as proxy: + + try: + ( + con, # Contract + details, # ContractDetails + ) = await proxy.get_sym_details(symbol=fqme) + except ConnectionError: + log.exception(f'Proxy is ded {proxy._aio_ns}') + raise + + # pair: Pair = await client.exch_info(fqme.upper()) + + # mkt = MktPair( + # dst=Asset( + # name=pair.baseAsset, + # atype='crypto', + # tx_tick=digits_to_dec(pair.baseAssetPrecision), + # ), + # src=Asset( + # name=pair.quoteAsset, + # atype='crypto', + # tx_tick=digits_to_dec(pair.quoteAssetPrecision), + # ), + # price_tick=pair.price_tick, + # size_tick=pair.size_tick, + # bs_mktid=pair.symbol, + # broker='binance', + # ) + + # return both + return con, details async def stream_quotes( @@ -794,18 +814,11 @@ async def stream_quotes( proxy: MethodProxy async with open_data_client() as proxy: - try: - ( - con, # Contract - first_ticker, # Ticker - details, # ContractDetails - ) = await proxy.get_sym_details(symbol=sym) - except ConnectionError: - log.exception(f'Proxy is ded {proxy._aio_ns}') - raise + con, details = await get_mkt_info(sym, proxy=proxy) + first_ticker = await proxy.get_quote(contract=con) first_quote = normalize(first_ticker) - # print(f'first quote: {first_quote}') + log.runtime(f'FIRST QUOTE: {first_quote}') def mk_init_msgs() -> dict[str, dict]: ''' @@ -879,7 +892,7 @@ async def stream_quotes( # TODO: we should instead spawn a task that waits on a feed to start # and let it wait indefinitely..instead of this hard coded stuff. with trio.move_on_after(1): - contract, first_ticker, details = await proxy.get_quote(symbol=sym) + first_ticker = await proxy.get_quote(contract=con) # it might be outside regular trading hours so see if we can at # least grab history.