diff --git a/piker/data/feed.py b/piker/data/feed.py
index 9d4e09d9..9beec93b 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -26,7 +26,7 @@ from collections import (
     Counter,
 )
 from contextlib import asynccontextmanager as acm
-from decimal import Decimal
+# from decimal import Decimal
 from datetime import datetime
 from functools import partial
 import time
@@ -55,8 +55,8 @@ import numpy as np
 
 from ..brokers import get_brokermod
 from ..calc import humanize
-from ..log import (
-    get_logger,
+from ._util import (
+    log,
     get_console_log,
 )
 from ..service import (
@@ -64,6 +64,10 @@ from ..service import (
     check_for_service,
 )
 from .flows import Flume
+from .validate import (
+    FeedInit,
+    validate_backend,
+)
 from ._sharedmem import (
     maybe_open_shm_array,
     ShmArray,
@@ -72,10 +76,8 @@ from ._sharedmem import (
 from .ingest import get_ingestormod
 from .types import Struct
 from ..accounting._mktinfo import (
-    Asset,
     MktPair,
     unpack_fqme,
-    Symbol,
 )
 from ._source import base_iohlc_dtype
 from ..ui import _search
@@ -91,8 +93,6 @@ from ..brokers._util import (
 if TYPE_CHECKING:
     from ..service.marketstore import Storage
 
-log = get_logger(__name__)
-
 
 class _FeedsBus(Struct):
     '''
@@ -568,7 +568,7 @@ async def tsdb_backfill(
         timeframe=timeframe,
     )
 
-    broker, symbol, expiry = unpack_fqme(fqsn)
+    broker, *_ = unpack_fqme(fqsn)
     try:
         (
             latest_start_dt,
@@ -790,13 +790,14 @@ async def manage_history(
     # port = _runtime_vars['_root_mailbox'][1]
 
     uid = tractor.current_actor().uid
-    suffix = '.'.join(uid)
+    name, uuid = uid
+    service = name.rstrip(f'.{mod.name}')
 
     # (maybe) allocate shm array for this broker/symbol which will
     # be used for fast near-term history capture and processing.
     hist_shm, opened = maybe_open_shm_array(
         # key=f'{fqsn}_hist_p{port}',
-        key=f'{fqsn}_hist.{suffix}',
+        key=f'piker.{service}[{uuid[:16]}.{fqsn}.hist',
 
         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -814,7 +815,8 @@ async def manage_history(
 
     rt_shm, opened = maybe_open_shm_array(
         # key=f'{fqsn}_rt_p{port}',
-        key=f'{fqsn}_rt.{suffix}',
+        # key=f'piker.{service}.{fqsn}_rt.{uuid}',
+        key=f'piker.{service}[{uuid[:16]}.{fqsn}.rt',
 
         # use any broker defined ohlc dtype:
         dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype),
@@ -933,24 +935,6 @@ async def manage_history(
         await trio.sleep_forever()
 
 
-class BackendInitMsg(Struct, frozen=True):
-    '''
-    A stringent data provider startup msg schema validator.
-
-    The fields defined here are matched with those absolutely required
-    from each backend broker/data provider.
-
-    '''
-    fqme: str
-    symbol_info: dict | None = None
-    mkt_info: MktPair | None = None
-    shm_write_opts: dict[str, Any] | None = None
-
-
-def validate_init_msg() -> None:
-    ...
-
-
 async def allocate_persistent_feed(
     bus: _FeedsBus,
     sub_registered: trio.Event,
@@ -961,7 +945,7 @@ async def allocate_persistent_feed(
     loglevel: str,
     start_stream: bool = True,
 
-    task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[FeedInit] = trio.TASK_STATUS_IGNORED,
 
 ) -> None:
     '''
@@ -991,22 +975,37 @@ async def allocate_persistent_feed(
     some_data_ready = trio.Event()
    feed_is_live = trio.Event()
 
-    symstr = symstr.lower()
-
     # establish broker backend quote stream by calling
-    # ``stream_quotes()``, which is a required broker backend endpoint.
+    # ``stream_quotes()``, a required broker backend endpoint.
+    init_msgs: (
+        list[FeedInit]  # new
+        | dict[str, dict[str, str]]  # legacy / deprecated
+    )
+
+    # TODO: probably make a struct msg type for this as well
+    # since eventually we do want to have more efficient IPC..
+    first_quote: dict[str, Any]
+
+    symstr = symstr.lower()
     (
-        init_msg,
+        init_msgs,
         first_quote,
     ) = await bus.nursery.start(
         partial(
             mod.stream_quotes,
             send_chan=send,
             feed_is_live=feed_is_live,
+
+            # NOTE / TODO: eventually we may support providing more than
+            # one input here such that a datad daemon can multiplex
+            # multiple live feeds from one task, instead of getting
+            # a new request (and thus new task) for each subscription.
             symbols=[symstr],
+
            loglevel=loglevel,
         )
     )
+
     # TODO: this is indexed by symbol for now since we've planned (for
     # some time) to expect backends to handle single
     # ``.stream_quotes()`` calls with multiple symbols inputs to just
@@ -1029,58 +1028,15 @@ async def allocate_persistent_feed(
     # a small streaming machine around the remote feed which can then
     # do the normal work of sampling and writing shm buffers
     # (depending on if we want sampling done on the far end or not?)
-    per_mkt_init_msg = init_msg[symstr]
-
-    # the broker-specific fully qualified symbol name,
-    # but ensure it is lower-cased for external use.
-    bs_mktid = per_mkt_init_msg['fqsn'].lower()
-
-    # true fqme including broker/provider suffix
-    fqme = '.'.join((bs_mktid, brokername))
-
-    mktinfo = per_mkt_init_msg.get('mkt_info')
-    if not mktinfo:
-
-        log.warning(
-            f'BACKEND {brokername} is using old `Symbol` style API\n'
-            'IT SHOULD BE PORTED TO THE NEW `.accounting._mktinfo.MktPair`\n'
-            'STATTTTT!!!\n'
-        )
-        mktinfo = per_mkt_init_msg['symbol_info']
-
-        # TODO: read out renamed/new tick size fields in block below!
-        price_tick = mktinfo.get(
-            'price_tick_size',
-            Decimal('0.01'),
-        )
-        size_tick = mktinfo.get(
-            'lot_tick_size',
-            Decimal('0.0'),
-        )
-
-        log.warning(f'FQME: {fqme} -> backend needs port to `MktPair`')
-        mkt = MktPair.from_fqme(
-            fqme,
-            price_tick=price_tick,
-            size_tick=size_tick,
-            bs_mktid=bs_mktid,
-
-            _atype=mktinfo['asset_type']
-        )
-
-        symbol = Symbol.from_fqsn(
-            fqsn=fqme,
-            info=mktinfo,
-        )
-
-    else:
-        # the new msg-protocol is to expect an already packed
-        # ``Asset`` and ``MktPair`` object from the backend
-        symbol = mkt = mktinfo
-        assert isinstance(mkt, MktPair)
-        assert isinstance(mkt.dst, Asset)
-
-        assert mkt.type_key
+    init: FeedInit = validate_backend(
+        mod,
+        [symstr],
+        init_msgs,
+    )
+    bs_mktid: str = init.bs_mktid
+    mkt: MktPair = init.mkt_info
+    assert mkt.bs_mktid == bs_mktid
+    fqme: str = mkt.fqme
 
     # HISTORY storage, run 2 tasks:
     # - a history loader / maintainer
@@ -1116,7 +1072,7 @@ async def allocate_persistent_feed(
         # TODO: we have to use this for now since currently the
         # MktPair above doesn't render the correct output key it seems
         # when we provide the `MktInfo` here?..?
-        mkt=symbol,
+        mkt=mkt,
 
         first_quote=first_quote,
         _rt_shm_token=rt_shm.token,
@@ -1125,11 +1081,17 @@ async def allocate_persistent_feed(
         izero_rt=izero_rt,
     )
 
-    # for ambiguous names we simply apply the retreived
+    # for ambiguous names we simply register the
+    # flume for all possible name (sub) sets.
     # feed to that name (for now).
-    bus.feeds[symstr] = bus.feeds[bs_mktid] = flume
+    bus.feeds.update({
+        symstr: flume,
+        fqme: flume,
+        mkt.bs_fqme: flume,
+    })
 
-    task_status.started()
+    # signal the ``open_feed_bus()`` caller task to continue
+    task_status.started(init)
 
     if not start_stream:
         await trio.sleep_forever()
@@ -1140,9 +1102,7 @@ async def allocate_persistent_feed(
 
     # NOTE: if not configured otherwise, we always sum tick volume
     # values in the OHLCV sampler.
-    sum_tick_vlm: bool = init_msg.get(
-        'shm_write_opts', {}
-    ).get('sum_tick_vlm', True)
+    sum_tick_vlm: bool = (init.shm_write_opts or {}).get('sum_tick_vlm', True)
 
     # NOTE: if no high-freq sampled data has (yet) been loaded,
     # seed the buffer with a history datum - this is most handy
@@ -1218,7 +1178,6 @@ async def open_feed_bus(
     # ensure we are who we think we are
     servicename = tractor.current_actor().name
     assert 'brokerd' in servicename
-    assert brokername in servicename
 
     bus = get_feed_bus(brokername)
 
@@ -1573,7 +1532,7 @@ async def open_feed(
 
     feed = Feed()
     for fqsn in fqsns:
-        brokername, key, suffix = unpack_fqme(fqsn)
+        brokername, *_ = unpack_fqme(fqsn)
         bfqsn = fqsn.replace('.' + brokername, '')
 
         try:
diff --git a/piker/data/validate.py b/piker/data/validate.py
new file mode 100644
index 00000000..8e71326c
--- /dev/null
+++ b/piker/data/validate.py
@@ -0,0 +1,201 @@
+# piker: trading gear for hackers
+# Copyright (C) Tyler Goodlet (in stewardship for pikers)
+
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as published by
+# the Free Software Foundation, either version 3 of the License, or
+# (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <https://www.gnu.org/licenses/>.
+'''
+Data feed synchronization protocols, init msgs, and general
+data-provider-backend-agnostic schema definitions.
+
+'''
+from decimal import Decimal
+from pprint import pformat
+from types import ModuleType
+from typing import (
+    Any,
+)
+
+from .types import Struct
+from ..accounting import (
+    Asset,
+    MktPair,
+)
+from ._util import log
+
+
+class FeedInitializationError(ValueError):
+    '''
+    Live data feed setup failed due to API / msg incompatibility!
+
+    '''
+
+
+class FeedInit(Struct, frozen=True):
+    '''
+    A stringent data provider startup msg schema validator.
+
+    The fields defined here are matched with those absolutely required
+    from each backend broker/data provider.
+
+    '''
+    # backend specific, market endpoint id
+    bs_mktid: str
+    mkt_info: MktPair
+    shm_write_opts: dict[str, Any] | None = None
+
+
+def validate_backend(
+    mod: ModuleType,
+    syms: list[str],
+    init_msgs: list[FeedInit] | dict[str, dict[str, Any]],
+
+    # TODO: do a module method scan and report mismatches.
+    check_eps: bool = False,
+
+    api_log_msg_level: str = 'critical'
+
+) -> FeedInit:
+    '''
+    Fail on malformed live quotes feed config/init or warn on changes
+    that haven't been implemented by this backend yet.
+
+    '''
+    if isinstance(init_msgs, dict):
+        for i, (sym_str, msg) in enumerate(init_msgs.items()):
+            init: FeedInit | dict[str, Any] = msg
+
+            # XXX: eventually this WILL NOT necessarily be true.
+            if i > 0:
+                assert not len(init_msgs) == 1
+                keys: set = set(init_msgs.keys()) - set(syms)
+                raise FeedInitializationError(
+                    'TOO MANY INIT MSGS!\n'
+                    f'Unexpected keys: {keys}\n'
+                    'ALL MSGS:\n'
+                    f'{pformat(init_msgs)}\n'
+                )
+
+    # TODO: once all backends are updated we can remove this branching.
+    rx_msg: bool = False
+    warn_msg: str = ''
+    if not isinstance(init, FeedInit):
+        warn_msg += (
+            '\n'
+            '--------------------------\n'
+            ':::DEPRECATED API STYLE:::\n'
+            '--------------------------\n'
+            f'`{mod.name}.stream_quotes()` should deliver '
+            '`.started(FeedInit)`\n'
+            f'|-> CURRENTLY it is using DEPRECATED `.started(dict)` style!\n'
+            f'|-> SEE `FeedInit` in `piker.data.validate`\n'
+            '--------------------------------------------\n'
+        )
+    else:
+        rx_msg = True
+
+    # verify feed init state / schema
+    bs_mktid: str  # backend specific (unique) market id
+    bs_fqme: str  # backend specific fqme
+    mkt: MktPair
+
+    match init:
+        case {
+            'symbol_info': dict(symbol_info),
+            'fqsn': bs_fqme,
+        } | {
+            'mkt_info': dict(symbol_info),
+            'fqsn': bs_fqme,
+        }:
+            symbol_info: dict
+            warn_msg += (
+                'It may also still be using the legacy `Symbol` style API\n'
+                'IT SHOULD BE PORTED TO THE NEW '
+                '`.accounting._mktinfo.MktPair`\n'
+                'STATTTTT!!!\n'
+            )
+
+            # XXX use default legacy (aka discrete precision) mkt
+            # price/size_ticks if none delivered.
+            price_tick = symbol_info.get(
+                'price_tick_size',
+                Decimal('0.01'),
+            )
+            size_tick = symbol_info.get(
+                'lot_tick_size',
+                Decimal('1'),
+            )
+            mkt = MktPair.from_fqme(
+                fqme=f'{bs_fqme}.{mod.name}',
+
+                price_tick=price_tick,
+                size_tick=size_tick,
+
+                bs_mktid=str(init['bs_mktid']),
+                _atype=symbol_info['asset_type']
+            )
+
+        case {
+            'mkt_info': MktPair(
+                dst=Asset(),
+            ) as mkt,
+            'fqsn': bs_fqme,
+        }:
+            warn_msg += (
+                f'{mod.name} in API compat transition?\n'
+                "It's half dict, half man..\n"
+                '-------------------------------------\n'
+            )
+
+        case FeedInit(
+            # bs_mktid=bs_mktid,
+            mkt_info=MktPair(dst=Asset()) as mkt,
+            shm_write_opts=dict(),
+        ) as init:
+            log.info(
+                f'NICE JOB {mod.name} BACKEND!\n'
+                'You are fully up to API spec B):\n'
+                f'{init.to_dict()}'
+            )
+
+        case _:
+            raise FeedInitializationError(init)
+
+    # build a msg if we received a dict for input.
+    if not rx_msg:
+        init = FeedInit(
+            bs_mktid=mkt.bs_mktid,
+            mkt_info=mkt,
+            shm_write_opts=init.get('shm_write_opts'),
+        )
+
+    # `MktPair` value audits
+    mkt = init.mkt_info
+    assert bs_fqme in mkt.fqme
+    assert mkt.type_key
+
+    # `MktPair` wish list
+    if not isinstance(mkt.src, Asset):
+        warn_msg += (
+            f'ALSO, {mod.name.upper()} should try to deliver\n'
+            'the new `MktPair.src: Asset` field!\n'
+            '-----------------------------------------------\n'
+        )
+
+    # complain about any non-idealities
+    if warn_msg:
+        # TODO: would be nice to register an API_COMPAT or something in
+        # maybe cyan for this in general throughout piker, no?
+        logmeth = getattr(log, api_log_msg_level)
+        logmeth(warn_msg)
+
+    return init.copy()
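
For reference, the sketch below is NOT part of the patch: it is a rough, hedged illustration of what a backend's `stream_quotes()` endpoint might deliver under the new `FeedInit` protocol that `validate_backend()` audits above. The broker suffix (`somebroker`), tick sizes, asset type and the empty `first_quote` are placeholder assumptions; a real backend would pull all of these from its provider API.

from decimal import Decimal
from typing import Any

import trio

from piker.accounting._mktinfo import MktPair
from piker.data.validate import FeedInit


async def stream_quotes(
    send_chan,              # mem chan handed over by the feed bus
    symbols: list[str],
    feed_is_live: trio.Event,
    loglevel: str = '',

    task_status=trio.TASK_STATUS_IGNORED,

) -> None:

    sym: str = symbols[0].lower()

    # backend-specific (unique) market id; often just the provider's
    # native symbol string.
    bs_mktid: str = sym

    # pack market precision info; `.from_fqme()` is the same constructor
    # the legacy-dict shim inside `validate_backend()` uses. A fully
    # up-to-spec backend would also deliver `Asset` objects for
    # `.dst`/`.src` per the "wish list" warning above.
    mkt = MktPair.from_fqme(
        fqme=f'{sym}.somebroker',  # hypothetical provider suffix
        price_tick=Decimal('0.01'),
        size_tick=Decimal('1'),
        bs_mktid=bs_mktid,
        _atype='crypto',
    )

    init = FeedInit(
        bs_mktid=bs_mktid,
        mkt_info=mkt,
        shm_write_opts={'sum_tick_vlm': False},
    )

    # snapshot quote; normally the provider's last trade / book state.
    first_quote: dict[str, Any] = {}

    # hand the init msg(s) + snapshot back to
    # `allocate_persistent_feed()` which runs them through
    # `validate_backend()` before shm allocation.
    task_status.started(([init], first_quote))

    # flag the feed live and then relay ticks over `send_chan`..
    feed_is_live.set()
    await trio.sleep_forever()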