From 8832804babdba0687ad2539b57aa6da3840700ee Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 25 Sep 2020 15:19:33 -0400 Subject: [PATCH] Sub each new symbol to shm incrementing --- piker/brokers/ib.py | 26 ++++++++++++++++++-------- 1 file changed, 18 insertions(+), 8 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 77a1018c..607d2e32 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -8,7 +8,7 @@ built on it) and thus actor aware API calls must be spawned with from contextlib import asynccontextmanager from dataclasses import asdict from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncGenerator, Callable +from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable import asyncio import logging import inspect @@ -29,7 +29,8 @@ from ..data import ( maybe_spawn_brokerd, iterticks, attach_shm_array, - get_shm_token + get_shm_token, + subscribe_ohlc_for_increment, ) from ..ui._source import from_df @@ -147,7 +148,7 @@ class Client: # durationStr='1 D', # time length calcs - durationStr='{count} S'.format(count=1000 * 5), + durationStr='{count} S'.format(count=100 * 5), barSizeSetting='5 secs', # always use extended hours @@ -494,14 +495,16 @@ def normalize( # TODO: figure out how to share quote feeds sanely despite # the wacky ``ib_insync`` api. # @tractor.msg.pub +@tractor.stream async def stream_quotes( + ctx: tractor.Context, symbols: List[str], shm_token: Tuple[str, str, List[tuple]], loglevel: str = None, # compat for @tractor.msg.pub topics: Any = None, get_topics: Callable = None, -) -> AsyncGenerator[str, Dict[str, Any]]: +) -> AsyncIterator[Dict[str, Any]]: """Stream symbol quotes. This is a ``trio`` callable routine meant to be invoked @@ -539,8 +542,12 @@ async def stream_quotes( shm.push(bars) shm_token = shm.token + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + subscribe_ohlc_for_increment(shm, delay_s) + # pass back token, and bool, signalling if we're the writer - yield shm_token, not writer_exists + await ctx.send_yield((shm_token, not writer_exists)) # first quote can be ignored as a 2nd with newer data is sent? first_ticker = await stream.__anext__() @@ -581,9 +588,11 @@ async def stream_quotes( topic = '.'.join((con['symbol'], con[suffix])).lower() first_quote = {topic: quote} ticker.ticks = [] - # yield first quote asap - yield first_quote + # yield first quote asap + await ctx.send_yield(first_quote) + + # real-time stream async for ticker in stream: quote = normalize( ticker, @@ -614,7 +623,8 @@ async def stream_quotes( con = quote['contract'] topic = '.'.join((con['symbol'], con[suffix])).lower() - yield {topic: quote} + + await ctx.send_yield({topic: quote}) # ugh, clear ticks since we've consumed them ticker.ticks = []