From bb6452b969b7c8ddb89370bf23b8cbf93d631904 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 9 Nov 2022 18:57:15 -0500
Subject: [PATCH] Further feed syncing fixes wrt `Flumes`

Sync per-symbol sampler loop start to subscription registers such that
the loop can't start until the consumer's stream subscription is added;
the task-sync uses a `trio.Event`. This patch also drops a ton of
commented cruft.

Further adjustments needed to get parity with prior functionality:
- pass the init msg's 'symbol_info' field to `Symbol.broker_info: dict`.
- ensure the `_FeedsBus._subscribers` table uses the broker-specific
  symbol key (without the brokername suffix) for lookups so that the
  sampler loop doesn't have to append the brokername as a suffix.
- ensure the `open_feed_bus()` flumes-table-msg sent via
  `tractor.Context.started()` uses the `.to_msg()` form of all flume
  structs.
- ensure `maybe_open_feed()` uses `tractor.MsgStream.subscribe()` on
  all `Flume.stream`s on cache hits, using the
  `tractor.trionics.gather_contexts()` helper.
---
 piker/data/feed.py  | 229 +++++++++++++++-----------------------------
 tests/test_feeds.py |  35 ++++---
 2 files changed, 103 insertions(+), 161 deletions(-)

diff --git a/piker/data/feed.py b/piker/data/feed.py
index e87c00be..1a5eba0c 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -22,10 +22,6 @@ This module is enabled for ``brokerd`` daemons.
 """
 from __future__ import annotations
 from contextlib import asynccontextmanager as acm
-# from dataclasses import (
-#     dataclass,
-#     field,
-# )
 from datetime import datetime
 from functools import partial
 from types import ModuleType
@@ -36,7 +32,6 @@ from typing import (
     Callable,
     Optional,
     Awaitable,
-    Sequence,
     TYPE_CHECKING,
     Union,
 )
@@ -243,7 +238,7 @@ def diff_history(
     time = array['time']
     to_push = array[time >= last_tsdb_dt.timestamp()]
 
-    log.info(
+    log.debug(
         f'Pushing partial frame {to_push.size} to shm'
     )
 
@@ -359,7 +354,7 @@ async def start_backfill(
         # last retrieved start dt to the next request as
         # its end dt.
         while start_dt > last_tsdb_dt:
-            log.info(
+            log.debug(
                 f'Requesting {step_size_s}s frame ending in {start_dt}'
             )
 
@@ -721,13 +716,18 @@ async def manage_history(
 
     '''
-    from tractor._state import _runtime_vars
-    port = _runtime_vars['_root_mailbox'][1]
+    # TODO: is there a way to make each shm file key
+    # actor-tree-discovery-addr unique so we avoid collisions
+    # when doing tests which also allocate shms for certain instruments
+    # that may be in use on the system by some other running daemons?
+    # from tractor._state import _runtime_vars
+    # port = _runtime_vars['_root_mailbox'][1]
 
     # (maybe) allocate shm array for this broker/symbol which will
     # be used for fast near-term history capture and processing.
     hist_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_hist', #_p{port}',
+        # key=f'{fqsn}_hist_p{port}',
+        key=f'{fqsn}_hist',
 
         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -744,7 +744,8 @@ async def manage_history(
     )
 
     rt_shm, opened = maybe_open_shm_array(
-        key=f'{fqsn}_rt', #_p{port}',
+        # key=f'{fqsn}_rt_p{port}',
+        key=f'{fqsn}_rt',
 
         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -874,6 +875,7 @@ class Flume(Struct):
     izero_hist: int = 0
     izero_rt: int = 0
     throttle_rate: int | None = None
+    feed: Feed | None = None
 
     @property
     def rt_shm(self) -> ShmArray:
@@ -907,12 +909,15 @@ class Flume(Struct):
 
     ) -> AsyncIterator[int]:
 
+        if not self.feed:
+            raise RuntimeError('This flume is not part of any ``Feed``?')
+
         # XXX: this should be singleton on a host,
         # a lone broker-daemon per provider should be
         # created for all practical purposes
         async with maybe_open_context(
             acm_func=partial(
-                self.portal.open_context,
+                self.feed.portal.open_context,
                 iter_ohlc_periods,
             ),
             kwargs={'delay_s': delay_s},
@@ -964,11 +969,12 @@ class Flume(Struct):
     def to_msg(self) -> dict:
         msg = self.to_dict()
         msg['symbol'] = msg['symbol'].to_dict()
-        # can't serialize the stream object, it's
-        # expected you'll have a ref to it since
-        # this msg should be rxed on a stream on
-        # whatever far end IPC..
+
+        # can't serialize the stream or feed objects; it's expected
+        # you'll have refs to them since this msg should be rxed on
+        # a stream on whatever far-end IPC..
         msg.pop('stream')
+        msg.pop('feed')
         return msg
 
     @classmethod
@@ -982,6 +988,7 @@ class Flume(Struct):
 
 async def allocate_persistent_feed(
     bus: _FeedsBus,
+    sub_registered: trio.Event,
 
     brokername: str,
     symstr: str,
@@ -1064,8 +1071,9 @@ async def allocate_persistent_feed(
 
     symbol = Symbol.from_fqsn(
         fqsn=fqsn,
-        info=msg,
+        info=msg['symbol_info'],
     )
+    assert symbol.type_key
 
     # HISTORY storage, run 2 tasks:
     # - a history loader / maintainer
@@ -1090,46 +1098,16 @@ async def allocate_persistent_feed(
         feed_is_live,
     )
 
-    # we hand an IPC-msg compatible shm token to the caller so it
-    # can read directly from the memory which will be written by
-    # this task.
-
-    # msg['hist_shm_token'] = hist_shm.token
-    # msg['izero_hist'] = izero_hist
-    # msg['izero_rt'] = izero_rt
-    # msg['rt_shm_token'] = rt_shm.token
-
-    # add a fqsn entry that includes the ``.<broker>`` suffix
-    # and an entry that includes the broker-specific fqsn (including
-    # any new suffixes or elements as injected by the backend).
-    # init_msg[fqsn] = msg
-    # init_msg[bfqsn] = msg
-
-    # TODO: pretty sure we don't need this? why not just leave 1s as
-    # the fastest "sample period" since we'll probably always want that
-    # for most purposes.
-    # pass OHLC sample rate in seconds (be sure to use python int type)
-    # init_msg[symbol]['sample_rate'] = 1 #int(delay_s)
-
     # yield back control to starting nursery once we receive either
     # some history or a real-time quote.
     log.info(f'waiting on history to load: {fqsn}')
     await some_data_ready.wait()
 
-    # append ``.<broker>`` suffix to each quote symbol
-    # acceptable_not_fqsn_with_broker_suffix = symbol + f'.{brokername}'
-
-    # generic_first_quotes = {
-    #     acceptable_not_fqsn_with_broker_suffix: first_quote,
-    #     fqsn: first_quote,
-    # }
-
     flume = Flume(
         symbol=symbol,
         _hist_shm_token=hist_shm.token,
         _rt_shm_token=rt_shm.token,
         first_quote=first_quote,
-        # stream=stream,
         izero_hist=izero_hist,
         izero_rt=izero_rt,
         # throttle_rate=tick_throttle,
@@ -1138,9 +1116,6 @@ async def allocate_persistent_feed(
     # for ambiguous names we simply apply the retrieved
     # feed to that name (for now).
     bus.feeds[symstr] = bus.feeds[bfqsn] = flume
-    # init_msg,
-    # generic_first_quotes,
-    # )
 
     # insert 1s ohlc into the increment buffer set
     # to update and shift every second
@@ -1184,9 +1159,14 @@ async def allocate_persistent_feed(
         rt_shm.array[ohlckeys][-2:] = hist_shm.array['close'][-1]
         rt_shm.array['volume'][-2] = 0
 
+    # wait for the spawning parent task to register its subscriber
+    # send-stream entry before we start the sample loop.
+    await sub_registered.wait()
+
     # start sample loop and shm incrementer task for OHLC style sampling
     # at the above registered step periods.
     try:
+        log.info(f'Starting sampler task for {fqsn}')
         await sample_and_broadcast(
             bus,
             rt_shm,
@@ -1235,14 +1215,16 @@ async def open_feed_bus(
     assert brokername in servicename
 
     bus = get_feed_bus(brokername)
+    sub_registered = trio.Event()
 
     flumes: dict[str, Flume] = {}
+
     for symbol in symbols:
 
         # if no cached feed for this symbol has been created for this
         # brokerd yet, start persistent stream and shm writer task in
         # service nursery
-        entry = bus.feeds.get(symbol)
-        if entry is None:
+        flume = bus.feeds.get(symbol)
+        if flume is None:
             # allocate a new actor-local stream bus which
             # will persist for this `brokerd`'s service lifetime.
             async with bus.task_lock:
@@ -1251,6 +1233,7 @@ async def open_feed_bus(
                         allocate_persistent_feed,
 
                         bus=bus,
+                        sub_registered=sub_registered,
                         brokername=brokername,
                         # here we pass through the selected symbol in native
                         # "format" (i.e. upper vs. lowercase depending on
@@ -1263,24 +1246,17 @@ async def open_feed_bus(
         # TODO: we can remove this?
         # assert isinstance(bus.feeds[symbol], tuple)
 
-        # XXX: ``first_quotes`` may be outdated here if this is secondary
+        # XXX: ``.first_quote`` may be outdated here if this is a secondary
         # subscriber
-        # init_msg, first_quotes = bus.feeds[symbol]
         flume = bus.feeds[symbol]
-        # assert bus.feeds[bfqsn] is flume
+        sym = flume.symbol
+        bfqsn = sym.key
+        fqsn = sym.fqsn  # true fqsn
+        assert bfqsn in fqsn and brokername in fqsn
 
-        # msg = init_msg[symbol]
-        # bfqsn = msg['fqsn'].lower()
-        bfqsn = flume.symbol.key
-
-        # true fqsn
-        fqsn = '.'.join([bfqsn, brokername])
-        assert fqsn == flume.symbol.fqsn
-        # assert fqsn in first_quotes
-
-        # broker-ambiguous symbol (provided on cli - eg. mnq.globex.ib)
-        # bsym = symbol + f'.{brokername}'
-        # assert bsym in first_quotes
+        if sym.suffix:
+            # NB: use ``str.removesuffix()`` here (not ``str.rstrip()``
+            # which strips a char *set* and can eat symbol chars).
+            bfqsn = fqsn.removesuffix(f'.{brokername}')
+            log.warning(f'{brokername} expanded symbol {symbol} -> {bfqsn}')
 
         # pack for ``.started()`` sync msg
         flumes[fqsn] = flume
@@ -1290,13 +1266,12 @@ async def open_feed_bus(
         # expected to append its own name to the fqsn, so we filter
         # on keys which *do not* include that name (e.g .ib) .
         bus._subscribers.setdefault(bfqsn, [])
+        # await tractor.breakpoint()
 
-    # send this even to subscribers to existing feed?
-    # deliver initial info message a first quote asap
-    await ctx.started(flumes)
-    #     init_msg,
-    #     first_quotes,
-    # ))
+    # sync feed subscribers with flume handles
+    await ctx.started(
+        {fqsn: flume.to_msg() for fqsn, flume in flumes.items()}
+    )
 
     if not start_stream:
         log.warning(f'Not opening real-time stream for {fqsn}')
@@ -1352,10 +1327,13 @@ async def open_feed_bus(
             # maybe use the current task-id to key the sub list that's
             # added / removed? Or maybe we can add a general
             # pause-resume by sub-key api?
+            bfqsn = fqsn.removesuffix(f'.{brokername}')
             bus_subs = bus._subscribers[bfqsn]
             bus_subs.append(sub)
             local_subs.append(sub)
 
+    sub_registered.set()
+
     try:
         uid = ctx.chan.uid
@@ -1396,7 +1374,6 @@ async def open_feed_bus(
             log.warning(f'{sub} for {symbol} was already removed?')
 
 
-# @dataclass
 class Feed(Struct):
     '''
     A per-provider API for client-side consumption from real-time data
@@ -1410,31 +1387,17 @@ class Feed(Struct):
     similarly allocated shm arrays.
 
     '''
-    # name: str
-    # hist_shm: ShmArray
-    # rt_shm: ShmArray
     mod: ModuleType
     _portal: tractor.Portal
-    # symbol names to first quote dicts
-    # shms: dict[str, tuple[ShmArray, Shmarray]]
     flumes: dict[str, Flume] = {}
-    # first_quotes: dict[str, dict] = {}
     streams: dict[
         str,
         trio.abc.ReceiveChannel[dict[str, Any]],
     ] = {}
     status: dict[str, Any]
 
-    # izero_hist: int = 0
-    # izero_rt: int = 0
-    # throttle_rate: Optional[int] = None
-
     _max_sample_rate: int = 1
 
-    # cache of symbol info messages received as first message when
-    # a stream startsc.
-    # symbols: dict[str, Symbol] = {}
-
     @property
     def portal(self) -> tractor.Portal:
         return self._portal
@@ -1496,8 +1459,6 @@ async def open_feed(
     Open a "data feed" which provides streamed real-time quotes.
 
     '''
-    # fqsn = fqsns[0].lower()
-
     providers: dict[ModuleType, list[str]] = {}
 
     for fqsn in fqsns:
@@ -1531,7 +1492,7 @@ async def open_feed(
         brokerd_ctxs,
     ) as portals:
 
-        bus_ctxs = []
+        bus_ctxs: list[AsyncContextManager] = []
         for (
             portal,
             (brokermod, bfqsns),
@@ -1551,8 +1512,9 @@ async def open_feed(
                 'actor_name': feed.portal.channel.uid[0],
                 'host': host,
                 'port': port,
-                # 'shm': f'{humanize(feed.hist_shm._shm.size)}',
-                # 'throttle_rate': feed.throttle_rate,
+                'hist_shm': 'NA',
+                'rt_shm': 'NA',
+                'throttle_rate': tick_throttle,
             })
             # feed.status.update(init_msg.pop('status', {}))
@@ -1571,6 +1533,7 @@ async def open_feed(
         async with (
             gather_contexts(bus_ctxs) as ctxs,
         ):
+            remote_scopes = []
             for (
                 (ctx, flumes_msg_dict),
                 (brokermod, bfqsns),
@@ -1581,20 +1544,7 @@ async def open_feed(
                     flume = Flume.from_msg(flume_msg)
                     assert flume.symbol.fqsn == fqsn
                     feed.flumes[fqsn] = flume
-
-                    # TODO: this is ugly but eventually we could
-                    # in theory do all this "tabling" of flumes on
-                    # the brokerd-side, in which case we'll likely
-                    # want to make each flume IPC-msg-native?
-            # bfqsn = list(init_msgs)[0]
-            # init = init_msg[bfqsn]
-
-            # si = data['symbol_info']
-            # fqsn = data['fqsn'] + f'.{brokername}'
-            # symbol = Symbol.from_fqsn(
-            #     fqsn,
-            #     info=si,
-            # )
+                    flume.feed = feed
 
                     # attach and cache shm handles
                     rt_shm = flume.rt_shm
                     assert rt_shm
                     hist_shm = flume.hist_shm
                     assert hist_shm
 
+                    feed.status['hist_shm'] = (
+                        f'{humanize(hist_shm._shm.size)}'
+                    )
+                    feed.status['rt_shm'] = f'{humanize(rt_shm._shm.size)}'
+
+                remote_scopes.append(ctx)
                 stream_ctxs.append(
                     ctx.open_stream(
-                        # XXX: be explicit about stream backpressure since we should
-                        # **never** overrun on feeds being too fast, which will
-                        # pretty much always happen with HFT XD
+                        # XXX: be explicit about stream backpressure
+                        # since we should **never** overrun on feeds
+                        # being too fast, which will pretty much
+                        # always happen with HFT XD
                        backpressure=backpressure,
                     )
                 )
@@ -1619,49 +1576,15 @@ async def open_feed(
                 (brokermod, bfqsns),
             ) in zip(streams, providers.items()):
 
-                for bfqsn in bfqsns:
-                    fqsn = '.'.join((bfqsn, brokermod.name))
+                # for bfqsn in bfqsns:
+                for fqsn in flumes_msg_dict:
 
                     # apply common rt stream to each flume
                     # (normally one per broker)
                     feed.flumes[fqsn].stream = stream
                     feed.streams[brokermod.name] = stream
 
-            try:
-                yield feed
-            finally:
-                # drop the infinite stream connection
-                await ctx.cancel()
-
-            # we can only read from shm
-            # hist_shm = attach_shm_array(
-            #     token=init['hist_shm_token'],
-            #     readonly=True,
-            # )
-            # rt_shm = attach_shm_array(
-            #     token=init['rt_shm_token'],
-            #     readonly=True,
-            # )
-
-            # for sym, data in init_msg.items():
-
-            # symbol.broker_info[brokername] = si
-            # feed.symbols[fqsn] = symbol
-            # feed.symbols[f'{sym}.{brokername}'] = symbol
-
-            # cast shm dtype to list... can't member why we need this
-            # for shm_key, shm in [
-            #     ('rt_shm_token', rt_shm),
-            #     ('hist_shm_token', hist_shm),
-            # ]:
-            #     shm_token = flume[shm_key]
-
-            #     # XXX: msgspec won't relay through the tuples XD
-            #     shm_token['dtype_descr'] = tuple(
-            #         map(tuple, shm_token['dtype_descr']))
-
-            #     assert shm_token == shm.token  # sanity
-            # assert fqsn in first_quotes
+            yield feed
 
 
 @acm
@@ -1703,7 +1626,13 @@ async def maybe_open_feed(
         log.info(f'Using cached feed for {fqsn}')
         # add a new broadcast subscription for the quote stream
         # if this feed is likely already in use
-        async with feed.stream.subscribe() as bstream:
-            yield feed, bstream
+
+        async with gather_contexts(
+            mngrs=[stream.subscribe() for stream in feed.streams.values()]
+        ) as bstreams:
+            for bstream, flume in zip(bstreams, feed.flumes.values()):
+                flume.stream = bstream
+
+            yield feed
     else:
-        yield feed, feed.stream
+        yield feed
diff --git a/tests/test_feeds.py b/tests/test_feeds.py
index b0b97690..2fb5f693 100644
--- a/tests/test_feeds.py
+++ b/tests/test_feeds.py
@@ -2,10 +2,11 @@
 Data feed layer APIs, performance, msg throttling.
 '''
+from collections import Counter
 from pprint import pprint
 
 import pytest
-# import tractor
+import tractor
 import trio
 from piker import (
     open_piker_runtime,
@@ -17,12 +18,12 @@ from piker.data import ShmArray
 @pytest.mark.parametrize(
     'fqsns',
     [
-        ['btcusdt.binance', 'ethusdt.binance']
+        {'btcusdt.binance', 'ethusdt.binance'}
     ],
     ids=lambda param: f'fqsns={param}',
 )
 def test_basic_rt_feed(
-    fqsns: list[str],
+    fqsns: set[str],
 ):
     '''
     Start a real-time data feed for provided fqsn and pull
@@ -33,11 +34,12 @@ def test_basic_rt_feed(
         async with (
             open_piker_runtime(
                 'test_basic_rt_feed',
+
                 # XXX tractor BUG: this doesn't translate through to the
                 # ``tractor._state._runtimevars``...
                 registry_addr=('127.0.0.1', 6666),
-                debug_mode=True,
-                loglevel='runtime',
+
+                # debug_mode=True,
             ),
             open_feed(
                 fqsns,
@@ -58,20 +60,29 @@ def test_basic_rt_feed(
 
             # stream some ticks and ensure we see data from both symbol
             # subscriptions.
-            quote_count: int = 0
             stream = feed.streams['binance']
 
-            # pull the first couple startup quotes and ensure
-            # they match the history buffer last entries.
+            # pull the first startup quotes, one for each fqsn, and
+            # ensure they match each flume's startup quote value.
+            fqsns_copy = fqsns.copy()
             for _ in range(1):
                 first_quotes = await stream.receive()
                 for fqsn, quote in first_quotes.items():
+
+                    # XXX: TODO: WTF apparently this error will get
+                    # suppressed and only show up in the teardown
+                    # excgroup if we don't have the fix from
+                    #
+                    # assert 0
+
-                    assert fqsn in fqsns
+                    fqsns_copy.remove(fqsn)
                     flume = feed.flumes[fqsn]
                     assert quote['last'] == flume.first_quote['last']
 
+            cntr = Counter()
             async for quotes in stream:
                 for fqsn, quote in quotes.items():
+                    cntr[fqsn] += 1
                     # await tractor.breakpoint()
                     flume = feed.flumes[fqsn]
@@ -91,9 +102,11 @@ def test_basic_rt_feed(
                         f'rt_ohlc: {rt_row}\n'
                         f'hist_ohlc: {hist_row}\n'
                     )
-                    quote_count += 1
 
-                    if quote_count >= 100:
+                if cntr.total() >= 100:
                     break
 
+            # await tractor.breakpoint()
+            assert set(cntr.keys()) == fqsns
+
     trio.run(main)
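
The heart of this patch is the `sub_registered: trio.Event` handshake:
`allocate_persistent_feed()` now parks right before
`sample_and_broadcast()` until `open_feed_bus()` has appended the
consumer's send-stream to `bus._subscribers`, so the first broadcast
can never race an empty subscriber table. A minimal, runnable sketch of
that startup-ordering pattern in plain `trio` (hypothetical names, no
piker APIs):

    import trio


    async def sample_loop(
        sub_registered: trio.Event,
        tx: trio.MemorySendChannel,
    ) -> None:
        # park until the consumer task has registered its subscription;
        # otherwise the first quotes could be broadcast to an empty
        # subscriber table and silently dropped.
        await sub_registered.wait()
        async with tx:
            for tick in range(3):
                await tx.send({'last': tick})


    async def main() -> None:
        sub_registered = trio.Event()
        tx, rx = trio.open_memory_channel(1)

        async with trio.open_nursery() as n:
            n.start_soon(sample_loop, sub_registered, tx)

            # register the subscription *then* unblock the sampler so
            # no msg can be sent before we're listening.
            sub_registered.set()
            async with rx:
                async for quote in rx:
                    print(quote)  # {'last': 0} .. {'last': 2}


    trio.run(main)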
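A note on the suffix handling above: dropping the `.{brokername}` tail
from an fqsn is done with `str.removesuffix()` (py3.9+) rather than
`str.rstrip()`, because `rstrip()` treats its argument as a *set* of
trailing characters to strip and can silently eat valid symbol
characters. A quick demonstration:

    fqsn = 'ada.binance'

    # rstrip() strips any trailing chars from the set {'.binance'},
    # so it also removes the symbol's final 'a', corrupting the key:
    assert fqsn.rstrip('.binance') == 'ad'

    # removesuffix() removes exactly the given literal suffix:
    assert fqsn.removesuffix('.binance') == 'ada'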
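`Flume.to_msg()`/`.from_msg()` make each flume IPC-msg-friendly by
dropping the runtime-only handles (`stream`, `feed`) that can't be
serialized, letting the receiving side re-attach its own, exactly as
`open_feed()` does with `flume.feed = feed` after `Flume.from_msg()`. A
generic sketch of that pattern with `msgspec` (a hypothetical two-field
struct, not the real `Flume`):

    from __future__ import annotations

    import msgspec


    class FlumeLike(msgspec.Struct):
        fqsn: str
        stream: object | None = None  # runtime-only, not wire-safe

        def to_msg(self) -> dict:
            msg = {f: getattr(self, f) for f in self.__struct_fields__}
            # runtime handles can't cross IPC: pop them and let the
            # receiver re-attach its own after ``from_msg()``.
            msg.pop('stream')
            return msg

        @classmethod
        def from_msg(cls, msg: dict) -> FlumeLike:
            return cls(**msg)


    msg = FlumeLike('btcusdt.binance', stream=object()).to_msg()
    assert FlumeLike.from_msg(msg).stream is None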
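Lastly, the cache-hit branch of `maybe_open_feed()` now enters one
`.subscribe()` broadcast-receiver per cached broker stream through
`tractor.trionics.gather_contexts()`, the same helper used for
`bus_ctxs` in `open_feed()`. The property relied on, that entered
results come back in input order so they can be zip'd back onto their
owners, can be sketched with plain async context managers (the
`open_sub()` stand-in is hypothetical):

    from contextlib import asynccontextmanager as acm

    import trio
    from tractor.trionics import gather_contexts


    @acm
    async def open_sub(fqsn: str):
        # stand-in for entering ``stream.subscribe()`` on a cached feed
        yield f'{fqsn}-bstream'


    async def main() -> None:
        fqsns = ['btcusdt.binance', 'ethusdt.binance']
        async with gather_contexts(
            mngrs=[open_sub(fqsn) for fqsn in fqsns],
        ) as bstreams:
            # results are yielded in input order so each broadcast
            # stream can be re-attached to its owning flume, as in
            # ``maybe_open_feed()`` above.
            for fqsn, bstream in zip(fqsns, bstreams):
                print(fqsn, '->', bstream)


    trio.run(main)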