From e9dfd28aace1bea5076b066f07dcfe10e93c76f5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 3 Aug 2023 16:56:33 -0400 Subject: [PATCH] ib: add back `src/dst` parsing for fiat pairs --- piker/brokers/ib/api.py | 12 +++++---- piker/brokers/ib/feed.py | 53 ++++++++++++++++++++++++++++++++-------- 2 files changed, 50 insertions(+), 15 deletions(-) diff --git a/piker/brokers/ib/api.py b/piker/brokers/ib/api.py index a85d989f..7ab827c3 100644 --- a/piker/brokers/ib/api.py +++ b/piker/brokers/ib/api.py @@ -650,12 +650,14 @@ class Client: exch in {'IDEALPRO'} or sectype == 'CASH' ): - # if '/' in symbol: - # currency = '' - # symbol, currency = symbol.split('/') + pair: str = symbol + if '/' in symbol: + src, dst = symbol.split('/') + pair: str = ''.join([src, dst]) + con = Forex( - pair=''.join((symbol, currency)), - currency=currency, + pair=pair, + currency='', ) con.bars_kwargs = {'whatToShow': 'MIDPOINT'} diff --git a/piker/brokers/ib/feed.py b/piker/brokers/ib/feed.py index 8a6ac949..72f51964 100644 --- a/piker/brokers/ib/feed.py +++ b/piker/brokers/ib/feed.py @@ -30,6 +30,7 @@ import time from typing import ( Any, Callable, + TYPE_CHECKING, ) from async_generator import aclosing @@ -65,6 +66,9 @@ from .api import ( from ._util import data_reset_hack from .symbols import get_mkt_info +if TYPE_CHECKING: + from trio._core._run import Task + # XXX NOTE: See available types table docs: # https://interactivebrokers.github.io/tws-api/tick_types.html @@ -308,7 +312,7 @@ async def wait_on_data_reset( return False -_data_resetter_task: trio.Task | None = None +_data_resetter_task: Task | None = None _failed_resets: int = 0 @@ -334,7 +338,15 @@ async def get_bars( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, -) -> (dict, np.ndarray): +) -> tuple[ + tuple[ # result tuple + ibis.objects.BarDataList, + np.ndarray, + datetime, + datetime, + ] | None, + bool, # timed out hint +]: ''' Retrieve historical data from a ``trio``-side task using a ``MethoProxy``. @@ -420,7 +432,12 @@ async def get_bars( if data_cs: data_cs.cancel() - result = (bars, bars_array, first_dt, last_dt) + result = ( + bars, # ib native + bars_array, # numpy + first_dt, + last_dt, + ) # signal data reset loop parent task result_ready.set() @@ -428,7 +445,7 @@ async def get_bars( return result except RequestError as err: - msg = err.message + msg: str = err.message if 'No market data permissions for' in msg: # TODO: signalling for no permissions searches @@ -466,21 +483,29 @@ async def get_bars( nodatas_count += 1 continue - elif 'API historical data query cancelled' in err.message: + elif ( + 'API historical data query cancelled' + in + err.message + ): log.warning( 'Query cancelled by IB (:eyeroll:):\n' f'{err.message}' ) continue + elif ( 'Trading TWS session is connected from a different IP' - in err.message + in + err.message ): log.warning("ignoring ip address warning") continue # XXX: more or less same as above timeout case - elif _pacing in msg: + elif ( + _pacing in msg + ): log.warning( 'History throttle rate reached!\n' 'Resetting farms with `ctrl-alt-f` hack\n' @@ -532,9 +557,10 @@ async def get_bars( # don't double invoke the reset hack if another # requester task already has it covered. continue + else: _data_resetter_task = trio.lowlevel.current_task() - unset_resetter = True + unset_resetter: bool = True # spawn new data reset task data_cs, reset_done = await nurse.start( @@ -547,9 +573,16 @@ async def get_bars( # sync wait on reset to complete await reset_done.wait() - _data_resetter_task = None if unset_resetter else _data_resetter_task + _data_resetter_task = ( + None + if unset_resetter + else _data_resetter_task + ) assert result - return result, data_cs is not None + return ( + result, + data_cs is not None, + ) _quote_streams: dict[str, trio.abc.ReceiveStream] = {}