From 72c4a4366bd07858a0909e74cf717c03d7065fbf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 1 Mar 2021 12:01:48 -0500 Subject: [PATCH] Tag TWS trade events --- piker/brokers/ib.py | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 1577f0f6..3ed40876 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -908,11 +908,10 @@ async def stream_quotes( # TODO: support multiple subscriptions sym = symbols[0] - async with trio.open_nursery() as n: - contract, first_ticker, details = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) + contract, first_ticker, details = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) stream = await _trio_run_client_method( method='stream_ticker', @@ -986,7 +985,6 @@ async def stream_quotes( # for "traditional" assets, volume is normally discreet, not a float syminfo['lot_tick_size'] = 0.0 - # TODO: for loop through all symbols passed in init_msgs = { # pass back token, and bool, signalling if we're the writer @@ -1202,7 +1200,8 @@ async def stream_trades( # supposedly IB server fill time 'broker_time': execu.time, # converted to float by us - 'time': fill.time, # ns from main TCP handler by us inside ``ib_insync`` override + # ns from main TCP handler by us inside ``ib_insync`` override + 'time': fill.time, 'time_ns': time.time_ns(), # cuz why not 'action': action_map[execu.side], 'size': execu.shares, @@ -1220,7 +1219,13 @@ async def stream_trades( if msg['reqid'] == -1: log.error(pformat(msg)) - # don't forward, it's pointless.. - continue + # don't forward, it's pointless.. + continue + + if msg['reqid'] < -1: + # it's a trade event generated by TWS usage. + log.warning(f"TWS triggered trade:\n{pformat(msg)}") + + msg['reqid'] = 'tws-' + msg['reqid'] yield {'local_trades': (event_name, msg)}