diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 583e2509..8ff2b0e2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -373,7 +373,9 @@ async def open_brokerd_trades_dialogue( broker = feed.mod.name # TODO: make a `tractor` bug/test for this! - # portal = feed._brokerd_portal + # if only i could member what the problem was.. + # probably some GC of the portal thing? + # portal = feed.portal # XXX: we must have our own portal + channel otherwise # when the data feed closes it may result in a half-closed diff --git a/piker/data/feed.py b/piker/data/feed.py index 9bfe95a9..bf003b9a 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -393,18 +393,23 @@ class Feed: shm: ShmArray mod: ModuleType first_quotes: dict # symbol names to first quote dicts - stream: trio.abc.ReceiveChannel[dict[str, Any]] - _brokerd_portal: tractor._portal.Portal + _portal: tractor.Portal + + stream: trio.abc.ReceiveChannel[dict[str, Any]] + throttle_rate: Optional[int] = None + _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 - search: Callable[..., Awaitable] = None - # cache of symbol info messages received as first message when # a stream startsc. symbols: dict[str, Symbol] = field(default_factory=dict) + @property + def portal(self) -> tractor.Portal: + return self._portal + async def receive(self) -> dict: return await self.stream.receive() @@ -418,7 +423,7 @@ class Feed: delay_s = delay_s or self._max_sample_rate async with open_sample_step_stream( - self._brokerd_portal, + self.portal, delay_s, ) as istream: yield istream @@ -526,7 +531,8 @@ async def open_feed( mod=mod, first_quotes=first_quotes, stream=stream, - _brokerd_portal=portal, + _portal=portal, + throttle_rate=tick_throttle, ) ohlc_sample_rates = []