From b7c924046ab2380c5985796829112e14f7f7e14a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 Aug 2020 07:42:49 -0400 Subject: [PATCH] Begin to use `@tractor.msg.pub` throughout streaming API Since the new FSP system will require time aligned data amongst actors, it makes sense to share broker data feeds as much as possible on a local system. There doesn't seem to be downside to this approach either since if not fanning-out in our code, the broker (server) has to do it anyway (and who knows how junk their implementation is) though with more clients, sockets etc. in memory on our end. It also preps the code for introducing a more "serious" pub-sub systems like zeromq/nanomessage. --- piker/brokers/ib.py | 47 ++++++++++++++++++++++++++++++------------ piker/data/__init__.py | 7 +++++-- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 75ace77a..a5a4b254 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -8,7 +8,7 @@ built on it) and thus actor aware API calls must be spawned with from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator +from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator, Callable import asyncio import logging import inspect @@ -224,10 +224,14 @@ class Client: # use heuristics to figure out contract "type" sym, exch = symbol.upper().split('.') + # TODO: metadata system for all these exchange rules.. + if exch in ('PURE',): + currency = 'CAD' + if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): con = await self.get_cont_fute(symbol=sym, exchange=exch) - elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY + elif exch == 'CMDTY': # eg. XAUUSD.CMDTY con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs @@ -356,7 +360,10 @@ async def _trio_run_client_method( class _MethodProxy: - def __init__(self, portal: tractor._portal.Portal): + def __init__( + self, + portal: tractor._portal.Portal + ) -> None: self._portal = portal async def _run_method( @@ -420,9 +427,8 @@ def normalize( ticker.ticks = new_ticks - # some contracts don't have volume so we may want to - # calculate a midpoint price based on data we can acquire - # (such as bid / ask) + # some contracts don't have volume so we may want to calculate + # a midpoint price based on data we can acquire (such as bid / ask) if calc_price: ticker.ticks.append( {'type': 'trade', 'price': ticker.marketPrice()} @@ -439,7 +445,9 @@ def normalize( return data +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, symbols: List[str], loglevel: str = None, ) -> AsyncGenerator[str, Dict[str, Any]]: @@ -451,24 +459,29 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # TODO: support multiple subscriptions + sym = symbols[0] + stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', - symbol=symbols[0], + symbol=sym, ) async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() - # quote_cache = {} if type(first_ticker.contract) not in (ibis.Commodity,): + suffix = 'exchange' calc_price = False # should be real volume for contract - data = normalize(first_ticker) + quote = normalize(first_ticker) + log.debug(f"First ticker received {quote}") - log.debug(f"First ticker received {data}") - yield data + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} async for ticker in stream: # spin consuming tickers until we get a real market datum @@ -476,19 +489,27 @@ async def stream_quotes( log.debug(f"New unsent ticker: {ticker}") continue else: - yield normalize(ticker) log.debug("Received first real volume tick") + quote = normalize(ticker) + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} + # XXX: this works because we don't use # ``aclosing()`` above? break else: + # commodities don't have an exchange name for some reason? + suffix = 'secType' calc_price = True async for ticker in stream: - yield normalize( + quote = normalize( ticker, calc_price=calc_price ) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} # ugh, clear ticks since we've consumed them ticker.ticks = [] diff --git a/piker/data/__init__.py b/piker/data/__init__.py index bb8fca8b..25efe088 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -62,7 +62,6 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: # WTF: why doesn't this work? - log.info(f"YOYOYO {__name__}") if portal is not None: yield portal else: @@ -89,7 +88,7 @@ async def maybe_spawn_brokerd( async def open_feed( name: str, symbols: Sequence[str], - loglevel: str = 'info', + loglevel: Optional[str] = None, ) -> AsyncIterator[Dict[str, Any]]: """Open a "data feed" which provides streamed real-time quotes. """ @@ -98,6 +97,9 @@ async def open_feed( except ImportError: mod = get_ingestormod(name) + if loglevel is None: + loglevel = tractor.current_actor().loglevel + async with maybe_spawn_brokerd( mod.name, loglevel=loglevel, @@ -106,6 +108,7 @@ async def open_feed( mod.__name__, 'stream_quotes', symbols=symbols, + topics=symbols, ) # Feed is required to deliver an initial quote asap. # TODO: should we timeout and raise a more explicit error?