From eb8c9e1a99d27e9eb230e40510b6b579544b5704 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 23 Dec 2018 20:48:06 -0500 Subject: [PATCH] Symbol subs must be cid specific --- piker/brokers/data.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/piker/brokers/data.py b/piker/brokers/data.py index b076ef9c..f5cff4a8 100644 --- a/piker/brokers/data.py +++ b/piker/brokers/data.py @@ -159,17 +159,16 @@ async def fan_out_to_chans( ): chan_payloads = {} for quote in quotes: - # set symbol quotes for each subscriber - # for chan, cid in symbols2chans.get(quote['key'], set()): - for chan, cid in symbols2chans[quote['key']]: + packet = {quote['symbol']: quote} + for chan, cid in symbols2chans.get(quote['key'], set()): chan_payloads.setdefault( - chan, + (chan, cid), {'yield': {}, 'cid': cid} - )['yield'].update({quote['symbol']: quote}) + )['yield'].update(packet) # deliver to each subscriber (fan out) if chan_payloads: - for chan, payload in chan_payloads.items(): + for (chan, cid), payload in chan_payloads.items(): try: await chan.send(payload) except ( @@ -269,7 +268,7 @@ def modify_quote_stream(broker, feed_type, symbols, chan, cid): chanset = symbols2chans.get(ticker) # XXX: cid will be different on unsub call for item in chanset.copy(): - if chan in item: + if (chan, cid) == item: chanset.discard(item) if not chanset: