From 1abadeb5063e8bdfd14cafd9d293fc3e4ef1b019 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 24 Jun 2020 13:17:29 -0400
Subject: [PATCH 01/17] Add initial IB broker backend using ib_insync

Start working towards meeting the backend client api.

Infect `asyncio` using `trio`'s new guest mode and demonstrate
real-time ticker streaming to console.
---
 piker/brokers/ib.py | 242 ++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 242 insertions(+)
 create mode 100644 piker/brokers/ib.py

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
new file mode 100644
index 00000000..0e200c9f
--- /dev/null
+++ b/piker/brokers/ib.py
@@ -0,0 +1,242 @@
+"""
+Interactive Brokers API backend.
+"""
+import asyncio
+from dataclasses import asdict
+from typing import List, Dict, Any
+from contextlib import asynccontextmanager
+
+import trio
+import ib_insync as ibis
+from ib_insync.ticker import Ticker
+from ib_insync.contract import Contract, ContractDetails
+
+
+_time_frames = {
+    '1s': '1 Sec',
+    '1m': 'OneMinute',
+    '2m': 'TwoMinutes',
+    '3m': 'ThreeMinutes',
+    '4m': 'FourMinutes',
+    '5m': 'FiveMinutes',
+    '10m': 'TenMinutes',
+    '15m': 'FifteenMinutes',
+    '20m': 'TwentyMinutes',
+    '30m': 'HalfHour',
+    '1h': 'OneHour',
+    '2h': 'TwoHours',
+    '4h': 'FourHours',
+    'D': 'OneDay',
+    'W': 'OneWeek',
+    'M': 'OneMonth',
+    'Y': 'OneYear',
+}
+
+
+class Client:
+    """IB wrapped for our broker backend API.
+    """
+    def __init__(
+        self,
+        ib: ibis.IB,
+    ) -> None:
+        self.ib = ib
+        # connect data feed callback...
+        self.ib.pendingTickersEvent.connect(self.on_tickers)
+
+    async def bars(
+        self,
+        symbol: str,
+        # EST in ISO 8601 format is required... below is EPOCH
+        start_date: str = "1970-01-01T00:00:00.000000-05:00",
+        time_frame: str = '1m',
+        count: int = int(20e3),  # <- max allowed per query
+        is_paid_feed: bool = False,
+    ) -> List[Dict[str, Any]]:
+        """Retreive OHLCV bars for a symbol over a range to the present.
+        """
+        contract = ibis.ContFuture('ES', exchange='GLOBEX')
+        # contract = ibis.Stock('WEED', 'SMART', 'CAD')
+        bars = self.ib.reqHistoricalData(
+            contract,
+            endDateTime='',
+            # durationStr='60 S',
+            durationStr='2000 S',
+            barSizeSetting='1 secs',
+            whatToShow='TRADES',
+            useRTH=False
+        )
+        # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True)
+        # convert to pandas dataframe:
+        df = ibis.util.df(bars)
+        print(df[['date', 'open', 'high', 'low', 'close', 'volume']])
+        from piker.ui._source import from_df
+        a = from_df(df)
+        # breakpoint()
+        print(a)
+
+    # TODO: reimplement this using async batch requests
+    # see https://github.com/erdewit/ib_insync/issues/262
+    async def search_stocks(
+        self,
+        pattern: str,
+        # how many contracts to search "up to"
+        upto: int = 3,
+        asdicts: bool = True,
+    ) -> Dict[str, ContractDetails]:
+        """Search for stocks matching provided ``str`` pattern.
+
+        Return a dictionary of ``upto`` entries worth of contract details.
+        """
+        descriptions = self.ib.reqMatchingSymbols(pattern)
+        details = {}
+        for description in descriptions:
+            con = description.contract
+            deats = self.ib.reqContractDetails(con)
+            # XXX: if there is more then one entry in the details list
+            # then the contract is so called "ambiguous".
+            for d in deats:
+                unique_sym = f'{con.symbol}.{con.primaryExchange}'
+                details[unique_sym] = asdict(d) if asdicts else d
+                if len(details) == upto:
+                    return details
+
+        return details
+
+    async def search_futes(
+        self,
+        pattern: str,
+        # how many contracts to search "up to"
+        upto: int = 3,
+        asdicts: bool = True,
+    ) -> Dict[str, ContractDetails]:
+        raise NotImplementedError
+
+    def get_cont_fute(
+        self,
+        symbol: str,
+    ) -> Contract:
+        raise NotImplementedError
+
+
+# default config ports
+_tws_port: int = 7497
+_gw_port: int = 4002
+
+
+@asynccontextmanager
+async def get_client(
+    host: str = '127.0.0.1',
+    port: int = None,
+    client_id: int = 1,
+) -> Client:
+    """Return an ``ib_insync.IB`` instance wrapped in our client API.
+    """
+    ib = ibis.IB()
+    # TODO: some detection magic to figure out if tws vs. the
+    # gateway is up ad choose the appropriate port
+    if port is None:
+        ports = [_tws_port, _gw_port]
+    else:
+        ports = [port]
+
+    _err = None
+    # try all default ports
+    for port in ports:
+        try:
+            await ib.connectAsync(host, port, clientId=client_id)
+            break
+        except ConnectionRefusedError as ce:
+            _err = ce
+            print(f'failed to connect on {port}')
+    else:
+        raise ConnectionRefusedError(_err)
+
+    yield Client(ib)
+    ib.disconnect()
+
+
+if __name__ == '__main__':
+
+    con_es = ibis.ContFuture('ES', exchange='GLOBEX')
+    es = ibis.Future('ES', '20200918', exchange='GLOBEX')
+    spy = ibis.Stock('SPY', exchange='ARCA')
+
+    # ticker = client.ib.reqTickByTickData(
+    #     contract,
+    #     tickType='Last',
+    #     numberOfTicks=1,
+    # )
+    # client.ib.reqTickByTickData(
+    #     contract,
+    #     tickType='AllLast',
+    #     numberOfTicks=1,
+    # )
+    # client.ib.reqTickByTickData(
+    #     contract,
+    #     tickType='BidAsk',
+    #     numberOfTicks=1,
+    # )
+
+    # ITC (inter task comms)
+    from_trio = asyncio.Queue()
+    to_trio, from_aio = trio.open_memory_channel(float("inf"))
+
+    async def start_ib(from_trio, to_trio):
+        print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
+        async with get_client() as client:
+
+            # stream ticks to trio task
+            def ontick(ticker: Ticker):
+                for t in ticker.ticks:
+                    # send tick data to trio
+                    to_trio.send_nowait(t)
+
+            ticker = client.ib.reqMktData(spy, '588', False, False, None)
+            ticker.updateEvent += ontick
+
+            n = await from_trio.get()
+            assert n == 0
+
+            # sleep and let the engine run
+            await asyncio.sleep(float('inf'))
+
+            # TODO: cmd processing from trio
+            # while True:
+            #     n = await from_trio.get()
+            #     print(f"aio got: {n}")
+            #     to_trio.send_nowait(n + 1)
+
+    async def trio_main():
+        print("trio_main!")
+
+        asyncio.create_task(
+            start_ib(from_trio, to_trio)
+        )
+
+        from_trio.put_nowait(0)
+
+        async for tick in from_aio:
+            print(f"trio got: {tick}")
+
+            # TODO: send cmds to asyncio
+            # from_trio.put_nowait(n + 1)
+
+    async def aio_main():
+        loop = asyncio.get_running_loop()
+
+        trio_done_fut = asyncio.Future()
+
+        def trio_done_callback(main_outcome):
+            print(f"trio_main finished: {main_outcome!r}")
+            trio_done_fut.set_result(main_outcome)
+
+        trio.lowlevel.start_guest_run(
+            trio_main,
+            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
+            done_callback=trio_done_callback,
+        )
+
+        (await trio_done_fut).unwrap()
+
+    asyncio.run(aio_main())

