From 7396624be0e7cb5a636f2f9e93896927e2e3d2c2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 28 Sep 2022 13:18:15 -0400 Subject: [PATCH] Rework history frame request concurrency Manual tinker-testing demonstrated that triggering data resets completely independent of the frame request gets more throughput and further, that repeated requests (for the same frame after cancelling on the `trio`-side) can yield duplicate frame responses. Re-work the dual-task structure to instead have one task wait indefinitely on the frame response (and thus not trigger duplicate frames) and the 2nd data reset task poll for the first task to complete in a poll loop which terminates when the frame arrives via an event. Dirty deatz: - make `get_bars()` take an optional timeout (which will eventually be dynamically passed from the history mgmt machinery) and move request logic inside a new `query()` closure meant to be spawned in a task which sets an event on frame arrival, add data reset poll loop in the main/parent task, deliver result on nursery completion. - handle frame request cancelled event case without crash. - on no-frame result (due to real history gap) hack in a 1 day decrement case which we need to eventually allow the caller to control likely based on measured frame rx latency. - make `wait_on_data_reset()` a predicate without output indicating reset success as well as `trio.Nursery.start()` compat so that it can be started in a new task with the started values yielded being a cancel scope and completion event. - drop the legacy `backfill_bars()`, not longer used. --- piker/brokers/ib/feed.py | 314 +++++++++++++++++---------------------- 1 file changed, 136 insertions(+), 178 deletions(-) diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 5117962f..6bef877c 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -22,6 +22,7 @@ import asyncio from contextlib import asynccontextmanager as acm from dataclasses import asdict from datetime import datetime +from functools import partial from math import isnan import time from typing import ( @@ -38,7 +39,6 @@ import tractor import trio from trio_typing import TaskStatus -from piker.data._sharedmem import ShmArray from .._util import SymbolNotFound, NoData from .api import ( # _adhoc_futes_set, @@ -111,6 +111,15 @@ async def open_history_client( that takes in ``pendulum.datetime`` and returns ``numpy`` arrays. ''' + # TODO: + # - add logic to handle tradable hours and only grab + # valid bars in the range? + # - we want to avoid overrunning the underlying shm array buffer and + # we should probably calc the number of calls to make depending on + # that until we have the `marketstore` daemon in place in which case + # the shm size will be driven by user config and available sys + # memory. + async with open_data_client() as proxy: async def get_hist( @@ -120,21 +129,19 @@ async def open_history_client( ) -> tuple[np.ndarray, str]: - out, fails = await get_bars( + out = await get_bars( proxy, symbol, timeframe, end_dt=end_dt, ) - # TODO: add logic here to handle tradable hours and only grab - # valid bars in the range if out is None: # could be trying to retreive bars over weekend log.error(f"Can't grab bars starting at {end_dt}!?!?") raise NoData( f'{end_dt}', - frame_size=2000, + # frame_size=2000, ) bars, bars_array, first_dt, last_dt = out @@ -162,11 +169,16 @@ _pacing: str = ( async def wait_on_data_reset( proxy: MethodProxy, - tries: int = 2, + reset_type: str = 'data', timeout: float = 16, - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -): + task_status: TaskStatus[ + tuple[ + trio.CancelScope, + trio.Event, + ] + ] = trio.TASK_STATUS_IGNORED, +) -> bool: # TODO: we might have to put a task lock around this # method.. @@ -186,59 +198,43 @@ async def wait_on_data_reset( # try to wait on the reset event(s) to arrive, a timeout # will trigger a retry up to 6 times (for now). - # try 3 time with a data reset then fail over to - # a connection reset. - for i in range(1, tries): + done = trio.Event() + with trio.move_on_after(timeout) as cs: + + task_status.started((cs, done)) log.warning('Sending DATA RESET request') - await data_reset_hack(reset_type='data') + res = await data_reset_hack(reset_type=reset_type) - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - task_status.started(cs) - await ev.wait() - log.info(f"{name} DATA RESET") - break - - if ( - cs.cancelled_caught - and not cs.cancel_called - ): - log.warning( - f'Data reset {name} timeout, retrying {i}.' - ) - continue - else: - - log.warning('Sending CONNECTION RESET') - res = await data_reset_hack(reset_type='connection') if not res: log.warning( 'NO VNC DETECTED!\n' 'Manually press ctrl-alt-f on your IB java app' ) + done.set() + return False - with trio.move_on_after(timeout) as cs: - for name, ev in [ - # TODO: not sure if waiting on other events - # is all that useful here or not. in theory - # you could wait on one of the ones above - # first to verify the reset request was - # sent? - ('history', hist_ev), - ]: - await ev.wait() - log.info(f"{name} DATA RESET") + # TODO: not sure if waiting on other events + # is all that useful here or not. + # - in theory you could wait on one of the ones above first + # to verify the reset request was sent? + # - we need the same for real-time quote feeds which can + # sometimes flake out and stop delivering.. + for name, ev in [ + ('history', hist_ev), + ]: + await ev.wait() + log.info(f"{name} DATA RESET") + done.set() + return True - if cs.cancelled_caught: - log.warning('Data CONNECTION RESET timeout!?') + if cs.cancel_called: + log.warning( + 'Data reset task canceled?' + ) + + done.set() + return False async def get_bars( @@ -249,6 +245,7 @@ async def get_bars( # blank to start which tells ib to look up the latest datum end_dt: str = '', + timeout: float = 1.5, # how long before we trigger a feed reset task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, @@ -258,52 +255,44 @@ async def get_bars( a ``MethoProxy``. ''' - fails = 0 - bars: Optional[list] = None - first_dt: datetime = None - last_dt: datetime = None + data_cs: Optional[trio.CancelScope] = None + result: Optional[tuple[ + ibis.objects.BarDataList, + np.ndarray, + datetime, + datetime, + ]] = None + result_ready = trio.Event() - if end_dt: - last_dt = pendulum.from_timestamp(end_dt.timestamp()) - - timeout: float = float('inf') - async with trio.open_nursery() as nurse: - for _ in range(10): + async def query(): + nonlocal result, data_cs, end_dt + while True: try: - out = None - with trio.move_on_after(timeout) as cs: - out = await proxy.bars( - fqsn=fqsn, - end_dt=end_dt, - sample_period_s=timeframe, - ) - timeout = 3 + out = await proxy.bars( + fqsn=fqsn, + end_dt=end_dt, + sample_period_s=timeframe, - if ( - cs.cancelled_caught - and out is None - ): - print(f"RESETTING DATA after {timeout}") - await nurse.start( - wait_on_data_reset, - proxy, - timeout=float('inf'), - tries=100, - ) - # scale timeout up exponentially to avoid - # request-overruning the history farm. - # timeout *= 2 - continue - - if out: - bars, bars_array = out - - else: + # ideally we cancel the request just before we + # cancel on the ``trio``-side and trigger a data + # reset hack.. the problem is there's no way (with + # current impl) to detect a cancel case. + # timeout=timeout, + ) + if out is None: raise NoData( f'{end_dt}', # frame_size=2000, ) + bars, bars_array = out + + if not bars: + # TODO: duration lookup for this + end_dt = end_dt.subtract(days=1) + print("SUBTRACTING DAY") + continue + if bars_array is None: raise SymbolNotFound(fqsn) @@ -317,10 +306,18 @@ async def get_bars( assert time[-1] == last_dt.timestamp() assert time[0] == first_dt.timestamp() log.info( - f'{len(bars)} bars retreived for {first_dt} -> {last_dt}' + f'{len(bars)} bars retreived {first_dt} -> {last_dt}' ) - return (bars, bars_array, first_dt, last_dt), fails + if data_cs: + data_cs.cancel() + + result = (bars, bars_array, first_dt, last_dt) + + # signal data reset loop parent task + result_ready.set() + + return result except RequestError as err: msg = err.message @@ -345,14 +342,20 @@ async def get_bars( ) # try to decrement start point and look further back - # end_dt = last_dt = last_dt.subtract(seconds=2000) + # end_dt = end_dt.subtract(seconds=2000) + end_dt = end_dt.subtract(days=1) + print("SUBTRACTING DAY") + continue - raise NoData( - f'Symbol: {fqsn}', - # TODO: fix this since we don't strictly use 1s - # ohlc any more XD - frame_size=2000, + elif ( + err.code == 162 and + 'API historical data query cancelled' in err.message + ): + log.warning( + 'Query cancelled by IB (:eyeroll:):\n' + f'{err.message}' ) + continue # elif ( # err.code == 162 and @@ -362,103 +365,58 @@ async def get_bars( # log.warning("ignoring ip address warning") # continue + # XXX: more or less same as above timeout case elif _pacing in msg: log.warning( 'History throttle rate reached!\n' 'Resetting farms with `ctrl-alt-f` hack\n' ) - await wait_on_data_reset(proxy) + + # cancel any existing reset task + if data_cs: + data_cs.cancel() + + # spawn new data reset task + data_cs, reset_done = await nurse.start( + partial( + wait_on_data_reset, + proxy, + timeout=float('inf'), + reset_type='connection' + ) + ) + continue else: raise - return None, None - # else: # throttle wasn't fixed so error out immediately - # raise _err + async with trio.open_nursery() as nurse: + # start history request that we allow + # to run indefinitely until a result is acquired + nurse.start_soon(query) -async def backfill_bars( + # start history reset loop which waits up to the timeout + # for a result before triggering a data feed reset. + while not result_ready.is_set(): - fqsn: str, - shm: ShmArray, # type: ignore # noqa - timeframe: float = 1, # in seconds + with trio.move_on_after(timeout): + await result_ready.wait() + continue - # TODO: we want to avoid overrunning the underlying shm array buffer - # and we should probably calc the number of calls to make depending - # on that until we have the `marketstore` daemon in place in which - # case the shm size will be driven by user config and available sys - # memory. - count: int = 16, - - task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, - -) -> None: - ''' - Fill historical bars into shared mem / storage afap. - - TODO: avoid pacing constraints: - https://github.com/pikers/piker/issues/128 - - ''' - # last_dt1 = None - last_dt = None - - with trio.CancelScope() as cs: - - async with open_data_client() as proxy: - - out, fails = await get_bars(proxy, fqsn, timeframe) - - if out is None: - raise RuntimeError("Could not pull currrent history?!") - - (first_bars, bars_array, first_dt, last_dt) = out - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - last_dt = first_dt - - # write historical data to buffer - shm.push(bars_array) - - task_status.started(cs) - - i = 0 - while i < count: - - out, fails = await get_bars( + # spawn new data reset task + data_cs, reset_done = await nurse.start( + partial( + wait_on_data_reset, proxy, - fqsn, - timeframe, - end_dt=first_dt, + timeout=float('inf'), + # timeout=timeout, ) + ) + # sync wait on reset to complete + await reset_done.wait() - if out is None: - # could be trying to retreive bars over weekend - # TODO: add logic here to handle tradable hours and - # only grab valid bars in the range - log.error(f"Can't grab bars starting at {first_dt}!?!?") - - # XXX: get_bars() should internally decrement dt by - # 2k seconds and try again. - continue - - (first_bars, bars_array, first_dt, last_dt) = out - # last_dt1 = last_dt - # last_dt = first_dt - - # volume cleaning since there's -ve entries, - # wood luv to know what crookery that is.. - vlm = bars_array['volume'] - vlm[vlm < 0] = 0 - - # TODO we should probably dig into forums to see what peeps - # think this data "means" and then use it as an indicator of - # sorts? dinkus has mentioned that $vlms for the day dont' - # match other platforms nor the summary stat tws shows in - # the monitor - it's probably worth investigating. - - shm.push(bars_array, prepend=True) - i += 1 + return result asset_type_map = {