diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 75ace77a..a5a4b254 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -8,7 +8,7 @@ built on it) and thus actor aware API calls must be spawned with from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator +from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator, Callable import asyncio import logging import inspect @@ -224,10 +224,14 @@ class Client: # use heuristics to figure out contract "type" sym, exch = symbol.upper().split('.') + # TODO: metadata system for all these exchange rules.. + if exch in ('PURE',): + currency = 'CAD' + if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): con = await self.get_cont_fute(symbol=sym, exchange=exch) - elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY + elif exch == 'CMDTY': # eg. XAUUSD.CMDTY con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs @@ -356,7 +360,10 @@ async def _trio_run_client_method( class _MethodProxy: - def __init__(self, portal: tractor._portal.Portal): + def __init__( + self, + portal: tractor._portal.Portal + ) -> None: self._portal = portal async def _run_method( @@ -420,9 +427,8 @@ def normalize( ticker.ticks = new_ticks - # some contracts don't have volume so we may want to - # calculate a midpoint price based on data we can acquire - # (such as bid / ask) + # some contracts don't have volume so we may want to calculate + # a midpoint price based on data we can acquire (such as bid / ask) if calc_price: ticker.ticks.append( {'type': 'trade', 'price': ticker.marketPrice()} @@ -439,7 +445,9 @@ def normalize( return data +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, symbols: List[str], loglevel: str = None, ) -> AsyncGenerator[str, Dict[str, Any]]: @@ -451,24 +459,29 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # TODO: support multiple subscriptions + sym = symbols[0] + stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', - symbol=symbols[0], + symbol=sym, ) async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() - # quote_cache = {} if type(first_ticker.contract) not in (ibis.Commodity,): + suffix = 'exchange' calc_price = False # should be real volume for contract - data = normalize(first_ticker) + quote = normalize(first_ticker) + log.debug(f"First ticker received {quote}") - log.debug(f"First ticker received {data}") - yield data + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} async for ticker in stream: # spin consuming tickers until we get a real market datum @@ -476,19 +489,27 @@ async def stream_quotes( log.debug(f"New unsent ticker: {ticker}") continue else: - yield normalize(ticker) log.debug("Received first real volume tick") + quote = normalize(ticker) + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} + # XXX: this works because we don't use # ``aclosing()`` above? break else: + # commodities don't have an exchange name for some reason? + suffix = 'secType' calc_price = True async for ticker in stream: - yield normalize( + quote = normalize( ticker, calc_price=calc_price ) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} # ugh, clear ticks since we've consumed them ticker.ticks = [] diff --git a/piker/data/__init__.py b/piker/data/__init__.py index bb8fca8b..25efe088 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -62,7 +62,6 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: # WTF: why doesn't this work? - log.info(f"YOYOYO {__name__}") if portal is not None: yield portal else: @@ -89,7 +88,7 @@ async def maybe_spawn_brokerd( async def open_feed( name: str, symbols: Sequence[str], - loglevel: str = 'info', + loglevel: Optional[str] = None, ) -> AsyncIterator[Dict[str, Any]]: """Open a "data feed" which provides streamed real-time quotes. """ @@ -98,6 +97,9 @@ async def open_feed( except ImportError: mod = get_ingestormod(name) + if loglevel is None: + loglevel = tractor.current_actor().loglevel + async with maybe_spawn_brokerd( mod.name, loglevel=loglevel, @@ -106,6 +108,7 @@ async def open_feed( mod.__name__, 'stream_quotes', symbols=symbols, + topics=symbols, ) # Feed is required to deliver an initial quote asap. # TODO: should we timeout and raise a more explicit error?