From 8ed48add1885b70f624db5eddd58e62e26a77415 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 4 Jan 2023 22:57:26 -0500 Subject: [PATCH] Drop `Flume.index_stream()`, `._sampling.open_sample_stream()` replaces it --- piker/data/flows.py | 42 ------------------------------------------ 1 file changed, 42 deletions(-) diff --git a/piker/data/flows.py b/piker/data/flows.py index 5958993b..9d8b3103 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -22,17 +22,11 @@ real-time data processing data-structures. """ from __future__ import annotations -from contextlib import asynccontextmanager as acm -from functools import partial from typing import ( - AsyncIterator, TYPE_CHECKING, ) import tractor -from tractor.trionics import ( - maybe_open_context, -) import pendulum import numpy as np @@ -45,9 +39,6 @@ from ._sharedmem import ( ShmArray, _Token, ) -from ._sampling import ( - iter_ohlc_periods, -) # from .._profile import ( # Profiler, # pg_profile_enabled, @@ -151,39 +142,6 @@ class Flume(Struct): async def receive(self) -> dict: return await self.stream.receive() - @acm - async def index_stream( - self, - delay_s: int = 1, - - ) -> AsyncIterator[int]: - - if not self.feed: - raise RuntimeError('This flume is not part of any ``Feed``?') - - # TODO: maybe a public (property) API for this in ``tractor``? - portal = self.stream._ctx._portal - assert portal - - # XXX: this should be singleton on a host, - # a lone broker-daemon per provider should be - # created for all practical purposes - async with maybe_open_context( - acm_func=partial( - portal.open_context, - iter_ohlc_periods, - ), - kwargs={'delay_s': delay_s}, - ) as (cache_hit, (ctx, first)): - async with ctx.open_stream() as istream: - if cache_hit: - # add a new broadcast subscription for the quote stream - # if this feed is likely already in use - async with istream.subscribe() as bistream: - yield bistream - else: - yield istream - def get_ds_info( self, ) -> tuple[float, float, float]: