From db92683ede4dd6cac310b2b416c4fd73ea37d1c8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 14:19:55 -0400 Subject: [PATCH] Port ib orders to new msgs and bidir streaming api --- piker/brokers/ib.py | 270 +++++++++++++++++++++++++++++++------------- 1 file changed, 193 insertions(+), 77 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 915278c3..321cc4cd 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -25,7 +25,7 @@ from contextlib import asynccontextmanager from dataclasses import asdict from datetime import datetime from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable +from typing import List, Dict, Any, Tuple, Optional, AsyncIterator import asyncio from pprint import pformat import inspect @@ -39,7 +39,8 @@ import tractor from async_generator import aclosing from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails, Option -from ib_insync.order import Order +from ib_insync.order import Order, Trade, OrderStatus +from ib_insync.objects import Fill, Execution from ib_insync.ticker import Ticker from ib_insync.objects import Position import ib_insync as ibis @@ -53,6 +54,12 @@ from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData +from ..clearing._messages import ( + BrokerdOrder, BrokerdOrderAck, BrokerdStatus, + BrokerdPosition, BrokerdCancel, + BrokerdFill, + # BrokerdError, +) log = get_logger(__name__) @@ -472,7 +479,7 @@ class Client: # XXX: by default 0 tells ``ib_insync`` methods that there is no # existing order so ask the client to create a new one (which it # seems to do by allocating an int counter - collision prone..) - brid: int = None, + reqid: int = None, ) -> int: """Place an order and return integer request id provided by client. @@ -488,7 +495,7 @@ class Client: trade = self.ib.placeOrder( contract, Order( - orderId=brid or 0, # stupid api devs.. + orderId=reqid or 0, # stupid api devs.. action=action.upper(), # BUY/SELL orderType='LMT', lmtPrice=price, @@ -582,6 +589,7 @@ class Client: self, to_trio: trio.abc.SendChannel, ) -> None: + # connect error msgs def push_err( reqId: int, @@ -589,13 +597,16 @@ class Client: errorString: str, contract: Contract, ) -> None: + log.error(errorString) + try: to_trio.send_nowait(( 'error', + # error "object" {'reqid': reqId, - 'message': errorString, + 'reason': errorString, 'contract': contract} )) except trio.BrokenResourceError: @@ -635,6 +646,8 @@ async def _aio_get_client( """Return an ``ib_insync.IB`` instance wrapped in our client API. Client instances are cached for later use. + + TODO: consider doing this with a ctx mngr eventually? """ # first check cache for existing client @@ -848,7 +861,7 @@ async def get_bars( end_dt: str = "", ) -> (dict, np.ndarray): - _err = None + _err: Optional[Exception] = None fails = 0 for _ in range(2): @@ -885,12 +898,12 @@ async def get_bars( raise NoData(f'Symbol: {sym}') break - else: log.exception( "Data query rate reached: Press `ctrl-alt-f`" "in TWS" ) + print(_err) # TODO: should probably create some alert on screen # and then somehow get that to trigger an event here @@ -937,7 +950,7 @@ async def backfill_bars( if fails is None or fails > 1: break - if out is (None, None): + if out == (None, None): # could be trying to retreive bars over weekend # TODO: add logic here to handle tradable hours and only grab # valid bars in the range @@ -1188,114 +1201,217 @@ def pack_position(pos: Position) -> Dict[str, Any]: else: symbol = con.symbol - return { - 'broker': 'ib', - 'account': pos.account, - 'symbol': symbol, - 'currency': con.currency, - 'size': float(pos.position), - 'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0), - } + return BrokerdPosition( + broker='ib', + account=pos.account, + symbol=symbol, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ) -@tractor.msg.pub( - send_on_connect={'local_trades': 'start'} -) -async def stream_trades( +async def handle_order_requests( + ems_order_stream: tractor.MsgStream, + +) -> None: + + # request_msg: dict + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = await _trio_run_client_method( + + method='submit_limit', + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid=order.reqid, + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + # ems order request id + oid=order.oid, + # broker specific request id + reqid=reqid, + time_ns=time.time_ns(), + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await _trio_run_client_method( + method='submit_cancel', + reqid=msg.reqid + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + + ctx: tractor.Context, loglevel: str = None, - get_topics: Callable = None, ) -> AsyncIterator[Dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - stream = await _trio_run_client_method( + ib_trade_events_stream = await _trio_run_client_method( method='recv_trade_updates', ) # deliver positions to subscriber before anything else positions = await _trio_run_client_method(method='positions') + + all_positions = {} + for pos in positions: - yield {'local_trades': ('position', pack_position(pos))} + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() + + await ctx.started(all_positions) action_map = {'BOT': 'buy', 'SLD': 'sell'} - async for event_name, item in stream: + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # start order request handler **before** local trades event loop + n.start_soon(handle_order_requests, ems_stream) - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + async for event_name, item in ib_trade_events_stream: - if event_name == 'status': + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - # unwrap needed data from ib_insync internal objects - trade = item - status = trade.orderStatus + if event_name == 'status': - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = { - 'reqid': trade.order.orderId, - 'status': status.status, - 'filled': status.filled, - 'reason': status.whyHeld, + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - 'remaining': status.remaining, - } + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - elif event_name == 'fill': + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + status=status.status.lower(), # force lower case - trade, fill = item - execu = fill.execution + filled=status.filled, + reason=status.whyHeld, + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, - msg = { - 'reqid': execu.orderId, - 'execid': execu.execId, + broker_details={'name': 'ib'}, + ) + + elif event_name == 'fill': + + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. + + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill + trade, fill = item + execu: Execution = fill.execution + + # TODO: normalize out commissions details? + details = { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissions': asdict(fill.commissionReport), + } # supposedly IB server fill time - 'broker_time': execu.time, # converted to float by us - # ns from main TCP handler by us inside ``ib_insync`` override - 'time': fill.time, - 'time_ns': time.time_ns(), # cuz why not - 'action': action_map[execu.side], - 'size': execu.shares, - 'price': execu.price, - } + details['broker_time'] = execu.time + details['name'] = 'ib' - elif event_name == 'error': - msg = item + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - # f$#$% gawd dammit insync.. - con = msg['contract'] - if isinstance(con, Contract): - msg['contract'] = asdict(con) + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - if msg['reqid'] == -1: - log.error(pformat(msg)) + broker_details=details, + # XXX: required by order mode currently + broker_time=details['execution']['time'], - # don't forward, it's pointless.. - continue + ) - elif event_name == 'position': - msg = pack_position(item) + elif event_name == 'error': - if msg.get('reqid', 0) < -1: - # it's a trade event generated by TWS usage. - log.warning(f"TWS triggered trade:\n{pformat(msg)}") + err: dict = item - msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) - # mark msg as from "external system" - # TODO: probably something better then this.. - msg['external'] = True + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') - yield {'remote_trades': (event_name, msg)} - continue + # don't forward for now, it's unecessary.. but if we wanted to, + # msg = BrokerdError(**err) + continue - yield {'local_trades': (event_name, msg)} + elif event_name == 'position': + msg = pack_position(item) + # msg = BrokerdPosition(**item) + + # if msg.get('reqid', 0) < -1: + if getattr(msg, 'reqid', 0) < -1: + + # it's a trade event generated by TWS usage. + log.warning(f"TWS triggered trade:\n{pformat(msg)}") + + msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + msg['external'] = True + continue + + # XXX: we always serialize to a dict for msgpack + # translations, ideally we can move to an msgspec (or other) + # encoder # that can be enabled in ``tractor`` ahead of + # time so we can pass through the message types directly. + await ems_stream.send(msg.dict()) @tractor.context