From b8209cd506582ab9af8d5513851d27314b3d6c87 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 2 Jul 2020 12:54:34 -0400
Subject: [PATCH 02/17] Add a mostly actor aware API to IB backend

Infected `asyncio` support is being added to `tractor` in
goodboy/tractor#121 so delegate to all that new machinery.

Start building out an "actor-aware" api which takes care of all the
`trio`-`asyncio` interaction for data streaming and request handling.

Add a little (shudder) method proxy system which can be used to invoke
client methods from another actor. Start on a streaming api in
preparation for real-time charting.
---
 piker/brokers/ib.py | 310 ++++++++++++++++++++++++++++++--------------
 1 file changed, 213 insertions(+), 97 deletions(-)

diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 0e200c9f..06ab8774 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -1,16 +1,29 @@
 """
 Interactive Brokers API backend.
+
+Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is
+built on it) and thus actor aware apis must be spawned with
+``infected_aio==True``.
 """
 import asyncio
 from dataclasses import asdict
-from typing import List, Dict, Any
+from functools import partial
+import inspect
+from typing import List, Dict, Any, Tuple
 from contextlib import asynccontextmanager
+import time
 
-import trio
+import tractor
+from async_generator import aclosing
 import ib_insync as ibis
 from ib_insync.ticker import Ticker
 from ib_insync.contract import Contract, ContractDetails
 
+from ..log import get_logger, get_console_log
+
+
+log = get_logger(__name__)
+
 
 _time_frames = {
     '1s': '1 Sec',
@@ -35,14 +48,14 @@ _time_frames = {
 
 class Client:
     """IB wrapped for our broker backend API.
+
+    Note: this client requires running inside an ``asyncio`` loop.
     """
     def __init__(
         self,
         ib: ibis.IB,
     ) -> None:
         self.ib = ib
-        # connect data feed callback...
-        self.ib.pendingTickersEvent.connect(self.on_tickers)
 
     async def bars(
         self,
@@ -57,7 +70,7 @@ class Client:
         """
         contract = ibis.ContFuture('ES', exchange='GLOBEX')
         # contract = ibis.Stock('WEED', 'SMART', 'CAD')
-        bars = self.ib.reqHistoricalData(
+        bars = await self.ib.reqHistoricalDataAsync(
             contract,
             endDateTime='',
             # durationStr='60 S',
@@ -88,16 +101,25 @@ class Client:
 
         Return a dictionary of ``upto`` entries worth of contract details.
         """
-        descriptions = self.ib.reqMatchingSymbols(pattern)
+        descriptions = await self.ib.reqMatchingSymbolsAsync(pattern)
+
+        futs = []
+        for d in descriptions:
+            con = d.contract
+            futs.append(self.ib.reqContractDetailsAsync(con))
+
+        # batch request all details
+        results = await asyncio.gather(*futs)
+
+        # XXX: if there is more then one entry in the details list
         details = {}
-        for description in descriptions:
-            con = description.contract
-            deats = self.ib.reqContractDetails(con)
-            # XXX: if there is more then one entry in the details list
+        for details_set in results:
             # then the contract is so called "ambiguous".
-            for d in deats:
+            for d in details_set:
+                con = d.contract
                 unique_sym = f'{con.symbol}.{con.primaryExchange}'
                 details[unique_sym] = asdict(d) if asdicts else d
+
                 if len(details) == upto:
                     return details
 
@@ -118,6 +140,22 @@ class Client:
     ) -> Contract:
         raise NotImplementedError
 
+    async def stream_ticker(
+        self,
+        symbol: str,
+        to_trio,
+        opts: Tuple[int] = ('233', '375'),
+    ) -> None:
+        """Stream a ticker using the std L1 api.
+        """
+        sym, exch = symbol.split('.')
+        contract = ibis.Stock(sym.upper(), exchange=exch.upper())
+        ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts))
+        ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t))
+
+        # let the engine run and stream
+        await self.ib.disconnectedEvent
+
 
 # default config ports
 _tws_port: int = 7497
@@ -125,7 +163,7 @@ _gw_port: int = 4002
 
 
 @asynccontextmanager
