From 57a35a3c6cb44edbbb41e081768f1e45ed844e47 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 14 Jun 2021 10:55:01 -0400 Subject: [PATCH] Port feed bus endpoint to a `@tractor.context` --- piker/clearing/_ems.py | 2 +- piker/data/_sampling.py | 6 +- piker/data/feed.py | 155 +++++++++++++++++----------------------- 3 files changed, 68 insertions(+), 95 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 2378465d..9b1c3246 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -724,7 +724,7 @@ async def _emsd_main( _router.feeds[(broker, symbol)] = feed # XXX: this should be initial price quote from target provider - first_quote = await feed.receive() + first_quote = feed.first_quote # open a stream with the brokerd backend for order # flow dialogue diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 566f2b07..2aab0ecf 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -229,10 +229,10 @@ async def sample_and_broadcast( # thus other consumers still attached. subs = bus._subscribers[sym.lower()] - for ctx in subs: + for stream in subs: # print(f'sub is {ctx.chan.uid}') try: - await ctx.send_yield({sym: quote}) + await stream.send({sym: quote}) except ( trio.BrokenResourceError, trio.ClosedResourceError @@ -241,4 +241,4 @@ async def sample_and_broadcast( # if it's done in the fee bus code? # so far seems like no since this should all # be single-threaded. - log.error(f'{ctx.chan.uid} dropped connection') + log.error(f'{stream._ctx.chan.uid} dropped connection') diff --git a/piker/data/feed.py b/piker/data/feed.py index 4a41500d..689b8f93 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -149,7 +149,6 @@ async def _setup_persistent_brokerd( async def allocate_persistent_feed( - ctx: tractor.Context, bus: _FeedsBus, brokername: str, symbol: str, @@ -240,7 +239,7 @@ async def allocate_persistent_feed( await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) -@tractor.stream +@tractor.context async def attach_feed_bus( ctx: tractor.Context, @@ -260,10 +259,11 @@ async def attach_feed_bus( assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) - sub_only: bool = False entry = bus.feeds.get(symbol) + bus._subscribers.setdefault(symbol, []) + # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery @@ -272,7 +272,7 @@ async def attach_feed_bus( init_msg, first_quote = await bus.nursery.start( partial( allocate_persistent_feed, - ctx=ctx, + bus=bus, brokername=brokername, @@ -284,29 +284,24 @@ async def attach_feed_bus( loglevel=loglevel, ) ) - bus._subscribers.setdefault(symbol, []).append(ctx) assert isinstance(bus.feeds[symbol], tuple) - else: - sub_only = True - # XXX: ``first_quote`` may be outdated here if this is secondary # subscriber cs, init_msg, first_quote = bus.feeds[symbol] # send this even to subscribers to existing feed? - await ctx.send_yield(init_msg) + # deliver initial info message a first quote asap + await ctx.started((init_msg, first_quote)) - # deliver a first quote asap - await ctx.send_yield(first_quote) + async with ctx.open_stream() as stream: - if sub_only: - bus._subscribers[symbol].append(ctx) + bus._subscribers[symbol].append(stream) - try: - await trio.sleep_forever() - finally: - bus._subscribers[symbol].remove(ctx) + try: + await trio.sleep_forever() + finally: + bus._subscribers[symbol].remove(stream) @dataclass @@ -322,6 +317,7 @@ class Feed: stream: AsyncIterator[Dict[str, Any]] shm: ShmArray mod: ModuleType + first_quote: dict _brokerd_portal: tractor._portal.Portal _index_stream: Optional[AsyncIterator[int]] = None @@ -357,36 +353,6 @@ class Feed: else: yield self._index_stream - @asynccontextmanager - async def receive_trades_data(self) -> AsyncIterator[dict]: - - if not getattr(self.mod, 'stream_trades', False): - log.warning( - f"{self.mod.name} doesn't have trade data support yet :(") - - if not self._trade_stream: - raise RuntimeError( - f'Can not stream trade data from {self.mod.name}') - - # NOTE: this can be faked by setting a rx chan - # using the ``_.set_fake_trades_stream()`` method - if self._trade_stream is None: - - async with self._brokerd_portal.open_stream_from( - - self.mod.stream_trades, - - # do we need this? -> yes - # the broker side must declare this key - # in messages, though we could probably use - # more then one? - topics=['local_trades'], - ) as self._trade_stream: - - yield self._trade_stream - else: - yield self._trade_stream - def sym_to_shm_key( broker: str, @@ -463,63 +429,70 @@ async def open_feed( # no feed for broker exists so maybe spawn a data brokerd - async with maybe_spawn_brokerd( - brokername, - loglevel=loglevel - ) as portal: + async with ( - async with portal.open_stream_from( + maybe_spawn_brokerd( + brokername, + loglevel=loglevel + ) as portal, + + portal.open_context( attach_feed_bus, brokername=brokername, symbol=sym, loglevel=loglevel - ) as stream: + ) as (ctx, (init_msg, first_quote)), - # TODO: can we make this work better with the proposed - # context based bidirectional streaming style api proposed in: - # https://github.com/goodboy/tractor/issues/53 - init_msg = await stream.receive() + ctx.open_stream() as stream, + ): - # we can only read from shm - shm = attach_shm_array( - token=init_msg[sym]['shm_token'], - readonly=True, + # TODO: can we make this work better with the proposed + # context based bidirectional streaming style api proposed in: + # https://github.com/goodboy/tractor/issues/53 + # init_msg = await stream.receive() + + # we can only read from shm + shm = attach_shm_array( + token=init_msg[sym]['shm_token'], + readonly=True, + ) + + feed = Feed( + name=brokername, + stream=stream, + shm=shm, + mod=mod, + first_quote=first_quote, + _brokerd_portal=portal, + ) + ohlc_sample_rates = [] + + for sym, data in init_msg.items(): + + si = data['symbol_info'] + ohlc_sample_rates.append(data['sample_rate']) + + symbol = Symbol( + key=sym, + type_key=si.get('asset_type', 'forex'), + tick_size=si.get('price_tick_size', 0.01), + lot_tick_size=si.get('lot_tick_size', 0.0), ) + symbol.broker_info[brokername] = si - feed = Feed( - name=brokername, - stream=stream, - shm=shm, - mod=mod, - _brokerd_portal=portal, - ) - ohlc_sample_rates = [] + feed.symbols[sym] = symbol - for sym, data in init_msg.items(): + # cast shm dtype to list... can't member why we need this + shm_token = data['shm_token'] - si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) + # XXX: msgspec won't relay through the tuples XD + shm_token['dtype_descr'] = list( + map(tuple, shm_token['dtype_descr'])) - symbol = Symbol( - key=sym, - type_key=si.get('asset_type', 'forex'), - tick_size=si.get('price_tick_size', 0.01), - lot_tick_size=si.get('lot_tick_size', 0.0), - ) - symbol.broker_info[brokername] = si + assert shm_token == shm.token # sanity - feed.symbols[sym] = symbol + feed._max_sample_rate = max(ohlc_sample_rates) - # cast shm dtype to list... can't member why we need this - shm_token = data['shm_token'] - - # XXX: msgspec won't relay through the tuples XD - shm_token['dtype_descr'] = list(map(tuple, shm_token['dtype_descr'])) - - assert shm_token == shm.token # sanity - - feed._max_sample_rate = max(ohlc_sample_rates) - - yield feed + yield feed