diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py
index 8fda1dec..cb525a13 100644
--- a/piker/brokers/ib.py
+++ b/piker/brokers/ib.py
@@ -23,7 +23,7 @@ built on it) and thus actor aware API calls must be spawned with
 
 """
 from contextlib import asynccontextmanager as acm
-from dataclasses import asdict
+from dataclasses import asdict, astuple
 from datetime import datetime
 from functools import partial
 import itertools
@@ -41,6 +41,7 @@ import platform
 from random import randint
 import time
+
 import trio
 from trio_typing import TaskStatus
 import tractor
@@ -60,7 +61,7 @@ import numpy as np
 
 from .. import config
 from ..log import get_logger, get_console_log
-from ..data._source import from_df
+from ..data._source import base_ohlc_dtype
 from ..data._sharedmem import ShmArray
 from ._util import SymbolNotFound, NoData
 from ..clearing._messages import (
@@ -229,6 +230,28 @@ _exch_skip_list = {
 _enters = 0
 
 
+def bars_to_np(bars: list) -> np.ndarray:
+    '''
+    Convert a "bars list thing" (``BarsList`` type from ibis)
+    into a numpy struct array.
+
+    '''
+    # TODO: maybe rewrite this faster with ``numba``
+    np_ready = []
+    for bardata in bars:
+        ts = bardata.date.timestamp()
+        t = astuple(bardata)[:7]
+        np_ready.append((ts, ) + t[1:7])
+
+    nparr = np.array(
+        np_ready,
+        dtype=base_ohlc_dtype,
+    )
+    assert nparr['time'][0] == bars[0].date.timestamp()
+    assert nparr['time'][-1] == bars[-1].date.timestamp()
+    return nparr
+
+
 class Client:
     '''
     IB wrapped for our broker backend API.
@@ -255,6 +278,7 @@ class Client:
     async def bars(
         self,
         fqsn: str,
+
         # EST in ISO 8601 format is required... below is EPOCH
         start_dt: Union[datetime, str] = "1970-01-01T00:00:00.000000-05:00",
         end_dt: Union[datetime, str] = "",
@@ -262,7 +286,6 @@ class Client:
         sample_period_s: str = 1,  # ohlc sample period
         period_count: int = int(2e3),  # <- max per 1s sample query
 
-        is_paid_feed: bool = False,  # placeholder
     ) -> list[dict[str, Any]]:
         '''
         Retreive OHLCV bars for a fqsn over a range to the present.
@@ -313,10 +336,8 @@ class Client:
             # TODO: raise underlying error here
             raise ValueError(f"No bars retreived for {fqsn}?")
 
-        # TODO: rewrite this faster with ``numba``
-        # convert to pandas dataframe:
-        df = ibis.util.df(bars)
-        return bars, from_df(df)
+        nparr = bars_to_np(bars)
+        return bars, nparr
 
     async def con_deats(
         self,
@@ -1214,7 +1235,6 @@ async def open_client_proxy() -> MethodProxy:
         code = getattr(err, 'code', None)
         if code:
             msg = err.message
-            # await tractor.breakpoint()
 
             # TODO: retreive underlying ``ib_insync`` error?
             if (
@@ -1362,7 +1382,9 @@
 async def get_bars(
     proxy: MethodProxy,
     fqsn: str,
-    end_dt: str = "",
+
+    # blank to start which tells ib to look up the latest datum
+    end_dt: str = '',
 
 ) -> (dict, np.ndarray):
     '''
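
Review note: a minimal, self-contained sketch of the conversion the new
``bars_to_np()`` above performs, runnable without IB. The ``BarData``
stand-in and the seven-field dtype are assumptions here, modeled on
``ib_insync``'s bar field order (date, open, high, low, close, volume,
average, barCount) and on piker's ``base_ohlc_dtype`` (the real dtype
lives in ``piker.data._source``):

    from dataclasses import astuple, dataclass
    from datetime import datetime, timezone

    import numpy as np

    # hypothetical stand-in for ``piker.data._source.base_ohlc_dtype``
    ohlc_dtype = np.dtype([
        ('time', float), ('open', float), ('high', float), ('low', float),
        ('close', float), ('volume', float), ('bar_wap', float),
    ])

    @dataclass
    class BarData:
        # trimmed stand-in for the real ``ib_insync`` bar type
        date: datetime
        open: float
        high: float
        low: float
        close: float
        volume: float
        average: float  # lands in the ``bar_wap`` field
        barCount: int   # dropped by the ``[:7]`` slice

    bars = [
        BarData(
            datetime(2022, 1, 3, tzinfo=timezone.utc),
            477.0, 478.5, 476.2, 478.0, 1e5, 477.4, 42,
        ),
    ]

    np_ready = []
    for bardata in bars:
        ts = bardata.date.timestamp()
        t = astuple(bardata)[:7]  # (date, o, h, l, c, v, avg)
        # swap the ``datetime`` for an epoch float, keep the rest
        np_ready.append((ts,) + t[1:7])

    nparr = np.array(np_ready, dtype=ohlc_dtype)
    assert nparr['time'][0] == bars[0].date.timestamp()
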
@@ -1370,87 +1392,83 @@ async def get_bars(
     a ``MethoProxy``.
 
     '''
-    _err: Optional[Exception] = None
+    import pendulum
+
     fails = 0
     bars: Optional[list] = None
-
-    async def get():
-
-        bars, bars_array = await proxy.bars(
-            fqsn=fqsn,
-            end_dt=end_dt,
-        )
-        if bars_array is None:
-            raise SymbolNotFound(fqsn)
-
-        next_dt = bars[0].date
-        log.info(f'ib datetime {next_dt}')
-
-        return (bars, bars_array, next_dt), fails
-
     in_throttle: bool = False
+    first_dt: datetime = None
+    last_dt: datetime = None
+
+    if end_dt:
+        last_dt = pendulum.from_timestamp(end_dt.timestamp())
 
     for _ in range(10):
         try:
-            return await get()
+            bars, bars_array = await proxy.bars(
+                fqsn=fqsn,
+                end_dt=end_dt,
+            )
+
+            if bars_array is None:
+                raise SymbolNotFound(fqsn)
+
+            first_dt = pendulum.from_timestamp(
+                bars[0].date.timestamp())
+
+            last_dt = pendulum.from_timestamp(
+                bars[-1].date.timestamp())
+
+            time = bars_array['time']
+            assert time[-1] == last_dt.timestamp()
+            assert time[0] == first_dt.timestamp()
+            log.info(f'bars retrieved for dts {first_dt}:{last_dt}')
+
+            return (bars, bars_array, first_dt, last_dt), fails
 
         except RequestError as err:
+            msg = err.message
             # why do we always need to rebind this?
-            _err = err
+            # _err = err
 
-            # TODO: retreive underlying ``ib_insync`` error?
-            if err.code == 162:
+            if 'No market data permissions for' in msg:
+                # TODO: signalling for no permissions searches
+                raise NoData(f'Symbol: {fqsn}')
+                break
 
-                # TODO: so this error is normally raised (it seems) if
-                # we try to retrieve history for a time range for which
-                # there is none. in that case we should not only report
-                # the "empty range" but also do a iteration on the time
-                # step for ``next_dt`` to see if we can pull older
-                # history.
-                if 'HMDS query returned no data' in err.message:
-                    # means we hit some kind of historical "empty space"
-                    # and further requests will need to decrement the
-                    # start time dt in order to not receive a further
-                    # error?
-                    # OLDER: seem to always cause throttling despite low rps
+            elif (
+                err.code == 162
+                and 'HMDS query returned no data' in err.message
+            ):
+                # try to decrement start point and look further back
+                end_dt = last_dt = last_dt.subtract(seconds=2000)
+                log.warning(
+                    f'No data found ending @ {end_dt}\n'
+                    f'Starting another request for {end_dt}'
+                )
 
-                    # TODO: if there is not bars returned from the first
-                    # query we need to manually calculate the next step
-                    # back and convert to an expected datetime format.
-                    # if not bars:
-                    #     raise
+                continue
 
-                    # try to decrement start point and look further back
-                    next_dt = bars[0].date
-                    log.info(f'ib datetime {next_dt}')
-                    continue
+            else:
+                log.exception(
+                    "Data query rate reached: Press `ctrl-alt-f`"
+                    "in TWS"
+                )
 
-                elif 'No market data permissions for' in err.message:
+                # TODO: should probably create some alert on screen
+                # and then somehow get that to trigger an event here
+                # that restarts/resumes this task?
+                if not in_throttle:
+                    await tractor.breakpoint()
 
-                    # TODO: signalling for no permissions searches
-                    raise NoData(f'Symbol: {fqsn}')
-                    break
+                # TODO: wait on data con reset event
+                # then begin backfilling again.
+                # await proxy.wait_for_data()
 
-                else:
-                    log.exception(
-                        "Data query rate reached: Press `ctrl-alt-f`"
-                        "in TWS"
-                    )
-                    print(_err)
+                in_throttle = True
+                fails += 1
+                continue
 
-                    # TODO: should probably create some alert on screen
-                    # and then somehow get that to trigger an event here
-                    # that restarts/resumes this task?
-                    if not in_throttle:
-                        await tractor.breakpoint()
-
-                    # TODO: wait on data con reset event
-                    # then begin backfilling again.
-                    # await proxy.wait_for_data()
-
-                    in_throttle = True
-                    fails += 1
-                    continue
 
     return None, None
     # else:  # throttle wasn't fixed so error out immediately
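
Review note: the retry flow above in miniature. On an HMDS "no data"
response the query window is stepped back ~2000s and the request
retried; ``fetch`` and the ``RuntimeError`` are stand-ins for the proxy
call and ``RequestError``, only ``pendulum``'s API is real here. One
edge worth flagging in the patch itself: if ``end_dt`` starts blank and
the very first request lands in the HMDS branch, ``last_dt`` is still
``None`` when ``.subtract()`` runs; the sketch anchors at ``now()``
instead.

    import pendulum

    def paginate_back(fetch, end_dt=None, tries: int = 10):
        # anchor the backwards walk so the step-back below always
        # has a datetime to decrement, even on a blank ``end_dt``
        last_dt = pendulum.instance(end_dt) if end_dt else pendulum.now()

        for _ in range(tries):
            try:
                return fetch(end_dt)

            except RuntimeError as err:
                if 'HMDS query returned no data' in str(err):
                    # hit an "empty space" in history: look further back
                    end_dt = last_dt = last_dt.subtract(seconds=2000)
                    continue
                raise

        return None
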
@@ -1480,14 +1498,14 @@ async def open_history_client(
             log.error(f"Can't grab bars starting at {end_dt}!?!?")
             raise NoData(f'{end_dt}')
 
-        bars, bars_array, next_dt = out
+        bars, bars_array, first_dt, last_dt = out
 
         # volume cleaning since there's -ve entries,
         # wood luv to know what crookery that is..
         vlm = bars_array['volume']
        vlm[vlm < 0] = 0
 
-        return bars_array, next_dt
+        return bars_array, first_dt, last_dt
 
     yield get_hist
@@ -1503,7 +1521,7 @@ async def backfill_bars(
     # case the shm size will be driven by user config and available sys
     # memory.
     # count: int = 120,
-    count: int = 22,
+    count: int = 36,
 
     task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
 
@@ -1515,6 +1533,9 @@
     https://github.com/pikers/piker/issues/128
 
     '''
+    # last_dt1 = None
+    last_dt = None
+
     with trio.CancelScope() as cs:
 
         # async with open_history_client(fqsn) as proxy:
@@ -1530,9 +1551,10 @@
         if out is None:
            raise RuntimeError("Could not pull currrent history?!")
 
-        (first_bars, bars_array, next_dt) = out
+        (first_bars, bars_array, first_dt, last_dt) = out
         vlm = bars_array['volume']
         vlm[vlm < 0] = 0
+        last_dt = first_dt
 
         # write historical data to buffer
         shm.push(bars_array)
@@ -1542,7 +1564,7 @@
         i = 0
         while i < count:
 
-            out, fails = await get_bars(proxy, fqsn, end_dt=next_dt)
+            out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
 
             if fails is None or fails > 1:
                 break
@@ -1551,10 +1573,12 @@
                 # could be trying to retreive bars over weekend
                 # TODO: add logic here to handle tradable hours and
                 # only grab valid bars in the range
-                log.error(f"Can't grab bars starting at {next_dt}!?!?")
                continue
 
-            bars, bars_array, next_dt = out
+                log.error(f"Can't grab bars starting at {first_dt}!?!?")
+                continue
+
+            (first_bars, bars_array, first_dt, last_dt) = out
+            # last_dt1 = last_dt
+            # last_dt = first_dt
 
             # volume cleaning since there's -ve entries,
             # wood luv to know what crookery that is..
@@ -1787,7 +1811,7 @@ async def stream_quotes(
 
     # TODO: we should instead spawn a task that waits on a feed to start
     # and let it wait indefinitely..instead of this hard coded stuff.
-    with trio.move_on_after(6):
+    with trio.move_on_after(1):
         contract, first_ticker, details = await _trio_run_client_method(
             method='get_quote',
             symbol=sym,
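
Review note: how a caller is expected to drive the new four-tuple
return shape during backfill. Each pass requests history *ending* at
the oldest datum seen so far, walking backwards through time. ``proxy``
and ``shm`` are stand-ins for the real ``MethodProxy`` and ``ShmArray``
objects, and the ``prepend`` flag mirrors how piker pushes older
history into shared memory but is an assumption here.

    async def backfill(proxy, shm, fqsn: str, count: int = 36) -> None:
        # a blank ``end_dt`` asks ib for the most recent bars
        out, fails = await get_bars(proxy, fqsn, end_dt='')
        if out is None:
            return
        first_bars, bars_array, first_dt, last_dt = out
        shm.push(bars_array)

        for _ in range(count):
            # the next window ends where the last one began
            out, fails = await get_bars(proxy, fqsn, end_dt=first_dt)
            if out is None or (fails and fails > 1):
                break
            first_bars, bars_array, first_dt, last_dt = out
            shm.push(bars_array, prepend=True)
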