diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index c0f47d8e..2079eb71 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -227,7 +227,7 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus.subscribers[sym] + subs = bus._subscribers[sym] for ctx in subs: # print(f'sub is {ctx.chan.uid}') try: diff --git a/piker/data/feed.py b/piker/data/feed.py index 9455f7ae..5d39c7c4 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -67,11 +67,19 @@ class _FeedsBus(BaseModel): brokername: str nursery: trio.Nursery feeds: Dict[str, trio.CancelScope] = {} - subscribers: Dict[str, List[tractor.Context]] = {} + task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() + # XXX: so weird but, apparently without this being `._` private + # pydantic will complain about private `tractor.Context` instance + # vars (namely `._portal` and `._cancel_scope`) at import time. + # Reported this bug: + # https://github.com/samuelcolvin/pydantic/issues/2816 + _subscribers: Dict[str, List[tractor.Context]] = {} + class Config: arbitrary_types_allowed = True + underscore_attrs_are_private = False async def cancel_all(self) -> None: for sym, (cs, msg, quote) in self.feeds.items(): @@ -256,7 +264,7 @@ async def attach_feed_bus( loglevel=loglevel, ) ) - bus.subscribers.setdefault(symbol, []).append(ctx) + bus._subscribers.setdefault(symbol, []).append(ctx) else: sub_only = True @@ -269,12 +277,12 @@ async def attach_feed_bus( await ctx.send_yield(first_quote) if sub_only: - bus.subscribers[symbol].append(ctx) + bus._subscribers[symbol].append(ctx) try: await trio.sleep_forever() finally: - bus.subscribers[symbol].remove(ctx) + bus._subscribers[symbol].remove(ctx) @dataclass