diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2b4fd4fb..6073d036 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -426,12 +426,15 @@ class Client: """ contract = await self.find_contract(symbol) + + details_fute = self.ib.reqContractDetailsAsync(contract) ticker: Ticker = self.ib.reqMktData( contract, snapshot=True, ) ticker = await ticker.updateEvent - return contract, ticker + details = (await details_fute)[0] + return contract, ticker, details # async to be consistent for the client proxy, and cuz why not. async def submit_limit( @@ -440,7 +443,7 @@ class Client: symbol: str, price: float, action: str, - size: int = 100, + size: int, ) -> int: """Place an order and return integer request id provided by client. @@ -870,6 +873,7 @@ async def stream_quotes( symbols: List[str], shm_token: Tuple[str, str, List[tuple]], loglevel: str = None, + # compat for @tractor.msg.pub topics: Any = None, get_topics: Callable = None, @@ -885,10 +889,11 @@ async def stream_quotes( # TODO: support multiple subscriptions sym = symbols[0] - contract, first_ticker = await _trio_run_client_method( - method='get_quote', - symbol=sym, - ) + async with trio.open_nursery() as n: + contract, first_ticker, details = await _trio_run_client_method( + method='get_quote', + symbol=sym, + ) stream = await _trio_run_client_method( method='stream_ticker', @@ -896,8 +901,8 @@ async def stream_quotes( symbol=sym, ) - async with aclosing(stream): - + shm = None + async with trio.open_nursery() as ln: # check if a writer already is alive in a streaming task, # otherwise start one and mark it as now existing @@ -908,86 +913,100 @@ async def stream_quotes( # maybe load historical ohlcv in to shared mem # check if shm has already been created by previous # feed initialization - async with trio.open_nursery() as ln: - if not writer_already_exists: - _local_buffer_writers[key] = True + if not writer_already_exists: + _local_buffer_writers[key] = True - shm = attach_shm_array( - token=shm_token, + shm = attach_shm_array( + token=shm_token, - # we are the buffer writer - readonly=False, - ) + # we are the buffer writer + readonly=False, + ) - # async def retrieve_and_push(): - start = time.time() + # async def retrieve_and_push(): + start = time.time() - bars, bars_array = await _trio_run_client_method( - method='bars', - symbol=sym, + bars, bars_array = await _trio_run_client_method( + method='bars', + symbol=sym, - ) + ) - log.info(f"bars_array request: {time.time() - start}") + log.info(f"bars_array request: {time.time() - start}") - if bars_array is None: - raise SymbolNotFound(sym) + if bars_array is None: + raise SymbolNotFound(sym) - # write historical data to buffer - shm.push(bars_array) - shm_token = shm.token + # write historical data to buffer + shm.push(bars_array) + shm_token = shm.token - # TODO: generalize this for other brokers - # start bar filler task in bg - ln.start_soon(fill_bars, sym, bars, shm) + # TODO: generalize this for other brokers + # start bar filler task in bg + ln.start_soon(fill_bars, sym, bars, shm) - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] - subscribe_ohlc_for_increment(shm, delay_s) + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + subscribe_ohlc_for_increment(shm, delay_s) + # pass back some symbol info like min_tick, trading_hours, etc. + # con = asdict(contract) + # syminfo = contract + symdeats = asdict(details) + symdeats.update(symdeats['contract']) + + # TODO: for loop through all symbols passed in + init_msgs = { # pass back token, and bool, signalling if we're the writer # and that history has been written - await ctx.send_yield((shm_token, not writer_already_exists)) + sym: { + 'is_shm_writer': not writer_already_exists, + 'shm_token': shm_token, + 'symbol_info': symdeats, + } + } + await ctx.send_yield(init_msgs) - # check for special contract types - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' - # should be real volume for this contract - calc_price = False - else: - # commodities and forex don't have an exchange name and - # no real volume so we have to calculate the price - suffix = 'secType' - calc_price = True - # ticker = first_ticker + # check for special contract types + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' + # should be real volume for this contract + calc_price = False + else: + # commodities and forex don't have an exchange name and + # no real volume so we have to calculate the price + suffix = 'secType' + calc_price = True + # ticker = first_ticker - # pass first quote asap - quote = normalize(first_ticker, calc_price=calc_price) - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic + # pass first quote asap + quote = normalize(first_ticker, calc_price=calc_price) + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic - first_quote = {topic: quote} + first_quote = {topic: quote} - # yield first quote asap - await ctx.send_yield(first_quote) + # yield first quote asap + await ctx.send_yield(first_quote) - # ticker.ticks = [] + # ticker.ticks = [] - # ugh, clear ticks since we've consumed them - # (ahem, ib_insync is stateful trash) - first_ticker.ticks = [] + # ugh, clear ticks since we've consumed them + # (ahem, ib_insync is stateful trash) + first_ticker.ticks = [] - log.debug(f"First ticker received {quote}") + log.debug(f"First ticker received {quote}") - if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): - suffix = 'exchange' + if type(first_ticker.contract) not in (ibis.Commodity, ibis.Forex): + suffix = 'exchange' - calc_price = False # should be real volume for contract + calc_price = False # should be real volume for contract - # with trio.move_on_after(10) as cs: - # wait for real volume on feed (trading might be closed) + # with trio.move_on_after(10) as cs: + # wait for real volume on feed (trading might be closed) + async with aclosing(stream): async for ticker in stream: # for a real volume contract we rait for the first @@ -1009,76 +1028,105 @@ async def stream_quotes( # ``aclosing()`` above? break - # if cs.cancelled_caught: - # await tractor.breakpoint() - - # real-time stream - async for ticker in stream: - - # print(ticker.vwap) - quote = normalize( - ticker, - calc_price=calc_price + # enter stream loop + try: + await stream_and_write( + stream=stream, + calc_price=calc_price, + topic=topic, + writer_already_exists=writer_already_exists, + shm=shm, + suffix=suffix, + ctx=ctx, ) - quote['symbol'] = topic - # TODO: in theory you can send the IPC msg *before* - # writing to the sharedmem array to decrease latency, - # however, that will require `tractor.msg.pub` support - # here or at least some way to prevent task switching - # at the yield such that the array write isn't delayed - # while another consumer is serviced.. - - # if we are the lone tick writer start writing - # the buffer with appropriate trade data + finally: if not writer_already_exists: - for tick in iterticks(quote, types=('trade', 'utrade',)): - last = tick['price'] - - # print(f"{quote['symbol']}: {tick}") - - # update last entry - # benchmarked in the 4-5 us range - o, high, low, v = shm.array[-1][ - ['open', 'high', 'low', 'volume'] - ] - - new_v = tick.get('size', 0) - - if v == 0 and new_v: - # no trades for this bar yet so the open - # is also the close/last trade price - o = last - - shm.array[[ - 'open', - 'high', - 'low', - 'close', - 'volume', - ]][-1] = ( - o, - max(high, last), - min(low, last), - last, - v + new_v, - ) - - con = quote['contract'] - topic = '.'.join((con['symbol'], con[suffix])).lower() - quote['symbol'] = topic - - await ctx.send_yield({topic: quote}) - - # ugh, clear ticks since we've consumed them - ticker.ticks = [] + _local_buffer_writers[key] = False -@tractor.msg.pub +async def stream_and_write( + stream, + calc_price: bool, + topic: str, + writer_already_exists: bool, + suffix: str, + ctx: tractor.Context, + shm: Optional['SharedArray'], # noqa +) -> None: + """Core quote streaming and shm writing loop; optimize for speed! + + """ + # real-time stream + async for ticker in stream: + + # print(ticker.vwap) + quote = normalize( + ticker, + calc_price=calc_price + ) + quote['symbol'] = topic + # TODO: in theory you can send the IPC msg *before* + # writing to the sharedmem array to decrease latency, + # however, that will require `tractor.msg.pub` support + # here or at least some way to prevent task switching + # at the yield such that the array write isn't delayed + # while another consumer is serviced.. + + # if we are the lone tick writer start writing + # the buffer with appropriate trade data + if not writer_already_exists: + for tick in iterticks(quote, types=('trade', 'utrade',)): + last = tick['price'] + + # print(f"{quote['symbol']}: {tick}") + + # update last entry + # benchmarked in the 4-5 us range + o, high, low, v = shm.array[-1][ + ['open', 'high', 'low', 'volume'] + ] + + new_v = tick.get('size', 0) + + if v == 0 and new_v: + # no trades for this bar yet so the open + # is also the close/last trade price + o = last + + shm.array[[ + 'open', + 'high', + 'low', + 'close', + 'volume', + ]][-1] = ( + o, + max(high, last), + min(low, last), + last, + v + new_v, + ) + + con = quote['contract'] + topic = '.'.join((con['symbol'], con[suffix])).lower() + quote['symbol'] = topic + + await ctx.send_yield({topic: quote}) + + # ugh, clear ticks since we've consumed them + ticker.ticks = [] + + +@tractor.msg.pub( + send_on_connect={'local_trades': 'start'} +) async def stream_trades( loglevel: str = None, get_topics: Callable = None, ) -> AsyncIterator[Dict[str, Any]]: + global _trades_stream_is_live + # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) @@ -1086,9 +1134,6 @@ async def stream_trades( method='recv_trade_updates', ) - # startup msg - yield {'local_trades': 'start'} - async for event_name, item in stream: # XXX: begin normalization of nonsense ib_insync internal