diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 75ace77a..a5a4b254 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -8,7 +8,7 @@ built on it) and thus actor aware API calls must be spawned with from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator +from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator, Callable import asyncio import logging import inspect @@ -224,10 +224,14 @@ class Client: # use heuristics to figure out contract "type" sym, exch = symbol.upper().split('.') + # TODO: metadata system for all these exchange rules.. + if exch in ('PURE',): + currency = 'CAD' + if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): con = await self.get_cont_fute(symbol=sym, exchange=exch) - elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY + elif exch == 'CMDTY': # eg. XAUUSD.CMDTY con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs @@ -356,7 +360,10 @@ async def _trio_run_client_method( class _MethodProxy: - def __init__(self, portal: tractor._portal.Portal): + def __init__( + self, + portal: tractor._portal.Portal + ) -> None: self._portal = portal async def _run_method( @@ -420,9 +427,8 @@ def normalize( ticker.ticks = new_ticks - # some contracts don't have volume so we may want to - # calculate a midpoint price based on data we can acquire - # (such as bid / ask) + # some contracts don't have volume so we may want to calculate + # a midpoint price based on data we can acquire (such as bid / ask) if calc_price: ticker.ticks.append( {'type': 'trade', 'price': ticker.marketPrice()} @@ -439,7 +445,9 @@ def normalize( return data +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, symbols: List[str], loglevel: str = None, ) -> AsyncGenerator[str, Dict[str, Any]]: @@ -451,24 +459,29 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # TODO: support multiple subscriptions + sym = symbols[0] + stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', - symbol=symbols[0], + symbol=sym, ) async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() - # quote_cache = {} if type(first_ticker.contract) not in (ibis.Commodity,): + suffix = 'exchange' calc_price = False # should be real volume for contract - data = normalize(first_ticker) + quote = normalize(first_ticker) + log.debug(f"First ticker received {quote}") - log.debug(f"First ticker received {data}") - yield data + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} async for ticker in stream: # spin consuming tickers until we get a real market datum @@ -476,19 +489,27 @@ async def stream_quotes( log.debug(f"New unsent ticker: {ticker}") continue else: - yield normalize(ticker) log.debug("Received first real volume tick") + quote = normalize(ticker) + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} + # XXX: this works because we don't use # ``aclosing()`` above? break else: + # commodities don't have an exchange name for some reason? + suffix = 'secType' calc_price = True async for ticker in stream: - yield normalize( + quote = normalize( ticker, calc_price=calc_price ) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} # ugh, clear ticks since we've consumed them ticker.ticks = [] diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 605c5177..ea3850fd 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -4,7 +4,7 @@ Kraken backend. from contextlib import asynccontextmanager from dataclasses import dataclass, asdict, field from itertools import starmap -from typing import List, Dict, Any +from typing import List, Dict, Any, Callable import json import time @@ -144,7 +144,9 @@ class OHLC: setattr(self, f, val.type(getattr(self, f))) +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, # These are the symbols not expected by the ws api # they are looked up inside this routine. symbols: List[str] = ['XBTUSD', 'XMRUSD'], @@ -181,6 +183,10 @@ async def stream_quotes( # 'depth': '25', }, } + # TODO: we want to eventually allow unsubs which should + # be completely fine to request from a separate task + # since internally the ws methods appear to be FIFO + # locked. await ws.send_message(json.dumps(subs)) async def recv(): @@ -222,7 +228,14 @@ async def stream_quotes( # pull a first quote and deliver ohlc_gen = recv_ohlc() ohlc_last = await ohlc_gen.__anext__() - yield asdict(ohlc_last) + + # seriously eh? what's with this non-symmetry everywhere + # in subscription systems... + quote = asdict(ohlc_last) + topic = quote['pair'].replace('/', '') + + # packetize as {topic: quote} + yield {topic: quote} # keep start of last interval for volume tracking last_interval_start = ohlc_last.etime @@ -246,8 +259,16 @@ async def stream_quotes( 'price': ohlc.close, 'size': tick_volume, }) - yield asdict(ohlc) + + # XXX: format required by ``tractor.msg.pub`` + # requires a ``Dict[topic: str, quote: dict]`` + quote = asdict(ohlc) + print(quote) + topic = quote['pair'].replace('/', '') + yield {topic: quote} + ohlc_last = ohlc + except (ConnectionClosed, DisconnectionTimeout): log.exception("Good job kraken...reconnecting") diff --git a/piker/data/__init__.py b/piker/data/__init__.py index bb8fca8b..25efe088 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -62,7 +62,6 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: # WTF: why doesn't this work? - log.info(f"YOYOYO {__name__}") if portal is not None: yield portal else: @@ -89,7 +88,7 @@ async def maybe_spawn_brokerd( async def open_feed( name: str, symbols: Sequence[str], - loglevel: str = 'info', + loglevel: Optional[str] = None, ) -> AsyncIterator[Dict[str, Any]]: """Open a "data feed" which provides streamed real-time quotes. """ @@ -98,6 +97,9 @@ async def open_feed( except ImportError: mod = get_ingestormod(name) + if loglevel is None: + loglevel = tractor.current_actor().loglevel + async with maybe_spawn_brokerd( mod.name, loglevel=loglevel, @@ -106,6 +108,7 @@ async def open_feed( mod.__name__, 'stream_quotes', symbols=symbols, + topics=symbols, ) # Feed is required to deliver an initial quote asap. # TODO: should we timeout and raise a more explicit error?