diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py
index 1b305009..5952418f 100644
--- a/piker/accounting/_pos.py
+++ b/piker/accounting/_pos.py
@@ -30,7 +30,8 @@ from types import ModuleType
 from typing import (
     Any,
     Iterator,
-    Generator
+    Generator,
+    TYPE_CHECKING,
 )
 
 import pendulum
@@ -59,8 +60,10 @@ from ..clearing._messages import (
     BrokerdPosition,
 )
 from piker.types import Struct
-from piker.data._symcache import SymbologyCache
-from ..log import get_logger
+from piker.log import get_logger
+
+if TYPE_CHECKING:
+    from piker.data._symcache import SymbologyCache
 
 log = get_logger(__name__)
 
@@ -493,6 +496,17 @@ class Account(Struct):
 
         _mktmap_table: dict[str, MktPair] | None = None,
 
+        only_require: list[str]|bool = True,
+        # ^list of fqmes that are "required" to be processed from
+        # this ledger pass; we often don't care about others and
+        # definitely shouldn't always error in such cases.
+        # (eg. broker backend loaded that doesn't yet support the
+        # symcache but also, inside the paper engine we don't ad-hoc
+        # request `get_mkt_info()` for every symbol in the ledger,
+        # only the one for which we're simulating against).
+        # TODO, not sure if there's a better soln for this, ideally
+        # all backends get symcache support afap i guess..
+
     ) -> dict[str, Position]:
         '''
         Update the internal `.pps[str, Position]` table from input
@@ -535,11 +549,32 @@ class Account(Struct):
                 if _mktmap_table is None:
                     raise
 
+                required: bool = (
+                    only_require is True
+                    or (
+                        only_require is not True
+                        and
+                        fqme in only_require
+                    )
+                )
                 # XXX: caller is allowed to provide a fallback
                 # mktmap table for the case where a new position is
                 # being added and the preloaded symcache didn't
                 # have this entry prior (eg. with frickin IB..)
-                mkt = _mktmap_table[fqme]
+                if (
+                    not (mkt := _mktmap_table.get(fqme))
+                    and
+                    required
+                ):
+                    raise
+
+                elif not required:
+                    continue
+
+                else:
+                    # should be an entry retrieved somewhere
+                    assert mkt
+
             if not (pos := pps.get(bs_mktid)):
 
@@ -656,7 +691,7 @@ class Account(Struct):
     def write_config(self) -> None:
         '''
         Write the current account state to the user's account TOML file, normally
-        something like ``pps.toml``.
+        something like `pps.toml`.
 
         '''
         # TODO: show diff output?
diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py
index 0c328d9f..94e4cbe1 100644
--- a/piker/brokers/__init__.py
+++ b/piker/brokers/__init__.py
@@ -98,13 +98,14 @@ async def open_cached_client(
     If one has not been setup do it and cache it.
 
     '''
-    brokermod = get_brokermod(brokername)
+    brokermod: ModuleType = get_brokermod(brokername)
+
+    # TODO: make abstract or `typing.Protocol`
+    # client: Client
     async with maybe_open_context(
         acm_func=brokermod.get_client,
         kwargs=kwargs,
-
     ) as (cache_hit, client):
-
         if cache_hit:
             log.runtime(f'Reusing existing {client}')
 
diff --git a/piker/brokers/cli.py b/piker/brokers/cli.py
index d54b2203..626b4ff8 100644
--- a/piker/brokers/cli.py
+++ b/piker/brokers/cli.py
@@ -471,11 +471,15 @@ def search(
 
     '''
    # global opts
-    brokermods = list(config['brokermods'].values())
+    brokermods: list[ModuleType] = list(config['brokermods'].values())
+
+    # TODO: this is coming from the `search --pdb` NOT from
+    # the `piker --pdb` XD ..
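# --- editorial aside (not part of the patch) ---
# A minimal sketch of the `only_require` gating added to
# `Account.update_from_ledger()` in the `_pos.py` hunk above: `True`
# means every ledger fqme must resolve to a `MktPair`, while a
# `list[str]` restricts that requirement to just those fqmes. All
# names below are illustrative stand-ins, not `piker` API.
def is_required(
    fqme: str,
    only_require: list[str] | bool = True,
) -> bool:
    # `True` => every fqme is required; otherwise membership decides.
    return (
        only_require is True
        or fqme in only_require
    )

assert is_required('btcusdt.spot.binance')
assert not is_required('xmrusdt.spot.kraken', ['btcusdt.spot.binance'])
# --- end aside ---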
+    # -[ ] pull from the parent click ctx's values..dumdum
+    # assert pdb
 
     # define tractor entrypoint
     async def main(func):
-
         async with maybe_open_pikerd(
             loglevel=config['loglevel'],
             debug_mode=pdb,
 
diff --git a/piker/brokers/core.py b/piker/brokers/core.py
index 6111d307..c1aa88ac 100644
--- a/piker/brokers/core.py
+++ b/piker/brokers/core.py
@@ -22,7 +22,9 @@ routines should be primitive data types where possible.
 """
 import inspect
 from types import ModuleType
-from typing import List, Dict, Any, Optional
+from typing import (
+    Any,
+)
 
 import trio
 
@@ -34,8 +36,10 @@ from ..accounting import MktPair
 
 
 async def api(brokername: str, methname: str, **kwargs) -> dict:
-    """Make (proxy through) a broker API call by name and return its result.
-    """
+    '''
+    Make (proxy through) a broker API call by name and return its result.
+
+    '''
     brokermod = get_brokermod(brokername)
     async with brokermod.get_client() as client:
         meth = getattr(client, methname, None)
@@ -62,10 +66,14 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
 
 async def stocks_quote(
     brokermod: ModuleType,
-    tickers: List[str]
-) -> Dict[str, Dict[str, Any]]:
-    """Return quotes dict for ``tickers``.
-    """
+    tickers: list[str]
+
+) -> dict[str, dict[str, Any]]:
+    '''
+    Return a `dict` of snapshot quotes for the provided input
+    `tickers`: a `list` of fqmes.
+
+    '''
     async with brokermod.get_client() as client:
         return await client.quote(tickers)
 
@@ -74,13 +82,15 @@ async def stocks_quote(
 async def option_chain(
     brokermod: ModuleType,
     symbol: str,
-    date: Optional[str] = None,
-) -> Dict[str, Dict[str, Dict[str, Any]]]:
-    """Return option chain for ``symbol`` for ``date``.
+    date: str|None = None,
+) -> dict[str, dict[str, dict[str, Any]]]:
+    '''
+    Return option chain for ``symbol`` for ``date``.
 
     By default all expiries are returned. If ``date`` is provided
     then contract quotes for that single expiry are returned.
-    """
+
+    '''
     async with brokermod.get_client() as client:
         if date:
             id = int((await client.tickers2ids([symbol]))[symbol])
@@ -98,7 +108,7 @@ async def option_chain(
 # async def contracts(
 #     brokermod: ModuleType,
 #     symbol: str,
-# ) -> Dict[str, Dict[str, Dict[str, Any]]]:
+# ) -> dict[str, dict[str, dict[str, Any]]]:
 #     """Return option contracts (all expiries) for ``symbol``.
 #     """
 #     async with brokermod.get_client() as client:
@@ -110,15 +120,24 @@ async def bars(
     brokermod: ModuleType,
     symbol: str,
     **kwargs,
-) -> Dict[str, Dict[str, Dict[str, Any]]]:
-    """Return option contracts (all expiries) for ``symbol``.
-    """
+) -> dict[str, dict[str, dict[str, Any]]]:
+    '''
+    Return historical bars for ``symbol``.
+
+    '''
     async with brokermod.get_client() as client:
         return await client.bars(symbol, **kwargs)
 
 
-async def search_w_brokerd(name: str, pattern: str) -> dict:
+async def search_w_brokerd(
+    name: str,
+    pattern: str,
+) -> dict:
+
+    # TODO: WHY NOT WORK!?!
+    # when we `step` through the next block?
+    # import tractor
+    # await tractor.pause()
     async with open_cached_client(name) as client:
 
         # TODO: support multiple asset type concurrent searches.
@@ -130,12 +149,12 @@ async def symbol_search(
     pattern: str,
     **kwargs,
 
-) -> Dict[str, Dict[str, Dict[str, Any]]]:
+) -> dict[str, dict[str, dict[str, Any]]]:
     '''
     Return symbol info from broker.
 
     '''
-    results = []
+    results: list[str] = []
 
     async def search_backend(
         brokermod: ModuleType
@@ -143,6 +162,13 @@ async def symbol_search(
 
         brokername: str = mod.name
 
+        # TODO: figure this the FUCK OUT
+        # -> ok so obvi in the root actor any async task that's
+        # spawned outside the main tractor-root-actor task needs to
+        # call this..
+        # await tractor.devx._debug.maybe_init_greenback()
+        # tractor.pause_from_sync()
+
         async with maybe_spawn_brokerd(
             mod.name,
             infect_asyncio=getattr(
@@ -162,7 +188,6 @@ async def symbol_search(
         ))
 
     async with trio.open_nursery() as n:
-
         for mod in brokermods:
             n.start_soon(search_backend, mod.name)
 
@@ -172,11 +197,13 @@ async def symbol_search(
 async def mkt_info(
     brokermod: ModuleType,
     fqme: str,
+
+    **kwargs,
 
 ) -> MktPair:
     '''
-    Return MktPair info from broker including src and dst assets.
+    Return the `piker.accounting.MktPair` info struct from a given
+    backend broker tradable src/dst asset pair.
 
     '''
     async with open_cached_client(brokermod.name) as client:
diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py
index 9bb2aa74..6d8f645e 100644
--- a/piker/clearing/_client.py
+++ b/piker/clearing/_client.py
@@ -168,7 +168,6 @@ class OrderClient(Struct):
 
 
 async def relay_orders_from_sync_code(
-
     client: OrderClient,
     symbol_key: str,
     to_ems_stream: tractor.MsgStream,
@@ -242,6 +241,11 @@ async def open_ems(
 
     async with maybe_open_emsd(
         broker,
+        # XXX NOTE, LOL so this determines the daemon `emsd` loglevel
+        # then FYI.. that's kinda wrong no?
+        # -[ ] shouldn't it be set by `pikerd -l` or no?
+        # -[ ] would make a lot more sense to have a subsys ctl for
+        #      levels.. like `-l emsd.info` or something?
         loglevel=loglevel,
     ) as portal:
 
diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 3f7045fa..af5fe690 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -653,7 +653,11 @@ class Router(Struct):
         flume = feed.flumes[fqme]
         first_quote: dict = flume.first_quote
         book: DarkBook = self.get_dark_book(broker)
-        book.lasts[fqme]: float = float(first_quote['last'])
+
+        if not (last := first_quote.get('last')):
+            last: float = flume.rt_shm.array[-1]['close']
+
+        book.lasts[fqme]: float = float(last)
 
         async with self.maybe_open_brokerd_dialog(
             brokermod=brokermod,
@@ -716,7 +720,7 @@ class Router(Struct):
         subs = self.subscribers[sub_key]
 
         sent_some: bool = False
-        for client_stream in subs:
+        for client_stream in subs.copy():
             try:
                 await client_stream.send(msg)
                 sent_some = True
@@ -1010,10 +1014,14 @@ async def translate_and_relay_brokerd_events(
                 status_msg.brokerd_msg = msg
                 status_msg.src = msg.broker_details['name']
 
-                await router.client_broadcast(
-                    status_msg.req.symbol,
-                    status_msg,
-                )
+                if not status_msg.req:
+                    # likely some order change state?
+                    await tractor.pause()
+                else:
+                    await router.client_broadcast(
+                        status_msg.req.symbol,
+                        status_msg,
+                    )
 
                 if status == 'closed':
                     log.info(f'Execution for {oid} is complete!')
diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py
index 0393b2e6..60835598 100644
--- a/piker/clearing/_paper_engine.py
+++ b/piker/clearing/_paper_engine.py
@@ -297,6 +297,8 @@ class PaperBoi(Struct):
 
         # transmit pp msg to ems
         pp: Position = self.acnt.pps[bs_mktid]
+        # TODO, this will break if a restricted `only_require=<list>`
+        # was passed to `.update_from_ledger()`
 
         pp_msg = BrokerdPosition(
             broker=self.broker,
@@ -653,6 +655,7 @@ async def open_trade_dialog(
             # in) use manually constructed table from calling
             # the `.get_mkt_info()` provider EP above.
             _mktmap_table=mkt_by_fqme,
+            only_require=list(mkt_by_fqme),
         )
 
         pp_msgs: list[BrokerdPosition] = []
diff --git a/piker/clearing/_util.py b/piker/clearing/_util.py
index 962861e8..c82a01aa 100644
--- a/piker/clearing/_util.py
+++ b/piker/clearing/_util.py
@@ -30,6 +30,7 @@ subsys: str = 'piker.clearing'
 
 log = get_logger(subsys)
 
+# TODO, oof doesn't this ignore the `loglevel` then???
 get_console_log = partial(
     get_console_log,
     name=subsys,
diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index 7bb0231d..e5b87a2a 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -95,6 +95,12 @@ class Sampler:
     # history loading.
     incr_task_cs: trio.CancelScope | None = None
 
+    bcast_errors: tuple[type[Exception], ...] = (
+        trio.BrokenResourceError,
+        trio.ClosedResourceError,
+        trio.EndOfChannel,
+    )
+
     # holds all the ``tractor.Context`` remote subscriptions for
     # a particular sample period increment event: all subscribers are
     # notified on a step.
@@ -258,14 +264,15 @@ class Sampler:
             subs: set
             last_ts, subs = pair
 
-            task = trio.lowlevel.current_task()
-            log.debug(
-                f'SUBS {self.subscribers}\n'
-                f'PAIR {pair}\n'
-                f'TASK: {task}: {id(task)}\n'
-                f'broadcasting {period_s} -> {last_ts}\n'
-                # f'consumers: {subs}'
-            )
+            # NOTE, for debugging pub-sub issues
+            # task = trio.lowlevel.current_task()
+            # log.debug(
+            #     f'ALL-SUBS@{period_s!r}: {self.subscribers}\n'
+            #     f'PAIR: {pair}\n'
+            #     f'TASK: {task}: {id(task)}\n'
+            #     f'broadcasting {period_s} -> {last_ts}\n'
+            #     f'consumers: {subs}'
+            # )
             borked: set[MsgStream] = set()
             sent: set[MsgStream] = set()
             while True:
@@ -282,12 +289,11 @@ class Sampler:
                         await stream.send(msg)
                         sent.add(stream)
 
-                    except (
-                        trio.BrokenResourceError,
-                        trio.ClosedResourceError
-                    ):
+                    except self.bcast_errors as err:
                         log.error(
-                            f'{stream._ctx.chan.uid} dropped connection'
+                            f'Connection dropped for IPC ctx\n'
+                            f'{stream._ctx}\n\n'
+                            f'Due to {type(err)}'
                         )
                         borked.add(stream)
                     else:
@@ -394,7 +400,8 @@ async def register_with_sampler(
     finally:
         if (
             sub_for_broadcasts
-            and subs
+            and
+            subs
         ):
             try:
                 subs.remove(stream)
@@ -561,8 +568,7 @@ async def open_sample_stream(
 
 
 async def sample_and_broadcast(
-
-    bus: _FeedsBus,  # noqa
+    bus: _FeedsBus,
     rt_shm: ShmArray,
     hist_shm: ShmArray,
     quote_stream: trio.abc.ReceiveChannel,
@@ -582,11 +588,33 @@ async def sample_and_broadcast(
 
     overruns = Counter()
 
+    # NOTE, only used for debugging live-data-feed issues, though
+    # this should be resolved more correctly in the future using the
+    # new typed-msgspec feats of `tractor`!
+    #
+    # XXX, a multiline nested `dict` formatter (since rn quote-msgs
+    # are just that).
+    # pfmt: Callable[[str], str] = mk_repr()
+
     # iterate stream delivered by broker
     async for quotes in quote_stream:
         # print(quotes)
 
-        # TODO: ``numba`` this!
+        # XXX WARNING XXX only enable for debugging bc ow can cost
+        # ALOT of perf with HF-feedz!!!
+        #
+        # log.info(
+        #     'Rx live quotes:\n'
+        #     f'{pfmt(quotes)}'
+        # )
+
+        # TODO,
+        # -[ ] `numba` or `cython`-nize this loop possibly?
+        #  |_alternatively could we do it in rust somehow by unpacking
+        #    arrow msgs instead of using `msgspec`?
+        # -[ ] use `msgspec.Struct` support in new typed-msging from
+        #     `tractor` to ensure only allowed msgs are transmitted?
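# --- editorial aside (not part of the patch) ---
# Why the new class-level `Sampler.bcast_errors` tuple: `except`
# accepts a tuple of exception types, so one shared set of
# IPC-disconnect errors can service every broadcast loop and even be
# extended inline via tuple concatenation (as `uniform_rate_send()`
# does further below). A toy sketch, names here are illustrative:
import trio

BCAST_ERRORS: tuple[type[Exception], ...] = (
    trio.BrokenResourceError,
    trio.ClosedResourceError,
    trio.EndOfChannel,
)

def classify(exc: Exception) -> str:
    try:
        raise exc
    except (ConnectionResetError,) + BCAST_ERRORS:
        # any transport-disconnect flavour lands here
        return 'disconnect'
    except Exception:
        return 'other'

assert classify(trio.EndOfChannel()) == 'disconnect'
assert classify(ValueError()) == 'other'
# --- end aside ---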
+
         for broker_symbol, quote in quotes.items():
             # TODO: in theory you can send the IPC msg *before* writing
             # to the sharedmem array to decrease latency, however, that
@@ -659,6 +687,21 @@ async def sample_and_broadcast(
             sub_key: str = broker_symbol.lower()
             subs: set[Sub] = bus.get_subs(sub_key)
 
+            # TODO, figure out how to make this useful whilst
+            # incorporating feed "pausing" ..
+            #
+            # if not subs:
+            #     all_bs_fqmes: list[str] = list(
+            #         bus._subscribers.keys()
+            #     )
+            #     log.warning(
+            #         f'No subscribers for {brokername!r} live-quote ??\n'
+            #         f'broker_symbol: {broker_symbol}\n\n'
+
+            #         f'Maybe the backend-sys symbol does not match one of,\n'
+            #         f'{pfmt(all_bs_fqmes)}\n'
+            #     )
+
             # NOTE: by default the broker backend doesn't append
             # it's own "name" into the fqme schema (but maybe it
             # should?) so we have to manually generate the correct
@@ -728,18 +771,14 @@ async def sample_and_broadcast(
                         if lags > 10:
                             await tractor.pause()
 
-                except (
-                    trio.BrokenResourceError,
-                    trio.ClosedResourceError,
-                    trio.EndOfChannel,
-                ):
+                except Sampler.bcast_errors as ipc_err:
                     ctx: Context = ipc._ctx
                     chan: Channel = ctx.chan
                     if ctx:
                         log.warning(
-                            'Dropped `brokerd`-quotes-feed connection:\n'
-                            f'{broker_symbol}:'
-                            f'{ctx.cid}@{chan.uid}'
+                            f'Dropped `brokerd`-feed for {broker_symbol!r} due to,\n'
+                            f'x>) {ctx.cid}@{chan.uid}\n'
+                            f'|_{ipc_err!r}\n\n'
                         )
                     if sub.throttle_rate:
                         assert ipc._closed
@@ -756,12 +795,11 @@ async def sample_and_broadcast(
 
 
 async def uniform_rate_send(
-
     rate: float,
     quote_stream: trio.abc.ReceiveChannel,
     stream: MsgStream,
 
-    task_status: TaskStatus = trio.TASK_STATUS_IGNORED,
+    task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
 
 ) -> None:
     '''
@@ -779,13 +817,16 @@ async def uniform_rate_send(
     https://gist.github.com/njsmith/7ea44ec07e901cb78ebe1dd8dd846cb9
 
     '''
-    # TODO: compute the approx overhead latency per cycle
-    left_to_sleep = throttle_period = 1/rate - 0.000616
+    # ?TODO? dynamically compute the **actual** approx overhead latency per cycle
+    # instead of this magic # bidinezz?
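# --- editorial aside (not part of the patch) ---
# Expanding on the ?TODO? just above: the send period is simply the
# inverse of the requested rate minus a magic, empirically measured
# per-cycle overhead (0.000616s). A hypothetical dynamic variant
# could measure that overhead instead of hard-coding it:
def throttle_period(rate: float, overhead_s: float = 0.000616) -> float:
    # seconds to sleep per cycle to approximate `rate` sends/sec
    return 1/rate - overhead_s

assert abs(throttle_period(60) - (1/60 - 0.000616)) < 1e-12
# --- end aside ---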
+    throttle_period: float = 1/rate - 0.000616
+    left_to_sleep: float = throttle_period
 
     # send cycle state
+    first_quote: dict|None
     first_quote = last_quote = None
-    last_send = time.time()
-    diff = 0
+    last_send: float = time.time()
+    diff: float = 0
 
     task_status.started()
     ticks_by_type: dict[
@@ -796,22 +837,28 @@ async def uniform_rate_send(
     clear_types = _tick_groups['clears']
 
     while True:
-
-        # compute the remaining time to sleep for this throttled cycle
-        left_to_sleep = throttle_period - diff
+        left_to_sleep: float = throttle_period - diff
         if left_to_sleep > 0:
+            cs: trio.CancelScope
             with trio.move_on_after(left_to_sleep) as cs:
+                sym: str
+                last_quote: dict
                 try:
                     sym, last_quote = await quote_stream.receive()
                 except trio.EndOfChannel:
-                    log.exception(f"feed for {stream} ended?")
+                    log.exception(
+                        f'Live stream for feed ended?\n'
+                        f'<=c\n'
+                        f' |_[{stream!r}\n'
+                    )
                     break
 
-                diff = time.time() - last_send
+                diff: float = time.time() - last_send
                 if not first_quote:
-                    first_quote = last_quote
+                    first_quote: dict = last_quote
                     # first_quote['tbt'] = ticks_by_type
 
                 if (throttle_period - diff) > 0:
@@ -872,7 +919,9 @@ async def uniform_rate_send(
                 # TODO: now if only we could sync this to the display
                 # rate timing exactly lul
                 try:
-                    await stream.send({sym: first_quote})
+                    await stream.send({
+                        sym: first_quote
+                    })
                 except tractor.RemoteActorError as rme:
                     if rme.type is not tractor._exceptions.StreamOverrun:
                         raise
@@ -883,19 +932,28 @@ async def uniform_rate_send(
                         f'{sym}:{ctx.cid}@{chan.uid}'
                     )
 
+                # NOTE: any of these can be raised by `tractor`'s IPC
+                # transport-layer and we want to be highly resilient
+                # to consumers which crash or lose network connection.
+                # I.e. we **DO NOT** want to crash and propagate up to
+                # ``pikerd`` these kinds of errors!
                 except (
-                    # NOTE: any of these can be raised by ``tractor``'s IPC
-                    # transport-layer and we want to be highly resilient
-                    # to consumers which crash or lose network connection.
-                    # I.e. we **DO NOT** want to crash and propagate up to
-                    # ``pikerd`` these kinds of errors!
-                    trio.ClosedResourceError,
-                    trio.BrokenResourceError,
                     ConnectionResetError,
-                ):
-                    # if the feed consumer goes down then drop
-                    # out of this rate limiter
-                    log.warning(f'{stream} closed')
+                ) + Sampler.bcast_errors as ipc_err:
+                    match ipc_err:
+                        case trio.EndOfChannel():
+                            log.info(
+                                f'{stream} terminated by peer,\n'
+                                f'{ipc_err!r}'
+                            )
+                        case _:
+                            # if the feed consumer goes down then drop
+                            # out of this rate limiter
+                            log.warning(
+                                f'{stream} closed due to,\n'
+                                f'{ipc_err!r}'
+                            )
+
                     await stream.aclose()
                     return
 
diff --git a/piker/data/_symcache.py b/piker/data/_symcache.py
index fc1057a0..bcaa5844 100644
--- a/piker/data/_symcache.py
+++ b/piker/data/_symcache.py
@@ -31,6 +31,7 @@ from pathlib import Path
 from pprint import pformat
 from typing import (
     Any,
+    Callable,
     Sequence,
     Hashable,
     TYPE_CHECKING,
@@ -56,7 +57,7 @@ from piker.brokers import (
 )
 
 if TYPE_CHECKING:
-    from ..accounting import (
+    from piker.accounting import (
         Asset,
         MktPair,
     )
@@ -149,57 +150,68 @@ class SymbologyCache(Struct):
                 'Implement `Client.get_assets()`!'
             )
 
-        if get_mkt_pairs := getattr(client, 'get_mkt_pairs', None):
-
-            pairs: dict[str, Struct] = await get_mkt_pairs()
-            for bs_fqme, pair in pairs.items():
-
-                # NOTE: every backend defined pair should
-                # declare it's ns path for roundtrip
-                # serialization lookup.
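# --- editorial aside (not part of the patch) ---
# The `_symcache.py` hunk starting here replaces an `if/else` nesting
# with early-return guard clauses, dedenting the happy path. The
# shape of the refactor, schematically (sync stand-ins for the real
# async `Client` API):
def load(client: object) -> str:
    get_mkt_pairs = getattr(client, 'get_mkt_pairs', None)
    if not get_mkt_pairs:
        return 'no-pairs-support'   # warn & bail early

    pairs = get_mkt_pairs()
    if not pairs:
        return 'empty-response'     # warn & bail early

    # happy path runs at base indentation
    return f'loaded {len(pairs)} pairs'

class FakeClient:
    def get_mkt_pairs(self) -> dict:
        return {'btcusdt': object()}

assert load(FakeClient()) == 'loaded 1 pairs'
assert load(object()) == 'no-pairs-support'
# --- end aside ---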
-                if not getattr(pair, 'ns_path', None):
-                    raise TypeError(
-                        f'Pair-struct for {self.mod.name} MUST define a '
-                        '`.ns_path: str`!\n'
-                        f'{pair}'
-                    )
-
-                entry = await self.mod.get_mkt_info(pair.bs_fqme)
-                if not entry:
-                    continue
-
-                mkt: MktPair
-                pair: Struct
-                mkt, _pair = entry
-                assert _pair is pair, (
-                    f'`{self.mod.name}` backend probably has a '
-                    'keying-symmetry problem between the pair-`Struct` '
-                    'returned from `Client.get_mkt_pairs()`and the '
-                    'module level endpoint: `.get_mkt_info()`\n\n'
-                    "Here's the struct diff:\n"
-                    f'{_pair - pair}'
-                )
-                # NOTE XXX: this means backends MUST implement
-                # a `Struct.bs_mktid: str` field to provide
-                # a native-keyed map to their own symbol
-                # set(s).
-                self.pairs[pair.bs_mktid] = pair
-
-                # NOTE: `MktPair`s are keyed here using piker's
-                # internal FQME schema so that search,
-                # accounting and feed init can be accomplished
-                # a sane, uniform, normalized basis.
-                self.mktmaps[mkt.fqme] = mkt
-
-            self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
-                pair,
-            )
-
-        else:
+        get_mkt_pairs: Callable|None = getattr(
+            client,
+            'get_mkt_pairs',
+            None,
+        )
+        if not get_mkt_pairs:
             log.warning(
                 'No symbology cache `Pair` support for `{provider}`..\n'
                 'Implement `Client.get_mkt_pairs()`!'
             )
+            return self
+
+        pairs: dict[str, Struct] = await get_mkt_pairs()
+        if not pairs:
+            log.warning(
+                'No pairs from initial {provider!r} sym-cache request?\n\n'
+                '`Client.get_mkt_pairs()` -> {pairs!r} ?'
+            )
+            return self
+
+        for bs_fqme, pair in pairs.items():
+            if not getattr(pair, 'ns_path', None):
+                # XXX: every backend defined pair must declare
+                # a `.ns_path: tractor.NamespacePath` to enable
+                # roundtrip serialization lookup from a local
+                # cache file.
+                raise TypeError(
+                    f'Pair-struct for {self.mod.name} MUST define a '
+                    '`.ns_path: str`!\n\n'
+                    f'{pair!r}'
+                )
+
+            entry = await self.mod.get_mkt_info(pair.bs_fqme)
+            if not entry:
+                continue
+
+            mkt: MktPair
+            pair: Struct
+            mkt, _pair = entry
+            assert _pair is pair, (
+                f'`{self.mod.name}` backend probably has a '
+                'keying-symmetry problem between the pair-`Struct` '
+                'returned from `Client.get_mkt_pairs()` and the '
+                'module level endpoint: `.get_mkt_info()`\n\n'
+                "Here's the struct diff:\n"
+                f'{_pair - pair}'
+            )
+            # NOTE XXX: this means backends MUST implement
+            # a `Struct.bs_mktid: str` field to provide
+            # a native-keyed map to their own symbol
+            # set(s).
+            self.pairs[pair.bs_mktid] = pair
+
+            # NOTE: `MktPair`s are keyed here using piker's
+            # internal FQME schema so that search,
+            # accounting and feed init can be accomplished
+            # a sane, uniform, normalized basis.
+            self.mktmaps[mkt.fqme] = mkt
+
+        self.pair_ns_path: str = tractor.msg.NamespacePath.from_ref(
+            pair,
+        )
 
         return self
 
diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py
index 256b35af..4d886fbc 100644
--- a/piker/data/_web_bs.py
+++ b/piker/data/_web_bs.py
@@ -273,7 +273,7 @@ async def _reconnect_forever(
             nobsws._connected.set()
             await trio.sleep_forever()
         except HandshakeError:
-            log.exception(f'Retrying connection')
+            log.exception('Retrying connection')
 
     # ws & nursery block ends
 
@@ -359,8 +359,8 @@ async def open_autorecon_ws(
 
 
 '''
-JSONRPC response-request style machinery for transparent multiplexing of msgs
-over a NoBsWs.
+JSONRPC response-request style machinery for transparent multiplexing
+of msgs over a `NoBsWs`.
 '''
 
 
@@ -377,43 +377,82 @@ async def open_jsonrpc_session(
     url: str,
     start_id: int = 0,
     response_type: type = JSONRPCResult,
-    request_type: Optional[type] = None,
-    request_hook: Optional[Callable] = None,
-    error_hook: Optional[Callable] = None,
+
+    msg_recv_timeout: float = float('inf'),
+    # ^NOTE, since only `deribit` is using this jsonrpc stuff atm
+    # and options mkts are generally "slow moving"..
+    #
+    # FURTHER if we break the underlying ws connection then since we
+    # don't pass a `fixture` to the task that manages `NoBsWs`, i.e.
+    # `_reconnect_forever()`, the jsonrpc "transport pipe" gets
+    # broken and never restored with wtv init sequence is required to
+    # re-establish a working req-resp session.
+
 ) -> Callable[[str, dict], dict]:
+    '''
+    Init a json-RPC-over-websocket connection to the provided `url`.
+
+    A `json_rpc: Callable[[str, dict], dict]` is delivered to the
+    caller for sending requests and a bg-`trio.Task` handles
+    processing of response msgs including error reporting/raising in
+    the parent/caller task.
+
+    '''
+    # NOTE, store all request msgs so we can raise errors on the
+    # caller side!
+    req_msgs: dict[int, dict] = {}
 
     async with (
-        trio.open_nursery() as n,
-        open_autorecon_ws(url) as ws
+        trio.open_nursery() as tn,
+        open_autorecon_ws(
+            url=url,
+            msg_recv_timeout=msg_recv_timeout,
+        ) as ws
     ):
-        rpc_id: Iterable = count(start_id)
+        rpc_id: Iterable[int] = count(start_id)
         rpc_results: dict[int, dict] = {}
 
-        async def json_rpc(method: str, params: dict) -> dict:
+        async def json_rpc(
+            method: str,
+            params: dict,
+        ) -> dict:
             '''
             perform a json rpc call and wait for the result, raise exception in
             case of error field present on response
             '''
+            nonlocal req_msgs
+
+            req_id: int = next(rpc_id)
             msg = {
                 'jsonrpc': '2.0',
-                'id': next(rpc_id),
+                'id': req_id,
                 'method': method,
                 'params': params
             }
             _id = msg['id']
 
-            rpc_results[_id] = {
+            result = rpc_results[_id] = {
                 'result': None,
-                'event': trio.Event()
+                'error': None,
+                'event': trio.Event(),  # signal caller resp arrived
             }
+            req_msgs[_id] = msg
 
             await ws.send_msg(msg)
 
+            # wait for response before unblocking requester code
             await rpc_results[_id]['event'].wait()
 
-            ret = rpc_results[_id]['result']
+            if (maybe_result := result['result']):
+                ret = maybe_result
+                del rpc_results[_id]
 
-            del rpc_results[_id]
+            else:
+                err = result['error']
+                raise Exception(
+                    f'JSONRPC request failed\n'
+                    f'req: {msg}\n'
+                    f'resp: {err}\n'
+                )
 
             if ret.error is not None:
                 raise Exception(json.dumps(ret.error, indent=4))
@@ -428,6 +467,7 @@ async def open_jsonrpc_session(
             the server side.
 
             '''
+            nonlocal req_msgs
             async for msg in ws:
                 match msg:
                     case {
@@ -451,19 +491,28 @@ async def open_jsonrpc_session(
                         'params': _,
                     }:
                         log.debug(f'Recieved\n{msg}')
-                        if request_hook:
-                            await request_hook(request_type(**msg))
 
                     case {
                         'error': error
                     }:
-                        log.warning(f'Recieved\n{error}')
-                        if error_hook:
-                            await error_hook(response_type(**msg))
+                        # retrieve orig request msg, set error
+                        # response in original "result" msg,
+                        # THEN FINALLY set the event to signal caller
+                        # to raise the error in the parent task.
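# --- editorial aside (not part of the patch) ---
# The request/response correlation pattern used by this hunk: every
# outbound msg gets a monotonically increasing `id`; the receiver
# task looks up the matching slot in `rpc_results`, fills in either
# `result` or `error`, then fires a `trio.Event` so the blocked
# caller resumes (and raises on error). A toy single-process sketch,
# no real websocket involved:
import trio

async def _demo() -> None:
    rpc_results: dict[int, dict] = {}
    slot = rpc_results[1] = {
        'result': None,
        'error': None,
        'event': trio.Event(),
    }

    async def responder() -> None:
        slot['result'] = {'ok': True}   # simulate a matched response
        slot['event'].set()             # wake the waiting requester

    async with trio.open_nursery() as tn:
        tn.start_soon(responder)
        await slot['event'].wait()
        assert slot['result'] == {'ok': True}

trio.run(_demo)
# --- end aside ---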
+                        req_id: int = error['id']
+                        req_msg: dict = req_msgs[req_id]
+                        result: dict = rpc_results[req_id]
+                        result['error'] = error
+                        result['event'].set()
+                        log.error(
+                            f'JSONRPC request failed\n'
+                            f'req: {req_msg}\n'
+                            f'resp: {error}\n'
+                        )
 
                     case _:
                         log.warning(f'Unhandled JSON-RPC msg!?\n{msg}')
 
-        n.start_soon(recv_task)
+        tn.start_soon(recv_task)
         yield json_rpc
-        n.cancel_scope.cancel()
+        tn.cancel_scope.cancel()
diff --git a/piker/data/feed.py b/piker/data/feed.py
index 7264c8e6..d47d8df9 100644
--- a/piker/data/feed.py
+++ b/piker/data/feed.py
@@ -786,7 +786,6 @@ async def install_brokerd_search(
 
 @acm
 async def maybe_open_feed(
-
     fqmes: list[str],
     loglevel: str | None = None,
 
@@ -840,13 +839,12 @@ async def maybe_open_feed(
 
 @acm
 async def open_feed(
-
     fqmes: list[str],
 
-    loglevel: str | None = None,
+    loglevel: str|None = None,
     allow_overruns: bool = True,
     start_stream: bool = True,
-    tick_throttle: float | None = None,  # Hz
+    tick_throttle: float|None = None,  # Hz
 
     allow_remote_ctl_ui: bool = False,
 
diff --git a/piker/data/flows.py b/piker/data/flows.py
index 677b2f69..573180b9 100644
--- a/piker/data/flows.py
+++ b/piker/data/flows.py
@@ -36,10 +36,10 @@ from ._sharedmem import (
     ShmArray,
     _Token,
 )
+from piker.accounting import MktPair
 
 if TYPE_CHECKING:
-    from ..accounting import MktPair
-    from .feed import Feed
+    from piker.data.feed import Feed
 
 
 class Flume(Struct):
@@ -82,7 +82,7 @@ class Flume(Struct):
 
     # TODO: do we need this really if we can pull the `Portal` from
     # ``tractor``'s internals?
-    feed: Feed | None = None
+    feed: Feed|None = None
 
     @property
     def rt_shm(self) -> ShmArray:
diff --git a/piker/data/validate.py b/piker/data/validate.py
index cefa0f1f..c5317ede 100644
--- a/piker/data/validate.py
+++ b/piker/data/validate.py
@@ -113,9 +113,9 @@ def validate_backend(
         )
         if ep is None:
             log.warning(
-                f'Provider backend {mod.name} is missing '
-                f'{daemon_name} support :(\n'
-                f'The following endpoint is missing: {name}'
+                f'Provider backend {mod.name!r} is missing '
+                f'{daemon_name!r} support?\n'
+                f'|_module endpoint-func missing: {name!r}\n'
             )
 
     inits: list[
diff --git a/piker/log.py b/piker/log.py
index 56776e1e..dc5cfc59 100644
--- a/piker/log.py
+++ b/piker/log.py
@@ -19,6 +19,10 @@ Log like a forester!
 """
 import logging
 import json
+import reprlib
+from typing import (
+    Callable,
+)
 
 import tractor
 from pygments import (
@@ -84,3 +88,29 @@ def colorize_json(
         # likeable styles: algol_nu, tango, monokai
         formatters.TerminalTrueColorFormatter(style=style)
     )
+
+
+# TODO, eventually defer to the version in `modden` once
+# it becomes a dep!
+def mk_repr(
+    **repr_kws,
+) -> Callable[[str], str]:
+    '''
+    Allocate and deliver a `reprlib.Repr` instance with provided
+    input settings using the std-lib's `reprlib` mod,
+    * https://docs.python.org/3/library/reprlib.html
+
+    ------ Ex. ------
+    An up to 6-layer-nested `dict` as multi-line:
+    - https://stackoverflow.com/a/79102479
+    - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel
+
+    '''
+    def_kws: dict[str, int] = dict(
+        indent=2,
+        maxlevel=6,    # recursion levels
+        maxstring=66,  # match editor line-len limit
+    )
+    def_kws |= repr_kws
+    reprr = reprlib.Repr(**def_kws)
+    return reprr.repr
diff --git a/piker/service/_actor_runtime.py b/piker/service/_actor_runtime.py
index a4e3ccf2..837b615d 100644
--- a/piker/service/_actor_runtime.py
+++ b/piker/service/_actor_runtime.py
@@ -119,6 +119,10 @@ async def open_piker_runtime(
             # spawn other specialized daemons I think?
             enable_modules=enable_modules,
 
+            # TODO: how to configure this?
+            # keep it on by default if debug mode is set?
+            maybe_enable_greenback=False,
+
             **tractor_kwargs,
         ) as actor,
 
diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 6890192d..1c8ff11b 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -386,6 +386,8 @@ def ldshm(
         open_annot_ctl() as actl,
     ):
         shm_df: pl.DataFrame | None = None
+        tf2aids: dict[float, dict] = {}
+
         for (
             shmfile,
             shm,
@@ -526,16 +528,17 @@ def ldshm(
                         new_df,
                         step_gaps,
                     )
-
                     # last chance manual overwrites in REPL
-                    await tractor.pause()
+                    # await tractor.pause()
                     assert aids
+                    tf2aids[period_s] = aids
 
                 else:
                     # allow interaction even when no ts problems.
-                    await tractor.pause()
-                    # assert not diff
+                    assert not diff
+                    await tractor.pause()
+
+        log.info('Exiting TSP shm anal-izer!')
 
         if shm_df is None:
             log.error(
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index bc7f10e3..8a948cab 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -161,7 +161,13 @@ class NativeStorageClient:
     def index_files(self):
         for path in self._datadir.iterdir():
-            if path.name in {'borked', 'expired',}:
+            if (
+                path.is_dir()
+                or
+                '.parquet' not in str(path)
+                # or
+                # path.name in {'borked', 'expired',}
+            ):
                 continue
 
             key: str = path.name.rstrip('.parquet')
diff --git a/piker/tsp/__init__.py b/piker/tsp/__init__.py
index adbe484e..f10f0f75 100644
--- a/piker/tsp/__init__.py
+++ b/piker/tsp/__init__.py
@@ -44,8 +44,10 @@ import trio
 from trio_typing import TaskStatus
 import tractor
 from pendulum import (
+    Interval,
     DateTime,
     Duration,
+    duration as mk_duration,
     from_timestamp,
 )
 import numpy as np
@@ -214,7 +216,8 @@ async def maybe_fill_null_segments(
         # pair, immediately stop backfilling?
         if (
             start_dt
-            and end_dt < start_dt
+            and
+            end_dt < start_dt
         ):
             await tractor.pause()
             break
@@ -262,6 +265,7 @@ async def maybe_fill_null_segments(
         except tractor.ContextCancelled:
             # log.exception
             await tractor.pause()
+            raise
 
     null_segs_detected.set()
     # RECHECK for more null-gaps
@@ -349,7 +353,7 @@ async def maybe_fill_null_segments(
 
 async def start_backfill(
     get_hist,
-    frame_types: dict[str, Duration] | None,
+    def_frame_duration: Duration,
     mod: ModuleType,
     mkt: MktPair,
     shm: ShmArray,
@@ -379,22 +383,23 @@ async def start_backfill(
     update_start_on_prepend: bool = False
     if backfill_until_dt is None:
 
-        # TODO: drop this right and just expose the backfill
-        # limits inside a [storage] section in conf.toml?
-        # when no tsdb "last datum" is provided, we just load
-        # some near-term history.
-        # periods = {
-        #     1: {'days': 1},
-        #     60: {'days': 14},
-        # }
-
-        # do a decently sized backfill and load it into storage.
+        # TODO: per-provider default history-durations?
+        # -[ ] inside the `open_history_client()` config allow
+        #     declaring the history duration limits instead of
+        #     guessing and/or applying the same limits to all?
+        #
+        # -[ ] allow declaring (default) per-provider backfill
+        #     limits inside a [storage] sub-section in conf.toml?
+        #
+        # NOTE, when no tsdb "last datum" is provided, we just
+        # load some near-term history by presuming a "decently
+        # large" 60s duration limit and a much shorter 1s range.
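# --- editorial aside (not part of the patch) ---
# Concretely, the defaults just below mean: with no tsdb "last
# datum", a 1s-timeframe feed backfills ~2 days of history while a
# 60s feed pulls ~6 years. Sketch of how such a period spec maps to
# a concrete "until" datetime (using `pendulum` as the patch does):
import pendulum

periods: dict[int, dict[str, int]] = {
    1: {'days': 2},
    60: {'years': 6},
}
timeframe: int = 60
# subtract the configured span from "now" to get the backfill limit
until = pendulum.now('UTC').subtract(**periods[timeframe])
assert until < pendulum.now('UTC')
# --- end aside ---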
         periods = {
             1: {'days': 2},
             60: {'years': 6},
         }
         period_duration: int = periods[timeframe]
 
-        update_start_on_prepend = True
+        update_start_on_prepend: bool = True
 
     # NOTE: manually set the "latest" datetime which we intend to
     # backfill history "until" so as to adhere to the history
@@ -416,7 +421,6 @@ async def start_backfill(
                 f'backfill_until_dt: {backfill_until_dt}\n'
                 f'last_start_dt: {last_start_dt}\n'
             )
-
             try:
                 (
                     array,
@@ -426,71 +430,114 @@ async def start_backfill(
                     timeframe,
                     end_dt=last_start_dt,
                 )
-
             except NoData as _daterr:
-                # 3 cases:
-                # - frame in the middle of a legit venue gap
-                # - history actually began at the `last_start_dt`
-                # - some other unknown error (ib blocking the
-                #   history bc they don't want you seeing how they
-                #   cucked all the tinas..)
-                if dur := frame_types.get(timeframe):
-                    # decrement by a frame's worth of duration and
-                    # retry a few times.
-                    last_start_dt.subtract(
-                        seconds=dur.total_seconds()
-                    )
-                    log.warning(
-                        f'{mod.name} -> EMPTY FRAME for end_dt?\n'
-                        f'tf@fqme: {timeframe}@{mkt.fqme}\n'
-                        'bf_until <- last_start_dt:\n'
-                        f'{backfill_until_dt} <- {last_start_dt}\n'
-                        f'Decrementing `end_dt` by {dur} and retry..\n'
+                orig_last_start_dt: datetime = last_start_dt
+                gap_report: str = (
+                    f'EMPTY FRAME for `end_dt: {last_start_dt}`?\n'
+                    f'{mod.name} -> tf@fqme: {timeframe}@{mkt.fqme}\n'
+                    f'last_start_dt: {orig_last_start_dt}\n\n'
+                    f'bf_until: {backfill_until_dt}\n'
+                )
+                # EMPTY FRAME signal with 3 (likely) causes:
+                #
+                # 1. range contains legit gap in venue history
+                # 2. history actually (edge case) **began** at the
+                #    value `last_start_dt`
+                # 3. some other unknown error (ib blocking the
+                #    history-query bc they don't want you seeing how
+                #    they cucked all the tinas.. like with options
+                #    hist)
+                #
+                if def_frame_duration:
+                    # decrement by a duration's (frame) worth of time
+                    # as maybe indicated by the backend to see if we
+                    # can get older data before this possible
+                    # "history gap".
+                    last_start_dt: datetime = last_start_dt.subtract(
+                        seconds=def_frame_duration.total_seconds()
                     )
+                    gap_report += (
+                        f'Decrementing `end_dt` and retrying with,\n'
+                        f'def_frame_duration: {def_frame_duration}\n'
+                        f'(new) last_start_dt: {last_start_dt}\n'
+                    )
+                    log.warning(gap_report)
+                    # skip writing to shm/tsdb and try the next
+                    # duration's worth of prior history.
                     continue
 
-            # broker says there never was or is no more history to pull
-            except DataUnavailable:
-                log.warning(
-                    f'NO-MORE-DATA in range?\n'
-                    f'`{mod.name}` halted history:\n'
-                    f'tf@fqme: {timeframe}@{mkt.fqme}\n'
-                    'bf_until <- last_start_dt:\n'
-                    f'{backfill_until_dt} <- {last_start_dt}\n'
-                )
+                else:
+                    # await tractor.pause()
+                    raise DataUnavailable(gap_report)
 
-                # ugh, what's a better way?
-                # TODO: fwiw, we probably want a way to signal a throttle
-                # condition (eg. with ib) so that we can halt the
-                # request loop until the condition is resolved?
-                if timeframe > 1:
-                    await tractor.pause()
+            # broker says there never was or is no more history to pull
+            except DataUnavailable as due:
+                message: str = due.args[0]
+                log.warning(
+                    f'Provider {mod.name!r} halted backfill due to,\n\n'
+
+                    f'{message}\n'
+
+                    f'fqme: {mkt.fqme}\n'
+                    f'timeframe: {timeframe}\n'
+                    f'last_start_dt: {last_start_dt}\n'
+                    f'bf_until: {backfill_until_dt}\n'
+                )
+                # UGH: what's a better way?
+                # TODO: backends are responsible for being correct on
+                # this right!?
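# --- editorial aside (not part of the patch) ---
# The retry strategy in the `NoData` handler above: when a frame
# comes back empty, step the query's `end_dt` back by one (default)
# frame's duration and probe past the suspected gap. Sketch with
# `pendulum`, names illustrative:
import pendulum
from pendulum import duration

def step_back(
    end_dt: pendulum.DateTime,
    frame: pendulum.Duration,
) -> pendulum.DateTime:
    # move the query window one frame further into the past
    return end_dt.subtract(seconds=frame.total_seconds())

end = pendulum.datetime(2024, 1, 2)
assert step_back(end, duration(days=1)) == pendulum.datetime(2024, 1, 1)
# --- end aside ---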
+                # -[ ] in the `ib` case we could maybe offer some way
+                #    to halt the request loop until the condition is
+                #    resolved or should the backend be entirely in
+                #    charge of solving such faults? yes, right?
                 return
 
+            time: np.ndarray = array['time']
             assert (
-                array['time'][0]
+                time[0]
                 ==
                 next_start_dt.timestamp()
             )
 
-            diff = last_start_dt - next_start_dt
-            frame_time_diff_s = diff.seconds
+            assert time[-1] == next_end_dt.timestamp()
+
+            expected_dur: Interval = last_start_dt - next_start_dt
 
             # frame's worth of sample-period-steps, in seconds
             frame_size_s: float = len(array) * timeframe
-            expected_frame_size_s: float = frame_size_s + timeframe
-            if frame_time_diff_s > expected_frame_size_s:
-
+            recv_frame_dur: Duration = (
+                from_timestamp(array[-1]['time'])
+                -
+                from_timestamp(array[0]['time'])
+            )
+            if (
+                (lt_frame := (recv_frame_dur < expected_dur))
+                or
+                (null_frame := (frame_size_s == 0))
+                # ^XXX, should NEVER hit now!
+            ):
                 # XXX: query result includes a start point prior to our
                 # expected "frame size" and thus is likely some kind of
                 # history gap (eg. market closed period, outage, etc.)
                 # so just report it to console for now.
+                if lt_frame:
+                    reason = 'Possible GAP (or first-datum)'
+                else:
+                    assert null_frame
+                    reason = 'NULL-FRAME'
+
+                missing_dur: Interval = expected_dur.end - recv_frame_dur.end
                 log.warning(
-                    'GAP DETECTED:\n'
-                    f'last_start_dt: {last_start_dt}\n'
-                    f'diff: {diff}\n'
-                    f'frame_time_diff_s: {frame_time_diff_s}\n'
+                    f'{timeframe}s-series {reason} detected!\n'
+                    f'fqme: {mkt.fqme}\n'
+                    f'last_start_dt: {last_start_dt}\n\n'
+                    f'recv interval: {recv_frame_dur}\n'
+                    f'expected interval: {expected_dur}\n\n'
+
+                    f'Missing history duration of {missing_dur.in_words()!r}\n'
+                    f'{missing_dur}\n'
                 )
+                # await tractor.pause()
 
             to_push = diff_history(
                 array,
@@ -565,22 +612,27 @@ async def start_backfill(
             # long-term storage.
             if (
                 storage is not None
-                and write_tsdb
+                and
+                write_tsdb
             ):
                 log.info(
                     f'Writing {ln} frame to storage:\n'
                     f'{next_start_dt} -> {last_start_dt}'
                 )
 
-                # always drop the src asset token for
+                # NOTE, always drop the src asset token for
                 # non-currency-pair like market types (for now)
+                #
+                # THAT IS, for now our table key schema is NOT
+                # including the dst[/src] source asset token. SO,
+                # 'tsla.nasdaq.ib' over 'tsla/usd.nasdaq.ib' for
+                # historical reasons ONLY.
                 if mkt.dst.atype not in {
                     'crypto',
                     'crypto_currency',
                     'fiat',  # a "forex pair"
+                    'perpetual_future',  # stupid "perps" from cex land
                 }:
-                    # for now, our table key schema is not including
-                    # the dst[/src] source asset token.
                     col_sym_key: str = mkt.get_fqme(
                         delim_char='',
                         without_src=True,
@@ -685,7 +737,7 @@ async def back_load_from_tsdb(
         last_tsdb_dt
         and latest_start_dt
     ):
-        backfilled_size_s = (
+        backfilled_size_s: Duration = (
             latest_start_dt - last_tsdb_dt
         ).seconds
     # if the shm buffer len is not large enough to contain
@@ -908,6 +960,8 @@ async def tsdb_backfill(
             f'{pformat(config)}\n'
         )
 
+    # concurrently load the provider's most-recent-frame AND any
+    # pre-existing tsdb history already saved in `piker` storage.
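# --- editorial aside (not part of the patch) ---
# The frame-validation logic above compares the *received* frame's
# wall-clock span against the *expected* span (`last_start_dt -
# next_start_dt`); a shorter-than-expected frame implies a history
# gap (venue closure, outage, or the first datum). Minimal stdlib
# sketch of the same comparison:
from datetime import datetime, timedelta

expected: timedelta = timedelta(days=1)
first_ts = datetime(2024, 1, 1, 0, 0)
last_ts = datetime(2024, 1, 1, 18, 0)   # frame ends 6hrs early
recv: timedelta = last_ts - first_ts

lt_frame: bool = recv < expected
missing: timedelta = expected - recv
assert lt_frame and missing == timedelta(hours=6)
# --- end aside ---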
     dt_eps: list[DateTime, DateTime] = []
     async with trio.open_nursery() as tn:
         tn.start_soon(
             push_latest_frame,
             dt_eps,
             get_hist,
             mod,
             mkt,
             timeframe,
             config,
         )
-
         tsdb_entry: tuple = await load_tsdb_hist(
             storage,
             mkt,
             timeframe,
         )
@@ -947,6 +1000,25 @@ async def tsdb_backfill(
             mr_end_dt,
         ) = dt_eps
 
+        first_frame_dur_s: Duration = (mr_end_dt - mr_start_dt).seconds
+        calced_frame_size: Duration = mk_duration(
+            seconds=first_frame_dur_s,
+        )
+        # NOTE, attempt to use the backend declared default frame
+        # sizing (as allowed by their time-series query APIs) and
+        # if not provided try to construct a default from the
+        # first frame received above.
+        def_frame_durs: dict[
+            int,
+            Duration,
+        ]|None = config.get('frame_types', None)
+        if def_frame_durs:
+            def_frame_size: Duration = def_frame_durs[timeframe]
+            assert def_frame_size == calced_frame_size
+        else:
+            # use what we calced from first frame above.
+            def_frame_size = calced_frame_size
+
         # NOTE: when there's no offline data, there's 2 cases:
         # - data backend doesn't support timeframe/sample
         #   period (in which case `dt_eps` should be `None` and
@@ -977,7 +1049,7 @@ async def tsdb_backfill(
                 partial(
                     start_backfill,
                     get_hist=get_hist,
-                    frame_types=config.get('frame_types', None),
+                    def_frame_duration=def_frame_size,
                     mod=mod,
                     mkt=mkt,
                     shm=shm,
diff --git a/piker/tsp/_anal.py b/piker/tsp/_anal.py
index c34a0c3a..ea78c46a 100644
--- a/piker/tsp/_anal.py
+++ b/piker/tsp/_anal.py
@@ -616,6 +616,18 @@ def detect_price_gaps(
     # ])
     ...
 
+
+# TODO: probably just use the null_segs impl above?
+def detect_vlm_gaps(
+    df: pl.DataFrame,
+    col: str = 'volume',
+
+) -> pl.DataFrame:
+
+    w_dts: pl.DataFrame = with_dts(df)
+    vnull: pl.DataFrame = w_dts.filter(
+        pl.col(col) == 0
+    )
+    return vnull
+
 
 def dedupe(
     src_df: pl.DataFrame,
@@ -626,7 +638,6 @@ def dedupe(
 
 ) -> tuple[
     pl.DataFrame,  # with dts
-    pl.DataFrame,  # gaps
     pl.DataFrame,  # with deduplicated dts (aka gap/repeat removal)
     int,  # len diff between input and deduped
 ]:
     '''
     ...
 
     '''
     wdts: pl.DataFrame = with_dts(src_df)
 
-    # maybe sort on any time field
-    if sort:
-        wdts = wdts.sort(by='time')
-        # TODO: detect out-of-order segments which were corrected!
-        # -[ ] report in log msg
-        # -[ ] possibly return segment sections which were moved?
+    deduped = wdts
 
     # remove duplicated datetime samples/sections
     deduped: pl.DataFrame = wdts.unique(
-        subset=['dt'],
+        # subset=['dt'],
+        subset=['time'],
         maintain_order=True,
     )
 
+    # maybe sort on any time field
+    if sort:
+        deduped = deduped.sort(by='time')
+        # TODO: detect out-of-order segments which were corrected!
+        # -[ ] report in log msg
+        # -[ ] possibly return segment sections which were moved?
+
     diff: int = (
         wdts.height
         -
diff --git a/piker/types.py b/piker/types.py
index cda3fb44..385f83b0 100644
--- a/piker/types.py
+++ b/piker/types.py
@@ -21,230 +21,4 @@ Extensions to built-in or (heavily used but 3rd party) friend-lib
 types.
 
 '''
-from __future__ import annotations
-from collections import UserList
-from pprint import (
-    saferepr,
-)
-from typing import Any
-
-from msgspec import (
-    msgpack,
-    Struct as _Struct,
-    structs,
-)
-
-
-class DiffDump(UserList):
-    '''
-    Very simple list delegator that repr() dumps (presumed) tuple
-    elements of the form `tuple[str, Any, Any]` in a nice
-    multi-line readable form for analyzing `Struct` diffs.
-
-    '''
-    def __repr__(self) -> str:
-        if not len(self):
-            return super().__repr__()
-
-        # format by displaying item pair's ``repr()`` on multiple,
-        # indented lines such that they are more easily visually
-        # comparable when printed to console when printed to
-        # console.
-        repstr: str = '[\n'
-        for k, left, right in self:
-            repstr += (
-                f'({k},\n'
-                f'\t{repr(left)},\n'
-                f'\t{repr(right)},\n'
-                ')\n'
-            )
-        repstr += ']\n'
-        return repstr
-
-
-class Struct(
-    _Struct,
-
-    # https://jcristharif.com/msgspec/structs.html#tagged-unions
-    # tag='pikerstruct',
-    # tag=True,
-):
-    '''
-    A "human friendlier" (aka repl buddy) struct subtype.
-
-    '''
-    def _sin_props(self) -> Iterator[
-        tuple[
-            structs.FieldIinfo,
-            str,
-            Any,
-        ]
-    ]:
-        '''
-        Iterate over all non-@property fields of this struct.
-
-        '''
-        fi: structs.FieldInfo
-        for fi in structs.fields(self):
-            key: str = fi.name
-            val: Any = getattr(self, key)
-            yield fi, key, val
-
-    def to_dict(
-        self,
-        include_non_members: bool = True,
-
-    ) -> dict:
-        '''
-        Like it sounds.. direct delegation to:
-        https://jcristharif.com/msgspec/api.html#msgspec.structs.asdict
-
-        BUT, by default we pop all non-member (aka not defined as
-        struct fields) fields by default.
-
-        '''
-        asdict: dict = structs.asdict(self)
-        if include_non_members:
-            return asdict
-
-        # only return a dict of the struct members
-        # which were provided as input, NOT anything
-        # added as type-defined `@property` methods!
-        sin_props: dict = {}
-        fi: structs.FieldInfo
-        for fi, k, v in self._sin_props():
-            sin_props[k] = asdict[k]
-
-        return sin_props
-
-    def pformat(
-        self,
-        field_indent: int = 2,
-        indent: int = 0,
-
-    ) -> str:
-        '''
-        Recursion-safe `pprint.pformat()` style formatting of
-        a `msgspec.Struct` for sane reading by a human using a REPL.
-
-        '''
-        # global whitespace indent
-        ws: str = ' '*indent
-
-        # field whitespace indent
-        field_ws: str = ' '*(field_indent + indent)
-
-        # qtn: str = ws + self.__class__.__qualname__
-        qtn: str = self.__class__.__qualname__
-
-        obj_str: str = ''  # accumulator
-        fi: structs.FieldInfo
-        k: str
-        v: Any
-        for fi, k, v in self._sin_props():
-
-            # TODO: how can we prefer `Literal['option1', 'option2,
-            # ..]` over .__name__ == `Literal` but still get only the
-            # latter for simple types like `str | int | None` etc..?
-            ft: type = fi.type
-            typ_name: str = getattr(ft, '__name__', str(ft))
-
-            # recurse to get sub-struct's `.pformat()` output Bo
-            if isinstance(v, Struct):
-                val_str: str = v.pformat(
-                    indent=field_indent + indent,
-                    field_indent=indent + field_indent,
-                )
-
-            else:  # the `pprint` recursion-safe format:
-                # https://docs.python.org/3.11/library/pprint.html#pprint.saferepr
-                val_str: str = saferepr(v)
-
-            obj_str += (field_ws + f'{k}: {typ_name} = {val_str},\n')
-
-        return (
-            f'{qtn}(\n'
-            f'{obj_str}'
-            f'{ws})'
-        )
-
-    # TODO: use a pprint.PrettyPrinter instance around ONLY rendering
-    # inside a known tty?
-    # def __repr__(self) -> str:
-    #     ...
-
-    # __str__ = __repr__ = pformat
-    __repr__ = pformat
-
-    def copy(
-        self,
-        update: dict | None = None,
-
-    ) -> Struct:
-        '''
-        Validate-typecast all self defined fields, return a copy of
-        us with all such fields.
-
-        NOTE: This is kinda like the default behaviour in
-        `pydantic.BaseModel` except a copy of the object is
-        returned making it compat with `frozen=True`.
-
-        '''
-        if update:
-            for k, v in update.items():
-                setattr(self, k, v)
-
-        # NOTE: roundtrip serialize to validate
-        # - enode to msgpack binary format,
-        # - decode that back to a struct.
-        return msgpack.Decoder(type=type(self)).decode(
-            msgpack.Encoder().encode(self)
-        )
-
-    def typecast(
-        self,
-
-        # TODO: allow only casting a named subset?
-        # fields: set[str] | None = None,
-
-    ) -> None:
-        '''
-        Cast all fields using their declared type annotations
-        (kinda like what `pydantic` does by default).
-
-        NOTE: this of course won't work on frozen types, use
-        ``.copy()`` above in such cases.
-
-        '''
-        # https://jcristharif.com/msgspec/api.html#msgspec.structs.fields
-        fi: structs.FieldInfo
-        for fi in structs.fields(self):
-            setattr(
-                self,
-                fi.name,
-                fi.type(getattr(self, fi.name)),
-            )
-
-    def __sub__(
-        self,
-        other: Struct,
-
-    ) -> DiffDump[tuple[str, Any, Any]]:
-        '''
-        Compare fields/items key-wise and return a ``DiffDump``
-        for easy visual REPL comparison B)
-
-        '''
-        diffs: DiffDump[tuple[str, Any, Any]] = DiffDump()
-        for fi in structs.fields(self):
-            attr_name: str = fi.name
-            ours: Any = getattr(self, attr_name)
-            theirs: Any = getattr(other, attr_name)
-            if ours != theirs:
-                diffs.append((
-                    attr_name,
-                    ours,
-                    theirs,
-                ))
-
-        return diffs
+from tractor.msg import Struct as Struct
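# --- editorial aside (not part of the patch) ---
# Usage sketch for the new `piker.log.mk_repr()` helper added above:
# it returns the bound `.repr` method of a configured `reprlib.Repr`,
# giving depth/length-clamped rendering of nested quote-msg `dict`s
# (the `_sampling.py` comments allude to this via `pfmt`). Shown
# re-declared standalone so the snippet runs without `piker`
# installed; note the keyword-arg form of `reprlib.Repr` requires
# Python 3.12+.
import reprlib
from typing import Callable

def mk_repr(**repr_kws) -> Callable[[object], str]:
    def_kws: dict[str, int] = dict(indent=2, maxlevel=6, maxstring=66)
    def_kws |= repr_kws
    return reprlib.Repr(**def_kws).repr

pfmt: Callable[[object], str] = mk_repr(maxlevel=2)
# nesting deeper than `maxlevel` is elided with '...'
print(pfmt({'quote': {'ticks': [{'type': 'trade', 'price': 42.0}]}}))
# --- end aside ---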
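# --- editorial aside (not part of the patch) ---
# The big deletion above moves `piker.types.Struct` (with its
# roundtrip-validating `.copy()` and `__sub__` diffing helpers)
# upstream into `tractor.msg.Struct`. The core roundtrip-validate
# idea, shown standalone with plain `msgspec`:
from msgspec import msgpack, Struct

class Point(Struct):
    x: int
    y: int

p = Point(x=1, y=2)
# encode to msgpack bytes then decode back, typecasting/validating
# every field against the declared annotations in the process.
p2 = msgpack.Decoder(type=Point).decode(msgpack.Encoder().encode(p))
assert p2 == p
# --- end aside ---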