From c0d1facf3be870697eb6e495e8b7357b5f718539 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 18 Mar 2022 15:05:32 -0400 Subject: [PATCH] Append broker name to symbols before quotes broadcast in sampler task --- piker/data/_sampling.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 4264a06f..1e65bf29 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -184,6 +184,7 @@ async def sample_and_broadcast( bus: '_FeedsBus', # noqa shm: ShmArray, quote_stream: trio.abc.ReceiveChannel, + brokername: str, sum_tick_vlm: bool = True, ) -> None: @@ -193,8 +194,7 @@ async def sample_and_broadcast( # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! - for sym, quote in quotes.items(): - + for broker_symbol, quote in quotes.items(): # TODO: in theory you can send the IPC msg *before* writing # to the sharedmem array to decrease latency, however, that # will require at least some way to prevent task switching @@ -258,7 +258,13 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus._subscribers[sym.lower()] + subs = bus._subscribers[broker_symbol.lower()] + + # NOTE: by default the broker backend doesn't append + # it's own "name" into the fqsn schema (but maybe it + # should?) so we have to manually generate the correct + # key here. + bsym = f'{broker_symbol}.{brokername}' lags = 0 for (stream, tick_throttle) in subs: @@ -269,7 +275,9 @@ async def sample_and_broadcast( # this is a send mem chan that likely # pushes to the ``uniform_rate_send()`` below. try: - stream.send_nowait((sym, quote)) + stream.send_nowait( + (bsym, quote) + ) except trio.WouldBlock: ctx = getattr(stream, '_ctx', None) if ctx: @@ -283,7 +291,9 @@ async def sample_and_broadcast( f'feed @ {tick_throttle} Hz' ) else: - await stream.send({sym: quote}) + await stream.send( + {bsym: quote} + ) if cs.cancelled_caught: lags += 1