-async def get_client(
+async def _aio_get_client(
     host: str = '127.0.0.1',
     port: int = None,
     client_id: int = 1,
@@ -133,8 +171,7 @@ async def get_client(
     """Return an ``ib_insync.IB`` instance wrapped in our client API.
     """
     ib = ibis.IB()
-    # TODO: some detection magic to figure out if tws vs. the
-    # gateway is up ad choose the appropriate port
+
     if port is None:
         ports = [_tws_port, _gw_port]
     else:
         ports = [port]
@@ -152,91 +189,170 @@ async def get_client(
     else:
         raise ConnectionRefusedError(_err)
 
-    yield Client(ib)
-    ib.disconnect()
+    try:
+        yield Client(ib)
+    except BaseException:
+        ib.disconnect()
+        raise
+
+
+async def _aio_run_client_method(
+    meth: str,
+    to_trio,
+    from_trio,
+    **kwargs,
+) -> None:
+    log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!")
+    async with _aio_get_client() as client:
+
+        async_meth = getattr(client, meth)
+
+        # handle streaming methods
+        args = tuple(inspect.getfullargspec(async_meth).args)
+        if 'to_trio' in args:
+            kwargs['to_trio'] = to_trio
+
+        return await async_meth(**kwargs)
+
+
+async def _trio_run_client_method(
+    method: str,
+    **kwargs,
+) -> None:
+    ca = tractor.current_actor()
+    assert ca.is_infected_aio()
+
+    # if the method is an async gen stream for it
+    meth = getattr(Client, method)
+    if inspect.isasyncgenfunction(meth):
+        kwargs['_treat_as_stream'] = True
+
+    # if the method is an async func but streams back results
+    # make sure to also stream from it
+    args = tuple(inspect.getfullargspec(meth).args)
+    if 'to_trio' in args:
+        kwargs['_treat_as_stream'] = True
+
+    result = await tractor.to_asyncio.run_task(
+        _aio_run_client_method,
+        meth=method,
+        **kwargs
+    )
+    return result
+
+
+def get_method_proxy(portal):
+
+    class MethodProxy:
+        def __init__(self, portal: tractor._portal.Portal):
+            self._portal = portal
+
+        async def _run_method(
+            self,
+            *,
+            meth: str = None,
+            **kwargs
+        ) -> Any:
+            return await self._portal.run(
+                __name__,
+                '_trio_run_client_method',
+                method=meth,
+                **kwargs
+            )
+
+    proxy = MethodProxy(portal)
+
+    # mock all remote methods
+    for name, method in inspect.getmembers(
+        Client, predicate=inspect.isfunction
+    ):
+        if '_' == name[0]:
+            continue
+        setattr(proxy, name, partial(proxy._run_method, meth=name))
+
+    return proxy
+
+
+@asynccontextmanager
+async def maybe_spawn_brokerd(
+    **kwargs,
+) -> tractor._portal.Portal:
+    async with tractor.find_actor('brokerd_ib') as portal:
+        if portal is None:  # no broker daemon created yet
+
+            async with tractor.open_nursery() as n:
+                # XXX: this needs to somehow be hidden
+                portal = await n.start_actor(
+                    'brokerd_ib',
+                    rpc_module_paths=[__name__],
+                    infect_asyncio=True,
+                )
+                async with tractor.wait_for_actor(
+                    'brokerd_ib'
+                ) as portal:
+                    yield portal
+
+                # client code may block indefinitely so cancel when
+                # teardown is invoked
+                await n.cancel()
+
+
+@asynccontextmanager
+async def get_client(
+    **kwargs,
+) -> Client:
+    """Init the ``ib_insync`` client in another actor and return
+    a method proxy to it.
+    """
+    async with maybe_spawn_brokerd(**kwargs) as portal:
+        yield get_method_proxy(portal)
+
+
+async def trio_stream_ticker(sym):
+    get_console_log('info')
+
+    # con_es = ibis.ContFuture('ES', exchange='GLOBEX')
+    # es = ibis.Future('ES', '20200918', exchange='GLOBEX')
+
+    stream = await tractor.to_asyncio.run_task(
+        _trio_run_client_method,
+        method='stream_ticker',
+        symbol=sym,
+    )
+    async with aclosing(stream):
+        async for ticker in stream:
+            lft = ticker.lastFillTime
+            for tick_data in ticker.ticks:
+                value = tick_data._asdict()
+                now = time.time()
+                value['time'] = now
+                value['last_fill_time'] = lft
+                if lft:
+                    value['latency'] = now - lft
+                yield value
+
+
+async def stream_from_brokerd(sym):
+
+    async with maybe_spawn_brokerd() as portal:
+        stream = await portal.run(
+            __name__,
+            'trio_stream_ticker',
+            sym=sym,
+        )
+        async for tick in stream:
+            print(f"trio got: {tick}")
 
 
 if __name__ == '__main__':
+    import sys
 
-    con_es = ibis.ContFuture('ES', exchange='GLOBEX')
-    es = ibis.Future('ES', '20200918', exchange='GLOBEX')
-    spy = ibis.Stock('SPY', exchange='ARCA')
+    sym = sys.argv[1]
 
-    # ticker = client.ib.reqTickByTickData(
-    #     contract,
-    #     tickType='Last',
-    #     numberOfTicks=1,
-    # )
-    # client.ib.reqTickByTickData(
-    #     contract,
-    #     tickType='AllLast',
-    #     numberOfTicks=1,
-    # )
-    # client.ib.reqTickByTickData(
-    #     contract,
-    #     tickType='BidAsk',
-    #     numberOfTicks=1,
-    # )
-
-    # ITC (inter task comms)
-    from_trio = asyncio.Queue()
-    to_trio, from_aio = trio.open_memory_channel(float("inf"))
-
-    async def start_ib(from_trio, to_trio):
-        print("starting the EYEEEEBEEEEE GATEWAYYYYYYY!")
-        async with get_client() as client:
-
-            # stream ticks to trio task
-            def ontick(ticker: Ticker):
-                for t in ticker.ticks:
-                    # send tick data to trio
-                    to_trio.send_nowait(t)
-
-            ticker = client.ib.reqMktData(spy, '588', False, False, None)
-            ticker.updateEvent += ontick
-
-            n = await from_trio.get()
-            assert n == 0
-
-            # sleep and let the engine run
-            await asyncio.sleep(float('inf'))
-
-            # TODO: cmd processing from trio
-            # while True:
-            #     n = await from_trio.get()
-            #     print(f"aio got: {n}")
-            #     to_trio.send_nowait(n + 1)
-
-    async def trio_main():
-        print("trio_main!")
-
-        asyncio.create_task(
-            start_ib(from_trio, to_trio)
-        )
-
-        from_trio.put_nowait(0)
-
-        async for tick in from_aio:
-            print(f"trio got: {tick}")
-
-            # TODO: send cmds to asyncio
-            # from_trio.put_nowait(n + 1)
-
-    async def aio_main():
-        loop = asyncio.get_running_loop()
-
-        trio_done_fut = asyncio.Future()
-
-        def trio_done_callback(main_outcome):
-            print(f"trio_main finished: {main_outcome!r}")
-            trio_done_fut.set_result(main_outcome)
-
-        trio.lowlevel.start_guest_run(
-            trio_main,
-            run_sync_soon_threadsafe=loop.call_soon_threadsafe,
-            done_callback=trio_done_callback,
-        )
-
-        (await trio_done_fut).unwrap()
-
-    asyncio.run(aio_main())
+    tractor.run(
+        stream_from_brokerd,
+        sym,
+        # XXX: must be multiprocessing
+        start_method='forkserver',
+        loglevel='info'
+    )

From 450a39ce1c49f1e1829d5a292b3db8a3a00e38f5 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Tue, 7 Jul 2020 10:33:47 -0400
Subject: [PATCH 03/17] Add better contract search/lookup

Add a `Client.find_contract()` which internally takes a . str as input
and uses `IB.qualifyContractsAsync()` internally to try and validate the
most likely contract. Make the module script call this using
`asyncio.run()` for console testing.
--- piker/brokers/ib.py | 123 ++++++++++++++++++++++++++++---------------- 1 file changed, 78 insertions(+), 45 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 06ab8774..b704d371 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -24,9 +24,16 @@ from ..log import get_logger, get_console_log log = get_logger(__name__) +_time_units = { + 's': ' sec', + 'm': ' mins', + 'h': ' hours', +} _time_frames = { '1s': '1 Sec', + '5s': '5 Sec', + '30s': '30 Sec', '1m': 'OneMinute', '2m': 'TwoMinutes', '3m': 'ThreeMinutes', @@ -63,33 +70,31 @@ class Client: # EST in ISO 8601 format is required... below is EPOCH start_date: str = "1970-01-01T00:00:00.000000-05:00", time_frame: str = '1m', - count: int = int(20e3), # <- max allowed per query + count: int = int(2e3), # <- max allowed per query is_paid_feed: bool = False, ) -> List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ - contract = ibis.ContFuture('ES', exchange='GLOBEX') - # contract = ibis.Stock('WEED', 'SMART', 'CAD') + contract = await self.find_contract(symbol) + # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime='', # durationStr='60 S', - durationStr='2000 S', - barSizeSetting='1 secs', + # durationStr='1 D', + durationStr='{count} S'.format(count=3000*5), + barSizeSetting='5 secs', whatToShow='TRADES', useRTH=False ) + assert bars # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) # convert to pandas dataframe: df = ibis.util.df(bars) - print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) + # print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) from piker.ui._source import from_df - a = from_df(df) - # breakpoint() - print(a) + return from_df(df) - # TODO: reimplement this using async batch requests - # see https://github.com/erdewit/ib_insync/issues/262 async def search_stocks( self, pattern: str, @@ -134,11 +139,41 @@ class Client: ) -> Dict[str, ContractDetails]: raise NotImplementedError - def get_cont_fute( + async def get_cont_fute( self, symbol: str, + exchange: str, ) -> Contract: - raise NotImplementedError + """Get an unqualifed contract for the current "continous" future. + """ + contcon = ibis.ContFuture(symbol, exchange=exchange) + frontcon = (await self.ib.qualifyContractsAsync(contcon))[0] + return ibis.Future(conId=frontcon.conId) + + async def find_contract( + self, + symbol, + currency: str = 'USD', + **kwargs, + ) -> Contract: + # use heuristics to figure out contract "type" + sym, exch = symbol.upper().split('.') + + if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): + con = await self.get_cont_fute(symbol=sym, exchange=exch) + + elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY + con = ibis.Commodity(symbol=sym) + + else: + con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) + + try: + exch = 'SMART' if not exch else exch + contract = (await self.ib.qualifyContractsAsync(con))[0] + except IndexError: + raise ValueError(f"No contract could be found {con}") + return contract async def stream_ticker( self, @@ -148,8 +183,7 @@ class Client: ) -> None: """Stream a ticker using the std L1 api. 
""" - sym, exch = symbol.split('.') - contract = ibis.Stock(sym.upper(), exchange=exch.upper()) + contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) @@ -160,6 +194,8 @@ class Client: # default config ports _tws_port: int = 7497 _gw_port: int = 4002 +# list of ports to try in order +_try_ports = [_tws_port, _gw_port] @asynccontextmanager @@ -171,21 +207,15 @@ async def _aio_get_client( """Return an ``ib_insync.IB`` instance wrapped in our client API. """ ib = ibis.IB() - - if port is None: - ports = [_tws_port, _gw_port] - else: - ports = [port] - + ports = _try_ports if port is None else [port] _err = None - # try all default ports for port in ports: try: await ib.connectAsync(host, port, clientId=client_id) break except ConnectionRefusedError as ce: _err = ce - print(f'failed to connect on {port}') + log.warning(f'Failed to connect on {port}') else: raise ConnectionRefusedError(_err) @@ -198,8 +228,8 @@ async def _aio_get_client( async def _aio_run_client_method( meth: str, - to_trio, - from_trio, + to_trio=None, + from_trio=None, **kwargs, ) -> None: log.info("Connecting to the EYEEEEBEEEEE GATEWAYYYYYYY!") @@ -209,7 +239,7 @@ async def _aio_run_client_method( # handle streaming methods args = tuple(inspect.getfullargspec(async_meth).args) - if 'to_trio' in args: + if to_trio and 'to_trio' in args: kwargs['to_trio'] = to_trio return await async_meth(**kwargs) @@ -222,13 +252,13 @@ async def _trio_run_client_method( ca = tractor.current_actor() assert ca.is_infected_aio() - # if the method is an async gen stream for it + # if the method is an *async gen* stream for it meth = getattr(Client, method) if inspect.isasyncgenfunction(meth): kwargs['_treat_as_stream'] = True - # if the method is an async func but streams back results - # make sure to also stream from it + # if the method is an *async func* but manually + # streams back results, make sure to also stream it args = tuple(inspect.getfullargspec(meth).args) if 'to_trio' in args: kwargs['_treat_as_stream'] = True @@ -241,7 +271,7 @@ async def _trio_run_client_method( return result -def get_method_proxy(portal): +def get_method_proxy(portal, target): class MethodProxy: def __init__(self, portal: tractor._portal.Portal): @@ -264,7 +294,7 @@ def get_method_proxy(portal): # mock all remote methods for name, method in inspect.getmembers( - Client, predicate=inspect.isfunction + target, predicate=inspect.isfunction ): if '_' == name[0]: continue @@ -278,8 +308,11 @@ async def maybe_spawn_brokerd( **kwargs, ) -> tractor._portal.Portal: async with tractor.find_actor('brokerd_ib') as portal: - if portal is None: # no broker daemon created yet - + # WTF: why doesn't this work? + print(__name__) + if portal is not None: + yield portal + else: # no broker daemon created yet async with tractor.open_nursery() as n: # XXX: this needs to somehow be hidden portal = await n.start_actor( @@ -305,15 +338,12 @@ async def get_client( a method proxy to it. 
""" async with maybe_spawn_brokerd(**kwargs) as portal: - yield get_method_proxy(portal) + yield get_method_proxy(portal, Client) async def trio_stream_ticker(sym): get_console_log('info') - # con_es = ibis.ContFuture('ES', exchange='GLOBEX') - # es = ibis.Future('ES', '20200918', exchange='GLOBEX') - stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', @@ -321,14 +351,18 @@ async def trio_stream_ticker(sym): ) async with aclosing(stream): async for ticker in stream: - lft = ticker.lastFillTime + # TODO: validate this value + lft = ticker.rtTime for tick_data in ticker.ticks: value = tick_data._asdict() now = time.time() value['time'] = now value['last_fill_time'] = lft if lft: + # convert from milliseconds + lft = float(lft) / 1000. value['latency'] = now - lft + yield value @@ -346,13 +380,12 @@ async def stream_from_brokerd(sym): if __name__ == '__main__': import sys - sym = sys.argv[1] - tractor.run( - stream_from_brokerd, - sym, - # XXX: must be multiprocessing - start_method='forkserver', - loglevel='info' + contract = asyncio.run( + _aio_run_client_method( + 'find_contract', + symbol=sym, + ) ) + print(contract) From 41c6517a231753e4c958144aa36a967b2867fc93 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 08:40:20 -0400 Subject: [PATCH 04/17] Port to new streaming api, yield whole tickers --- piker/brokers/ib.py | 152 +++++++++++++++++++++----------------------- 1 file changed, 71 insertions(+), 81 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index b704d371..e874a8a0 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -2,28 +2,35 @@ Interactive Brokers API backend. Note the client runs under an ``asyncio`` loop (since ``ib_insync`` is -built on it) and thus actor aware apis must be spawned with +built on it) and thus actor aware API calls must be spawned with ``infected_aio==True``. 
""" -import asyncio +from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial +from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator +import asyncio import inspect -from typing import List, Dict, Any, Tuple -from contextlib import asynccontextmanager +import itertools import time -import tractor from async_generator import aclosing -import ib_insync as ibis -from ib_insync.ticker import Ticker from ib_insync.contract import Contract, ContractDetails +from ib_insync.ticker import Ticker +import ib_insync as ibis +import tractor from ..log import get_logger, get_console_log +from ..data import maybe_spawn_brokerd +from ..ui._source import from_df log = get_logger(__name__) +# passed to ``tractor.ActorNursery.start_actor()`` +_spawn_kwargs = { + 'infect_asyncio': True, +} _time_units = { 's': ' sec', 'm': ' mins', @@ -82,17 +89,15 @@ class Client: endDateTime='', # durationStr='60 S', # durationStr='1 D', - durationStr='{count} S'.format(count=3000*5), + durationStr='{count} S'.format(count=3000 * 5), barSizeSetting='5 secs', whatToShow='TRADES', useRTH=False ) + # TODO: raise underlying error here assert bars - # barSizeSetting='1 min', whatToShow='MIDPOINT', useRTH=True) # convert to pandas dataframe: df = ibis.util.df(bars) - # print(df[['date', 'open', 'high', 'low', 'close', 'volume']]) - from piker.ui._source import from_df return from_df(df) async def search_stocks( @@ -194,7 +199,6 @@ class Client: # default config ports _tws_port: int = 7497 _gw_port: int = 4002 -# list of ports to try in order _try_ports = [_tws_port, _gw_port] @@ -202,10 +206,20 @@ _try_ports = [_tws_port, _gw_port] async def _aio_get_client( host: str = '127.0.0.1', port: int = None, - client_id: int = 1, + client_id: Optional[int] = None, ) -> Client: """Return an ``ib_insync.IB`` instance wrapped in our client API. """ + if client_id is None: + # if this is a persistent brokerd, try to allocate a new id for + # each client + try: + ss = tractor.current_actor().statespace + client_id = next(ss.setdefault('client_ids', itertools.count())) + except RuntimeError: + # tractor likely isn't running + client_id = 1 + ib = ibis.IB() ports = _try_ports if port is None else [port] _err = None @@ -271,26 +285,27 @@ async def _trio_run_client_method( return result -def get_method_proxy(portal, target): +class _MethodProxy: + def __init__(self, portal: tractor._portal.Portal): + self._portal = portal - class MethodProxy: - def __init__(self, portal: tractor._portal.Portal): - self._portal = portal - - async def _run_method( - self, - *, - meth: str = None, + async def _run_method( + self, + *, + meth: str = None, + **kwargs + ) -> Any: + return await self._portal.run( + __name__, + '_trio_run_client_method', + method=meth, **kwargs - ) -> Any: - return await self._portal.run( - __name__, - '_trio_run_client_method', - method=meth, - **kwargs - ) + ) - proxy = MethodProxy(portal) + +def get_method_proxy(portal, target) -> _MethodProxy: + + proxy = _MethodProxy(portal) # mock all remote methods for name, method in inspect.getmembers( @@ -303,33 +318,6 @@ def get_method_proxy(portal, target): return proxy -@asynccontextmanager -async def maybe_spawn_brokerd( - **kwargs, -) -> tractor._portal.Portal: - async with tractor.find_actor('brokerd_ib') as portal: - # WTF: why doesn't this work? 
- print(__name__) - if portal is not None: - yield portal - else: # no broker daemon created yet - async with tractor.open_nursery() as n: - # XXX: this needs to somehow be hidden - portal = await n.start_actor( - 'brokerd_ib', - rpc_module_paths=[__name__], - infect_asyncio=True, - ) - async with tractor.wait_for_actor( - 'brokerd_ib' - ) as portal: - yield portal - - # client code may block indefinitely so cancel when - # teardown is invoked - await n.cancel() - - @asynccontextmanager async def get_client( **kwargs, @@ -337,45 +325,47 @@ async def get_client( """Init the ``ib_insync`` client in another actor and return a method proxy to it. """ - async with maybe_spawn_brokerd(**kwargs) as portal: + async with maybe_spawn_brokerd( + brokername='ib', + expose_mods=[__name__], + infect_asyncio=True, + **kwargs + ) as portal: yield get_method_proxy(portal, Client) -async def trio_stream_ticker(sym): +async def stream_quotes( + symbols: List[str], +) -> AsyncGenerator[str, Dict[str, Any]]: + """Stream symbol quotes. + + This is a ``trio`` callable routine meant to be invoked + once the brokerd is up. + """ get_console_log('info') stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', - symbol=sym, + symbol=symbols[0], ) async with aclosing(stream): async for ticker in stream: - # TODO: validate this value - lft = ticker.rtTime - for tick_data in ticker.ticks: - value = tick_data._asdict() - now = time.time() - value['time'] = now - value['last_fill_time'] = lft - if lft: - # convert from milliseconds - lft = float(lft) / 1000. - value['latency'] = now - lft + # convert named tuples to dicts so we send usable keys + # for tick_data in ticker.ticks: + ticker.ticks = [td._asdict() for td in ticker.ticks] - yield value + data = asdict(ticker) + # add time stamps for downstream latency measurements + data['brokerd_ts'] = time.time() + if ticker.rtTime: + data['rtTime_s'] = float(ticker.rtTime) / 1000. -async def stream_from_brokerd(sym): + yield data - async with maybe_spawn_brokerd() as portal: - stream = await portal.run( - __name__, - 'trio_stream_ticker', - sym=sym, - ) - async for tick in stream: - print(f"trio got: {tick}") + # ugh, clear ticks since we've consumed them + ticker.ticks = [] if __name__ == '__main__': From 4ce99e62e086228b8db6a6d28b5f8a5eb65e84b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 15 Jul 2020 09:54:24 -0400 Subject: [PATCH 05/17] Override annoying stuff in ib_insync --- piker/brokers/ib.py | 37 ++++++++++++++++++++++++++++++++++++- 1 file changed, 36 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index e874a8a0..5d7dd748 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -10,6 +10,7 @@ from dataclasses import asdict from functools import partial from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator import asyncio +import logging import inspect import itertools import time @@ -18,6 +19,8 @@ from async_generator import aclosing from ib_insync.contract import Contract, ContractDetails from ib_insync.ticker import Ticker import ib_insync as ibis +from ib_insync.wrapper import Wrapper +from ib_insync.client import Client as ib_Client import tractor from ..log import get_logger, get_console_log @@ -60,6 +63,38 @@ _time_frames = { } +# overrides to sidestep pretty questionable design decisions in +# ``ib_insync``: + +class NonShittyWrapper(Wrapper): + def tcpDataArrived(self): + """Override time stamps to be floats for now. 
+ """ + self.lastTime = time.time() + for ticker in self.pendingTickers: + ticker.rtTime = None + ticker.ticks = [] + ticker.tickByTicks = [] + ticker.domTicks = [] + self.pendingTickers = set() + + +class NonShittyIB(ibis.IB): + """The beginning of overriding quite a few quetionable decisions + in this lib. + + - Don't use datetimes + - Don't use named tuples + """ + def __init__(self): + self._createEvents() + self.wrapper = NonShittyWrapper(self) + self.client = ib_Client(self.wrapper) + self.errorEvent += self._onError + self.client.apiEnd += self.disconnectedEvent + self._logger = logging.getLogger('ib_insync.ib') + + class Client: """IB wrapped for our broker backend API. @@ -220,7 +255,7 @@ async def _aio_get_client( # tractor likely isn't running client_id = 1 - ib = ibis.IB() + ib = NonShittyIB() ports = _try_ports if port is None else [port] _err = None for port in ports: From aeb58c03e25b7428e3acbd583e4d4f9dfd2942de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 28 Jul 2020 14:44:32 -0400 Subject: [PATCH 06/17] Add startup logic to handle market closure --- piker/brokers/ib.py | 31 ++++++++++++++++++++++++++----- 1 file changed, 26 insertions(+), 5 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 5d7dd748..6e2e208c 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -70,6 +70,7 @@ class NonShittyWrapper(Wrapper): def tcpDataArrived(self): """Override time stamps to be floats for now. """ + # use a float to store epoch time instead of datetime self.lastTime = time.time() for ticker in self.pendingTickers: ticker.rtTime = None @@ -88,6 +89,7 @@ class NonShittyIB(ibis.IB): """ def __init__(self): self._createEvents() + # XXX: just to override this wrapper self.wrapper = NonShittyWrapper(self) self.client = ib_Client(self.wrapper) self.errorEvent += self._onError @@ -385,11 +387,16 @@ async def stream_quotes( symbol=symbols[0], ) async with aclosing(stream): - async for ticker in stream: - # convert named tuples to dicts so we send usable keys - # for tick_data in ticker.ticks: - ticker.ticks = [td._asdict() for td in ticker.ticks] + # first quote can be ignored as a 2nd with newer data is sent? + first_ticker = await stream.__anext__() + data = asdict(first_ticker) + log.debug(f"First ticker received {data}") + yield data + quote_cache = {} + def proc_ticker(ticker: Ticker) -> dict: + # convert named tuples to dicts so we send usable keys + ticker.ticks = [td._asdict() for td in ticker.ticks] data = asdict(ticker) # add time stamps for downstream latency measurements @@ -397,7 +404,21 @@ async def stream_quotes( if ticker.rtTime: data['rtTime_s'] = float(ticker.rtTime) / 1000. - yield data + return data + + async for ticker in stream: + # spin consuming tickers until we get a real market datum + if not ticker.rtTime: + log.debug(f"New unsent ticker: {ticker}") + continue + else: + yield proc_ticker(ticker) + log.debug("Received first real volume tick") + # XXX: this works because we don't use ``aclosing()`` above? + break + + async for ticker in stream: + yield proc_ticker(ticker) # ugh, clear ticks since we've consumed them ticker.ticks = [] From 482dc510fa6fce969c7348dd6716fedfde3a7a81 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 31 Jul 2020 00:03:17 -0400 Subject: [PATCH 07/17] Add normalization step for ticks Start a draft normalization format for (sampled) tick data. 
Ideally we move toward the dense tick format (DFT) enforced by techtonicDB, but for now let's just get a dict of something simple going: `{'type': 'trade', 'price': List[Dict[str, Any]]: """Retreive OHLCV bars for a symbol over a range to the present. """ + bars_kwargs = {'whatToShow': 'TRADES'} + contract = await self.find_contract(symbol) + bars_kwargs.update(getattr(contract, 'bars_kwargs', {})) + # _min = min(2000*100, count) bars = await self.ib.reqHistoricalDataAsync( contract, endDateTime='', # durationStr='60 S', # durationStr='1 D', + + # time length calcs durationStr='{count} S'.format(count=3000 * 5), barSizeSetting='5 secs', - whatToShow='TRADES', - useRTH=False + + # always use extended hours + useRTH=False, + + # restricted per contract type + **bars_kwargs, + # whatToShow='MIDPOINT', + # whatToShow='TRADES', ) - # TODO: raise underlying error here - assert bars + if not bars: + # TODO: raise underlying error here + raise ValueError(f"No bars retreived for {symbol}?") + # convert to pandas dataframe: df = ibis.util.df(bars) return from_df(df) @@ -205,8 +228,9 @@ class Client: con = await self.get_cont_fute(symbol=sym, exchange=exch) elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY - con = ibis.Commodity(symbol=sym) - + con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] + con = ibis.Commodity(**con_kwargs) + con.bars_kwargs = bars_kwargs else: con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) @@ -222,12 +246,19 @@ class Client: symbol: str, to_trio, opts: Tuple[int] = ('233', '375'), + # opts: Tuple[int] = ('459',), ) -> None: """Stream a ticker using the std L1 api. """ contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) + # ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) + + def push(t): + log.debug(t) + to_trio.send_nowait(t) + + ticker.updateEvent.connect(push) # let the engine run and stream await self.ib.disconnectedEvent @@ -371,15 +402,52 @@ async def get_client( yield get_method_proxy(portal, Client) +def normalize( + ticker: Ticker, + calc_price: bool = False +) -> dict: + # convert named tuples to dicts so we send usable keys + new_ticks = [] + for tick in ticker.ticks: + td = tick._asdict() + + if td['tickType'] in (48, 77): + td['type'] = 'trade' + + new_ticks.append(td) + + ticker.ticks = new_ticks + + # some contracts don't have volume so we may want to + # calculate a midpoint price based on data we can acquire + # (such as bid / ask) + if calc_price: + ticker.ticks.append( + {'type': 'trade', 'price': ticker.marketPrice()} + ) + + # serialize for transport + data = asdict(ticker) + + # add time stamps for downstream latency measurements + data['brokerd_ts'] = time.time() + if ticker.rtTime: + data['rtTime_s'] = float(ticker.rtTime) / 1000. + + return data + + async def stream_quotes( symbols: List[str], + loglevel: str = None, ) -> AsyncGenerator[str, Dict[str, Any]]: """Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked once the brokerd is up. """ - get_console_log('info') + # XXX: required to propagate ``tractor`` loglevel to piker logging + get_console_log(loglevel or tractor.current_actor().loglevel) stream = await tractor.to_asyncio.run_task( _trio_run_client_method, @@ -389,36 +457,36 @@ async def stream_quotes( async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? 
first_ticker = await stream.__anext__() - data = asdict(first_ticker) - log.debug(f"First ticker received {data}") - yield data - quote_cache = {} + # quote_cache = {} - def proc_ticker(ticker: Ticker) -> dict: - # convert named tuples to dicts so we send usable keys - ticker.ticks = [td._asdict() for td in ticker.ticks] - data = asdict(ticker) + if type(first_ticker.contract) not in (ibis.Commodity,): - # add time stamps for downstream latency measurements - data['brokerd_ts'] = time.time() - if ticker.rtTime: - data['rtTime_s'] = float(ticker.rtTime) / 1000. + calc_price = False # should be real volume for contract - return data + data = normalize(first_ticker) + + log.debug(f"First ticker received {data}") + yield data + + async for ticker in stream: + # spin consuming tickers until we get a real market datum + if not ticker.rtTime: + log.debug(f"New unsent ticker: {ticker}") + continue + else: + yield normalize(ticker) + log.debug("Received first real volume tick") + # XXX: this works because we don't use + # ``aclosing()`` above? + break + else: + calc_price = True async for ticker in stream: - # spin consuming tickers until we get a real market datum - if not ticker.rtTime: - log.debug(f"New unsent ticker: {ticker}") - continue - else: - yield proc_ticker(ticker) - log.debug("Received first real volume tick") - # XXX: this works because we don't use ``aclosing()`` above? - break - - async for ticker in stream: - yield proc_ticker(ticker) + yield normalize( + ticker, + calc_price=calc_price + ) # ugh, clear ticks since we've consumed them ticker.ticks = [] From 0bf265a96fb7ec83c2252d5b5c72aeb68f4cbf69 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 9 Aug 2020 00:02:04 -0400 Subject: [PATCH 08/17] Future todo --- piker/brokers/ib.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 4e4aeb6c..75ace77a 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -284,6 +284,8 @@ async def _aio_get_client( try: ss = tractor.current_actor().statespace client_id = next(ss.setdefault('client_ids', itertools.count())) + # TODO: in case the arbiter has no record + # of existing brokerd we need to broadcase for one. except RuntimeError: # tractor likely isn't running client_id = 1 From b7c924046ab2380c5985796829112e14f7f7e14a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 Aug 2020 07:42:49 -0400 Subject: [PATCH 09/17] Begin to use `@tractor.msg.pub` throughout streaming API Since the new FSP system will require time aligned data amongst actors, it makes sense to share broker data feeds as much as possible on a local system. There doesn't seem to be downside to this approach either since if not fanning-out in our code, the broker (server) has to do it anyway (and who knows how junk their implementation is) though with more clients, sockets etc. in memory on our end. It also preps the code for introducing a more "serious" pub-sub systems like zeromq/nanomessage. 
--- piker/brokers/ib.py | 47 ++++++++++++++++++++++++++++++------------ piker/data/__init__.py | 7 +++++-- 2 files changed, 39 insertions(+), 15 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 75ace77a..a5a4b254 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -8,7 +8,7 @@ built on it) and thus actor aware API calls must be spawned with from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator +from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator, Callable import asyncio import logging import inspect @@ -224,10 +224,14 @@ class Client: # use heuristics to figure out contract "type" sym, exch = symbol.upper().split('.') + # TODO: metadata system for all these exchange rules.. + if exch in ('PURE',): + currency = 'CAD' + if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): con = await self.get_cont_fute(symbol=sym, exchange=exch) - elif exch == 'CMDTY': # eg. XAUSUSD.CMDTY + elif exch == 'CMDTY': # eg. XAUUSD.CMDTY con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs @@ -356,7 +360,10 @@ async def _trio_run_client_method( class _MethodProxy: - def __init__(self, portal: tractor._portal.Portal): + def __init__( + self, + portal: tractor._portal.Portal + ) -> None: self._portal = portal async def _run_method( @@ -420,9 +427,8 @@ def normalize( ticker.ticks = new_ticks - # some contracts don't have volume so we may want to - # calculate a midpoint price based on data we can acquire - # (such as bid / ask) + # some contracts don't have volume so we may want to calculate + # a midpoint price based on data we can acquire (such as bid / ask) if calc_price: ticker.ticks.append( {'type': 'trade', 'price': ticker.marketPrice()} @@ -439,7 +445,9 @@ def normalize( return data +@tractor.msg.pub async def stream_quotes( + get_topics: Callable, symbols: List[str], loglevel: str = None, ) -> AsyncGenerator[str, Dict[str, Any]]: @@ -451,24 +459,29 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # TODO: support multiple subscriptions + sym = symbols[0] + stream = await tractor.to_asyncio.run_task( _trio_run_client_method, method='stream_ticker', - symbol=symbols[0], + symbol=sym, ) async with aclosing(stream): # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() - # quote_cache = {} if type(first_ticker.contract) not in (ibis.Commodity,): + suffix = 'exchange' calc_price = False # should be real volume for contract - data = normalize(first_ticker) + quote = normalize(first_ticker) + log.debug(f"First ticker received {quote}") - log.debug(f"First ticker received {data}") - yield data + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} async for ticker in stream: # spin consuming tickers until we get a real market datum @@ -476,19 +489,27 @@ async def stream_quotes( log.debug(f"New unsent ticker: {ticker}") continue else: - yield normalize(ticker) log.debug("Received first real volume tick") + quote = normalize(ticker) + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} + # XXX: this works because we don't use # ``aclosing()`` above? break else: + # commodities don't have an exchange name for some reason? 
+ suffix = 'secType' calc_price = True async for ticker in stream: - yield normalize( + quote = normalize( ticker, calc_price=calc_price ) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + yield {topic: quote} # ugh, clear ticks since we've consumed them ticker.ticks = [] diff --git a/piker/data/__init__.py b/piker/data/__init__.py index bb8fca8b..25efe088 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -62,7 +62,6 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' async with tractor.find_actor(dname) as portal: # WTF: why doesn't this work? - log.info(f"YOYOYO {__name__}") if portal is not None: yield portal else: @@ -89,7 +88,7 @@ async def maybe_spawn_brokerd( async def open_feed( name: str, symbols: Sequence[str], - loglevel: str = 'info', + loglevel: Optional[str] = None, ) -> AsyncIterator[Dict[str, Any]]: """Open a "data feed" which provides streamed real-time quotes. """ @@ -98,6 +97,9 @@ async def open_feed( except ImportError: mod = get_ingestormod(name) + if loglevel is None: + loglevel = tractor.current_actor().loglevel + async with maybe_spawn_brokerd( mod.name, loglevel=loglevel, @@ -106,6 +108,7 @@ async def open_feed( mod.__name__, 'stream_quotes', symbols=symbols, + topics=symbols, ) # Feed is required to deliver an initial quote asap. # TODO: should we timeout and raise a more explicit error? From 103014aa588397179875bcea9cf14ae73e10908a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 26 Aug 2020 21:43:21 -0400 Subject: [PATCH 10/17] Properly teardown data feed on cancel --- piker/brokers/ib.py | 31 +++++++++++++++++++++++++------ 1 file changed, 25 insertions(+), 6 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index a5a4b254..50119c38 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -21,6 +21,7 @@ from ib_insync.ticker import Ticker import ib_insync as ibis from ib_insync.wrapper import Wrapper from ib_insync.client import Client as ib_Client +import trio import tractor from ..log import get_logger, get_console_log @@ -141,7 +142,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=3000 * 5), + durationStr='{count} S'.format(count=2000 * 5), barSizeSetting='5 secs', # always use extended hours @@ -256,11 +257,19 @@ class Client: """ contract = await self.find_contract(symbol) ticker: Ticker = self.ib.reqMktData(contract, ','.join(opts)) - # ticker.updateEvent.connect(lambda t: to_trio.send_nowait(t)) def push(t): log.debug(t) - to_trio.send_nowait(t) + try: + to_trio.send_nowait(t) + except trio.BrokenResourceError: + # XXX: eventkit's ``Event.emit()`` for whatever redic + # reason will catch and ignore regular exceptions + # resulting in tracebacks spammed to console.. + # Manually do the dereg ourselves. + ticker.updateEvent.disconnect(push) + log.error(f"Disconnected stream for `{symbol}`") + self.ib.cancelMktData(contract) ticker.updateEvent.connect(push) @@ -440,16 +449,18 @@ def normalize( # add time stamps for downstream latency measurements data['brokerd_ts'] = time.time() if ticker.rtTime: - data['rtTime_s'] = float(ticker.rtTime) / 1000. + data['broker_ts'] = data['rtTime_s'] = float(ticker.rtTime) / 1000. return data -@tractor.msg.pub +# @tractor.msg.pub async def stream_quotes( - get_topics: Callable, symbols: List[str], loglevel: str = None, + # compat for @tractor.msg.pub + topics: Any = None, + get_topics: Callable = None, ) -> AsyncGenerator[str, Dict[str, Any]]: """Stream symbol quotes. 
@@ -483,6 +494,10 @@ async def stream_quotes( topic = '.'.join((con['symbol'], con[suffix])).lower() yield {topic: quote} + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] + async for ticker in stream: # spin consuming tickers until we get a real market datum if not ticker.rtTime: @@ -494,6 +509,10 @@ async def stream_quotes( topic = '.'.join((con['symbol'], con[suffix])).lower() yield {topic: quote} + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + ticker.ticks = [] + # XXX: this works because we don't use # ``aclosing()`` above? break From ad08cb7a66320e7e1305ff2666501f49ee117002 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 30 Aug 2020 12:31:32 -0400 Subject: [PATCH 11/17] Try to find cad stocks --- piker/brokers/ib.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 50119c38..9e306de1 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -225,18 +225,22 @@ class Client: # use heuristics to figure out contract "type" sym, exch = symbol.upper().split('.') - # TODO: metadata system for all these exchange rules.. - if exch in ('PURE',): - currency = 'CAD' - + # futes if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): con = await self.get_cont_fute(symbol=sym, exchange=exch) + # commodities elif exch == 'CMDTY': # eg. XAUUSD.CMDTY con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] con = ibis.Commodity(**con_kwargs) con.bars_kwargs = bars_kwargs + + # stonks else: + # TODO: metadata system for all these exchange rules.. + if exch in ('PURE', 'TSE'): # non-yankee + currency = 'CAD' + con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) try: From b499631d62435fc9c44887b06ddd80406d1888c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Sep 2020 12:46:30 -0400 Subject: [PATCH 12/17] Drop to 1k bars on init load --- piker/brokers/ib.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 9e306de1..3764a849 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -142,7 +142,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=2000 * 5), + durationStr='{count} S'.format(count=1000 * 5), barSizeSetting='5 secs', # always use extended hours @@ -458,6 +458,8 @@ def normalize( return data +# TODO: figure out how to share quote feeds sanely despite +# the wacky ``ib_insync`` api. 
# @tractor.msg.pub async def stream_quotes( symbols: List[str], From aad9cb2dd01dead09be71d719ee1f7eb10b2a0f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 7 Sep 2020 16:57:46 -0400 Subject: [PATCH 13/17] Support forex pair lookup on ib --- piker/brokers/ib.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 3764a849..a6da1123 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -104,6 +104,7 @@ _adhoc_cmdty_data_map = { # NOTE: cmdtys don't have trade data: # https://groups.io/g/twsapi/message/44174 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), + 'XAUUSD': ({'conId': 69067924}, {'whatToShow': 'MIDPOINT'}), } @@ -142,7 +143,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=1000 * 5), + durationStr='{count} S'.format(count=3000 * 5), barSizeSetting='5 secs', # always use extended hours @@ -223,12 +224,25 @@ class Client: **kwargs, ) -> Contract: # use heuristics to figure out contract "type" - sym, exch = symbol.upper().split('.') + try: + sym, exch = symbol.upper().rsplit('.', maxsplit=1) + except ValueError: + # likely there's an embedded `.` for a forex pair + await tractor.breakpoint() # futes if exch in ('GLOBEX', 'NYMEX', 'CME', 'CMECRYPTO'): con = await self.get_cont_fute(symbol=sym, exchange=exch) + elif exch in ('FOREX'): + currency = '' + symbol, currency = sym.split('/') + con = ibis.Forex( + symbol=symbol, + currency=currency, + ) + con.bars_kwargs = {'whatToShow': 'MIDPOINT'} + # commodities elif exch == 'CMDTY': # eg. XAUUSD.CMDTY con_kwargs, bars_kwargs = _adhoc_cmdty_data_map[sym] @@ -488,7 +502,7 @@ async def stream_quotes( # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() - if type(first_ticker.contract) not in (ibis.Commodity,): + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): suffix = 'exchange' calc_price = False # should be real volume for contract From 5bb11826f37e220bf6f8f69b63a9a37749d7c385 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Sep 2020 09:58:41 -0400 Subject: [PATCH 14/17] Drop unmarketable trades for now --- piker/brokers/ib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index a6da1123..fe68aa01 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -268,7 +268,7 @@ class Client: self, symbol: str, to_trio, - opts: Tuple[int] = ('233', '375'), + opts: Tuple[int] = ('375',), # '233', ), # opts: Tuple[int] = ('459',), ) -> None: """Stream a ticker using the std L1 api. From 2f8737af6a81cd6a75205b4e782573488750b00e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 17 Sep 2020 14:14:52 -0400 Subject: [PATCH 15/17] Fix PURE contracts lookup... --- piker/brokers/ib.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index fe68aa01..67c3e799 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -252,11 +252,21 @@ class Client: # stonks else: # TODO: metadata system for all these exchange rules.. + primaryExchange = '' + if exch in ('PURE', 'TSE'): # non-yankee currency = 'CAD' + if exch in ('PURE',): + # stupid ib... 
+ exch = 'SMART' + primaryExchange = 'PURE' - con = ibis.Stock(symbol=sym, exchange=exch, currency=currency) - + con = ibis.Stock( + symbol=sym, + exchange=exch, + primaryExchange=primaryExchange, + currency=currency, + ) try: exch = 'SMART' if not exch else exch contract = (await self.ib.qualifyContractsAsync(con))[0] From a526008a95479c04db81203151ea49a4773a7d37 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 26 Sep 2020 14:17:00 -0400 Subject: [PATCH 16/17] Add github actions CI; thanks @guilledk! --- .github/workflows/ci.yml | 21 +++++++++++++++++++++ 1 file changed, 21 insertions(+) create mode 100644 .github/workflows/ci.yml diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 00000000..fa7b7c6a --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,21 @@ +name: CI + +on: push + +jobs: + mypy: + name: 'pip install' + runs-on: ubuntu-latest + steps: + - name: Checkout + uses: actions/checkout@v2 + with: + ref: chart_hacking + - name: Setup python + uses: actions/setup-python@v2 + with: + python-version: '3.8' + - name: Install dependencies + run: pip install -e . --upgrade-strategy eager -r requirements.txt + - name: Run piker + run: piker From 934de1d40a8089ee89e1b8d75ddb6f51e73e79af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 2 Oct 2020 10:39:56 -0400 Subject: [PATCH 17/17] Switch to asyncio support branch in tractor --- requirements.txt | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 requirements.txt diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 00000000..18ec5994 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,3 @@ +# no pypi package for tractor (yet) +# we require the asyncio-via-guest-mode dev branch +-e git+git://github.com/goodboy/tractor.git@infect_asyncio#egg=tractor