From 4aaf5a1f8bd2531219fb8c8061b62d349c7d946a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 24 Mar 2022 13:29:07 -0400 Subject: [PATCH] Drop sampler consumers that overrun 6x --- piker/data/_sampling.py | 15 +++++++++++++-- 1 file changed, 13 insertions(+), 2 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 1e65bf29..4228f809 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -20,6 +20,7 @@ financial data flows. """ from __future__ import annotations +from collections import Counter import time from typing import TYPE_CHECKING @@ -191,6 +192,8 @@ async def sample_and_broadcast( log.info("Started shared mem bar writer") + overruns = Counter() + # iterate stream delivered by broker async for quotes in quote_stream: # TODO: ``numba`` this! @@ -265,8 +268,8 @@ async def sample_and_broadcast( # should?) so we have to manually generate the correct # key here. bsym = f'{broker_symbol}.{brokername}' + lags: int = 0 - lags = 0 for (stream, tick_throttle) in subs: try: @@ -286,10 +289,18 @@ async def sample_and_broadcast( f'{ctx.channel.uid} !!!' ) else: + key = id(stream) + overruns[key] += 1 log.warning( f'Feed overrun {bus.brokername} -> ' f'feed @ {tick_throttle} Hz' ) + if overruns[key] > 6: + log.warning( + f'Dropping consumer {stream}' + ) + await stream.aclose() + raise trio.BrokenResourceError else: await stream.send( {bsym: quote} @@ -312,7 +323,7 @@ async def sample_and_broadcast( '`brokerd`-quotes-feed connection' ) if tick_throttle: - assert stream.closed() + assert stream._closed # XXX: do we need to deregister here # if it's done in the fee bus code?