From b71d0533b257207f93c02bb01519682c8e6930b9 Mon Sep 17 00:00:00 2001 From: goodboy Date: Sat, 14 Mar 2026 16:49:55 -0400 Subject: [PATCH 1/3] Port `_sharedmem` to thin shim over `tractor.ipc._shm` Replace the ~716 line `piker.data._sharedmem` mod with a thin re-export shim consuming `tractor.ipc._shm` types directly, since the `tractor` version is the refined factoring of piker's original impl. Deats, - Re-export `SharedInt`, `ShmArray`, `ShmList`, `get_shm_token`, `_known_tokens` directly - Alias renames: `NDToken as _Token`, `open_shm_ndarray as open_shm_array`, `attach_shm_ndarray as attach_shm_array` - Keep `_make_token()` wrapper for piker's default dtype fallback to `def_iohlcv_fields` - Keep `maybe_open_shm_array()` wrapper preserving piker's historical defaults (`readonly=False`, `append_start_index=None`) - Keep `try_read()` race-condition guard (not in `tractor`) All 13 import sites across piker continue to work unchanged with no modifications needed. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/data/_sharedmem.py | 714 +++++---------------------------------- 1 file changed, 88 insertions(+), 626 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index acb8070f..3c7857b0 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -1,151 +1,51 @@ # piker: trading gear for hackers # Copyright (C) Tyler Goodlet (in stewardship for pikers) -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. +# This program is free software: you can redistribute it +# and/or modify it under the terms of the GNU Affero General +# Public License as published by the Free Software +# Foundation, either version 3 of the License, or (at your +# option) any later version. -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. +# This program is distributed in the hope that it will be +# useful, but WITHOUT ANY WARRANTY; without even the implied +# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR +# PURPOSE. See the GNU Affero General Public License for +# more details. -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . +# You should have received a copy of the GNU Affero General +# Public License along with this program. If not, see +# . -""" -NumPy compatible shared memory buffers for real-time IPC streaming. +''' +NumPy shared memory buffers for real-time IPC streaming. -""" -from __future__ import annotations -from sys import byteorder -import time +Thin shim over ``tractor.ipc._shm`` providing +backward-compatible aliases for piker's historical API. + +''' from typing import Optional -from multiprocessing.shared_memory import SharedMemory, _USE_POSIX -if _USE_POSIX: - from _posixshmem import shm_unlink - -# import msgspec import numpy as np -from numpy.lib import recfunctions as rfn -import tractor + +from tractor.ipc._shm import ( + SharedInt, + ShmArray, + ShmList, + + NDToken as _Token, + + open_shm_ndarray as open_shm_array, + attach_shm_ndarray as attach_shm_array, + open_shm_list, + attach_shm_list, + + get_shm_token, + _known_tokens, + _make_token as _tractor_make_token, +) from ._util import log -from ._source import def_iohlcv_fields -from piker.types import Struct - - -def cuckoff_mantracker(): - ''' - Disable all ``multiprocessing``` "resource tracking" machinery since - it's an absolute multi-threaded mess of non-SC madness. - - ''' - from multiprocessing import resource_tracker as mantracker - - # Tell the "resource tracker" thing to fuck off. - class ManTracker(mantracker.ResourceTracker): - def register(self, name, rtype): - pass - - def unregister(self, name, rtype): - pass - - def ensure_running(self): - pass - - # "know your land and know your prey" - # https://www.dailymotion.com/video/x6ozzco - mantracker._resource_tracker = ManTracker() - mantracker.register = mantracker._resource_tracker.register - mantracker.ensure_running = mantracker._resource_tracker.ensure_running - mantracker.unregister = mantracker._resource_tracker.unregister - mantracker.getfd = mantracker._resource_tracker.getfd - - -cuckoff_mantracker() - - -class SharedInt: - """Wrapper around a single entry shared memory array which - holds an ``int`` value used as an index counter. - - """ - def __init__( - self, - shm: SharedMemory, - ) -> None: - self._shm = shm - - @property - def value(self) -> int: - return int.from_bytes(self._shm.buf, byteorder) - - @value.setter - def value(self, value) -> None: - self._shm.buf[:] = value.to_bytes(self._shm.size, byteorder) - - def destroy(self) -> None: - if _USE_POSIX: - # We manually unlink to bypass all the "resource tracker" - # nonsense meant for non-SC systems. - name = self._shm.name - try: - shm_unlink(name) - except FileNotFoundError: - # might be a teardown race here? - log.warning(f'Shm for {name} already unlinked?') - - -class _Token(Struct, frozen=True): - ''' - Internal represenation of a shared memory "token" - which can be used to key a system wide post shm entry. - - ''' - shm_name: str # this servers as a "key" value - shm_first_index_name: str - shm_last_index_name: str - dtype_descr: tuple - size: int # in struct-array index / row terms - - @property - def dtype(self) -> np.dtype: - return np.dtype(list(map(tuple, self.dtype_descr))).descr - - def as_msg(self): - return self.to_dict() - - @classmethod - def from_msg(cls, msg: dict) -> _Token: - if isinstance(msg, _Token): - return msg - - # TODO: native struct decoding - # return _token_dec.decode(msg) - - msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) - return _Token(**msg) - - -# _token_dec = msgspec.msgpack.Decoder(_Token) - -# TODO: this api? -# _known_tokens = tractor.ActorVar('_shm_tokens', {}) -# _known_tokens = tractor.ContextStack('_known_tokens', ) -# _known_tokens = trio.RunVar('shms', {}) - -# process-local store of keys to tokens -_known_tokens = {} - - -def get_shm_token(key: str) -> _Token: - """Convenience func to check if a token - for the provided key is known by this process. - """ - return _known_tokens.get(key) def _make_token( @@ -154,494 +54,44 @@ def _make_token( dtype: Optional[np.dtype] = None, ) -> _Token: ''' - Create a serializable token that can be used - to access a shared array. + Wrap tractor's ``_make_token()`` with piker's + default dtype fallback to ``def_iohlcv_fields``. ''' - dtype = def_iohlcv_fields if dtype is None else dtype - return _Token( - shm_name=key, - shm_first_index_name=key + "_first", - shm_last_index_name=key + "_last", - dtype_descr=tuple(np.dtype(dtype).descr), - size=size, + from ._source import def_iohlcv_fields + dtype = ( + def_iohlcv_fields + if dtype is None + else dtype ) - - -class ShmArray: - ''' - A shared memory ``numpy`` (compatible) array API. - - An underlying shared memory buffer is allocated based on - a user specified ``numpy.ndarray``. This fixed size array - can be read and written to by pushing data both onto the "front" - or "back" of a set index range. The indexes for the "first" and - "last" index are themselves stored in shared memory (accessed via - ``SharedInt`` interfaces) values such that multiple processes can - interact with the same array using a synchronized-index. - - ''' - def __init__( - self, - shmarr: np.ndarray, - first: SharedInt, - last: SharedInt, - shm: SharedMemory, - # readonly: bool = True, - ) -> None: - self._array = shmarr - - # indexes for first and last indices corresponding - # to fille data - self._first = first - self._last = last - - self._len = len(shmarr) - self._shm = shm - self._post_init: bool = False - - # pushing data does not write the index (aka primary key) - dtype = shmarr.dtype - if dtype.fields: - self._write_fields = list(shmarr.dtype.fields.keys())[1:] - else: - self._write_fields = None - - # TODO: ringbuf api? - - @property - def _token(self) -> _Token: - return _Token( - shm_name=self._shm.name, - shm_first_index_name=self._first._shm.name, - shm_last_index_name=self._last._shm.name, - dtype_descr=tuple(self._array.dtype.descr), - size=self._len, - ) - - @property - def token(self) -> dict: - """Shared memory token that can be serialized and used by - another process to attach to this array. - """ - return self._token.as_msg() - - @property - def index(self) -> int: - return self._last.value % self._len - - @property - def array(self) -> np.ndarray: - ''' - Return an up-to-date ``np.ndarray`` view of the - so-far-written data to the underlying shm buffer. - - ''' - a = self._array[self._first.value:self._last.value] - - # first, last = self._first.value, self._last.value - # a = self._array[first:last] - - # TODO: eventually comment this once we've not seen it in the - # wild in a long time.. - # XXX: race where first/last indexes cause a reader - # to load an empty array.. - if len(a) == 0 and self._post_init: - raise RuntimeError('Empty array race condition hit!?') - - return a - - def ustruct( - self, - fields: Optional[list[str]] = None, - - # type that all field values will be cast to - # in the returned view. - common_dtype: np.dtype = float, - - ) -> np.ndarray: - - array = self._array - - if fields: - selection = array[fields] - # fcount = len(fields) - else: - selection = array - # fcount = len(array.dtype.fields) - - # XXX: manual ``.view()`` attempt that also doesn't work. - # uview = selection.view( - # dtype=' np.ndarray: - ''' - Return the last ``length``'s worth of ("row") entries from the - array. - - ''' - return self.array[-length:] - - def push( - self, - data: np.ndarray, - - field_map: Optional[dict[str, str]] = None, - prepend: bool = False, - update_first: bool = True, - start: int | None = None, - - ) -> int: - ''' - Ring buffer like "push" to append data - into the buffer and return updated "last" index. - - NB: no actual ring logic yet to give a "loop around" on overflow - condition, lel. - - ''' - length = len(data) - - if prepend: - index = (start or self._first.value) - length - - if index < 0: - raise ValueError( - f'Array size of {self._len} was overrun during prepend.\n' - f'You have passed {abs(index)} too many datums.' - ) - - else: - index = start if start is not None else self._last.value - - end = index + length - - if field_map: - src_names, dst_names = zip(*field_map.items()) - else: - dst_names = src_names = self._write_fields - - try: - self._array[ - list(dst_names) - ][index:end] = data[list(src_names)][:] - - # NOTE: there was a race here between updating - # the first and last indices and when the next reader - # tries to access ``.array`` (which due to the index - # overlap will be empty). Pretty sure we've fixed it now - # but leaving this here as a reminder. - if ( - prepend - and update_first - and length - ): - assert index < self._first.value - - if ( - index < self._first.value - and update_first - ): - assert prepend, 'prepend=True not passed but index decreased?' - self._first.value = index - - elif not prepend: - self._last.value = end - - self._post_init = True - return end - - except ValueError as err: - if field_map: - raise - - # should raise if diff detected - self.diff_err_fields(data) - raise err - - def diff_err_fields( - self, - data: np.ndarray, - ) -> None: - # reraise with any field discrepancy - our_fields, their_fields = ( - set(self._array.dtype.fields), - set(data.dtype.fields), - ) - - only_in_ours = our_fields - their_fields - only_in_theirs = their_fields - our_fields - - if only_in_ours: - raise TypeError( - f"Input array is missing field(s): {only_in_ours}" - ) - elif only_in_theirs: - raise TypeError( - f"Input array has unknown field(s): {only_in_theirs}" - ) - - # TODO: support "silent" prepends that don't update ._first.value? - def prepend( - self, - data: np.ndarray, - ) -> int: - end = self.push(data, prepend=True) - assert end - - def close(self) -> None: - self._first._shm.close() - self._last._shm.close() - self._shm.close() - - def destroy(self) -> None: - if _USE_POSIX: - # We manually unlink to bypass all the "resource tracker" - # nonsense meant for non-SC systems. - shm_unlink(self._shm.name) - - self._first.destroy() - self._last.destroy() - - def flush(self) -> None: - # TODO: flush to storage backend like markestore? - ... - - -def open_shm_array( - size: int, - key: str | None = None, - dtype: np.dtype | None = None, - append_start_index: int | None = None, - readonly: bool = False, - -) -> ShmArray: - '''Open a memory shared ``numpy`` using the standard library. - - This call unlinks (aka permanently destroys) the buffer on teardown - and thus should be used from the parent-most accessor (process). - - ''' - # create new shared mem segment for which we - # have write permission - a = np.zeros(size, dtype=dtype) - a['index'] = np.arange(len(a)) - - shm = SharedMemory( - name=key, - create=True, - size=a.nbytes - ) - array = np.ndarray( - a.shape, - dtype=a.dtype, - buffer=shm.buf - ) - array[:] = a[:] - array.setflags(write=int(not readonly)) - - token = _make_token( + return _tractor_make_token( key=key, size=size, dtype=dtype, ) - # create single entry arrays for storing an first and last indices - first = SharedInt( - shm=SharedMemory( - name=token.shm_first_index_name, - create=True, - size=4, # std int - ) - ) - - last = SharedInt( - shm=SharedMemory( - name=token.shm_last_index_name, - create=True, - size=4, # std int - ) - ) - - # start the "real-time" updated section after 3-days worth of 1s - # sampled OHLC. this allows appending up to a days worth from - # tick/quote feeds before having to flush to a (tsdb) storage - # backend, and looks something like, - # ------------------------- - # | | i - # _________________________ - # <-------------> <-------> - # history real-time - # - # Once fully "prepended", the history section will leave the - # ``ShmArray._start.value: int = 0`` and the yet-to-be written - # real-time section will start at ``ShmArray.index: int``. - - # this sets the index to nearly 2/3rds into the the length of - # the buffer leaving at least a "days worth of second samples" - # for the real-time section. - if append_start_index is None: - append_start_index = round(size * 0.616) - - last.value = first.value = append_start_index - - shmarr = ShmArray( - array, - first, - last, - shm, - ) - - assert shmarr._token == token - _known_tokens[key] = shmarr.token - - # "unlink" created shm on process teardown by - # pushing teardown calls onto actor context stack - stack = tractor.current_actor( - err_on_no_runtime=False, - ).lifetime_stack - if stack: - stack.callback(shmarr.close) - stack.callback(shmarr.destroy) - - return shmarr - - -def attach_shm_array( - token: tuple[str, str, tuple[str, str]], - readonly: bool = True, - -) -> ShmArray: - ''' - Attach to an existing shared memory array previously - created by another process using ``open_shared_array``. - - No new shared mem is allocated but wrapper types for read/write - access are constructed. - - ''' - token = _Token.from_msg(token) - key = token.shm_name - - if key in _known_tokens: - assert _Token.from_msg(_known_tokens[key]) == token, "WTF" - - # XXX: ugh, looks like due to the ``shm_open()`` C api we can't - # actually place files in a subdir, see discussion here: - # https://stackoverflow.com/a/11103289 - - # attach to array buffer and view as per dtype - _err: Optional[Exception] = None - for _ in range(3): - try: - shm = SharedMemory( - name=key, - create=False, - ) - break - except OSError as oserr: - _err = oserr - time.sleep(0.1) - else: - if _err: - raise _err - - shmarr = np.ndarray( - (token.size,), - dtype=token.dtype, - buffer=shm.buf - ) - shmarr.setflags(write=int(not readonly)) - - first = SharedInt( - shm=SharedMemory( - name=token.shm_first_index_name, - create=False, - size=4, # std int - ), - ) - last = SharedInt( - shm=SharedMemory( - name=token.shm_last_index_name, - create=False, - size=4, # std int - ), - ) - - # make sure we can read - first.value - - sha = ShmArray( - shmarr, - first, - last, - shm, - ) - # read test - sha.array - - # Stash key -> token knowledge for future queries - # via `maybe_opepn_shm_array()` but only after we know - # we can attach. - if key not in _known_tokens: - _known_tokens[key] = token - - # "close" attached shm on actor teardown - if (actor := tractor.current_actor( - err_on_no_runtime=False, - )): - actor.lifetime_stack.callback(sha.close) - - return sha - def maybe_open_shm_array( key: str, size: int, - dtype: np.dtype | None = None, - append_start_index: int | None = None, + dtype: np.dtype|None = None, + append_start_index: int|None = None, readonly: bool = False, **kwargs, - ) -> tuple[ShmArray, bool]: ''' - Attempt to attach to a shared memory block using a "key" lookup - to registered blocks in the users overall "system" registry - (presumes you don't have the block's explicit token). + Attempt to attach to a shared memory block + using a "key" lookup to registered blocks in + the user's overall "system" registry (presumes + you don't have the block's explicit token). - This function is meant to solve the problem of discovering whether - a shared array token has been allocated or discovered by the actor - running in **this** process. Systems where multiple actors may seek - to access a common block can use this function to attempt to acquire - a token as discovered by the actors who have previously stored - a "key" -> ``_Token`` map in an actor local (aka python global) - variable. + This is a thin wrapper around tractor's + ``maybe_open_shm_ndarray()`` preserving piker's + historical defaults (``readonly=False``, + ``append_start_index=None``). - If you know the explicit ``_Token`` for your memory segment instead - use ``attach_shm_array``. + If you know the explicit ``_Token`` for your + memory segment instead use ``attach_shm_array``. ''' try: @@ -655,7 +105,9 @@ def maybe_open_shm_array( False, ) except KeyError: - log.debug(f"Could not find {key} in shms cache") + log.debug( + f'Could not find {key} in shms cache' + ) if dtype: token = _make_token( key, @@ -663,9 +115,18 @@ def maybe_open_shm_array( dtype=dtype, ) try: - return attach_shm_array(token=token, **kwargs), False + return ( + attach_shm_array( + token=token, + **kwargs, + ), + False, + ) except FileNotFoundError: - log.debug(f"Could not attach to shm with token {token}") + log.debug( + f'Could not attach to shm' + f' with token {token}' + ) # This actor does not know about memory # associated with the provided "key". @@ -683,18 +144,20 @@ def maybe_open_shm_array( True, ) -def try_read( - array: np.ndarray +def try_read( + array: np.ndarray, ) -> Optional[np.ndarray]: ''' - Try to read the last row from a shared mem array or ``None`` - if the array read returns a zero-length array result. + Try to read the last row from a shared mem + array or ``None`` if the array read returns + a zero-length array result. - Can be used to check for backfilling race conditions where an array - is currently being (re-)written by a writer actor but the reader is - unaware and reads during the window where the first and last indexes - are being updated. + Can be used to check for backfilling race + conditions where an array is currently being + (re-)written by a writer actor but the reader + is unaware and reads during the window where + the first and last indexes are being updated. ''' try: @@ -702,14 +165,13 @@ def try_read( except IndexError: # XXX: race condition with backfilling shm. # - # the underlying issue is that a backfill (aka prepend) and subsequent - # shm array first/last index update could result in an empty array - # read here since the indices may be updated in such a way that - # a read delivers an empty array (though it seems like we - # *should* be able to prevent that?). also, as and alt and - # something we need anyway, maybe there should be some kind of - # signal that a prepend is taking place and this consumer can - # respond (eg. redrawing graphics) accordingly. + # the underlying issue is that a backfill + # (aka prepend) and subsequent shm array + # first/last index update could result in an + # empty array read here since the indices may + # be updated in such a way that a read delivers + # an empty array (though it seems like we + # *should* be able to prevent that?). - # the array read was emtpy + # the array read was empty return None -- 2.34.1 From 0a2059d00fe63878e49a71070937bd01e11bca2d Mon Sep 17 00:00:00 2001 From: goodboy Date: Sat, 14 Mar 2026 17:11:42 -0400 Subject: [PATCH 2/3] Use `tractor.ipc._shm` types directly across codebase MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Port all 16 internal import sites from re-exporting via `piker.data._sharedmem` shim to importing core shm types directly from `tractor.ipc._shm`. Deats, - `ShmArray` now imported from tractor in 10 files. - `_Token` renamed to `NDToken` everywhere (5 files). - `attach_shm_array` → `attach_shm_ndarray` at all call sites. - `data/__init__.py` sources `ShmArray`, `get_shm_token` from tractor; keeps `open/attach_shm_array` as public API aliases. - Trim shim to only piker-specific wrappers: `_make_token()`, `maybe_open_shm_array()`, `try_read()`. - Drop `Optional` usage in shim, use `|None`. (this patch was generated in some part by [`claude-code`][claude-code-gh]) [claude-code-gh]: https://github.com/anthropics/claude-code --- piker/brokers/deribit/feed.py | 2 +- piker/data/__init__.py | 10 ++++----- piker/data/_formatters.py | 4 +--- piker/data/_sampling.py | 14 +++++------- piker/data/_sharedmem.py | 41 +++++++++++++++-------------------- piker/data/flows.py | 14 ++++++------ piker/fsp/_api.py | 14 ++++++------ piker/fsp/_engine.py | 10 ++++----- piker/fsp/_momo.py | 2 +- piker/fsp/_volume.py | 2 +- piker/storage/cli.py | 4 +--- piker/storage/nativedb.py | 6 ++--- piker/tsp/_history.py | 8 +++---- piker/ui/_chart.py | 2 +- piker/ui/_dataviz.py | 4 +--- piker/ui/_fsp.py | 12 +++++----- 16 files changed, 65 insertions(+), 84 deletions(-) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 821aab87..94fb3f89 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -32,7 +32,7 @@ import tractor from piker.brokers import open_cached_client from piker.log import get_logger, get_console_log -from piker.data import ShmArray +from tractor.ipc._shm import ShmArray from piker.brokers._util import ( BrokerError, DataUnavailable, diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 207eeaa1..d92c33ca 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -23,13 +23,13 @@ sharing live streams over a network. """ from .ticktools import iterticks -from ._sharedmem import ( - maybe_open_shm_array, - attach_shm_array, - open_shm_array, - get_shm_token, +from tractor.ipc._shm import ( ShmArray, + get_shm_token, + open_shm_ndarray as open_shm_array, + attach_shm_ndarray as attach_shm_array, ) +from ._sharedmem import maybe_open_shm_array from ._source import ( def_iohlcv_fields, def_ohlcv_fields, diff --git a/piker/data/_formatters.py b/piker/data/_formatters.py index 7c3058bb..7cc793e9 100644 --- a/piker/data/_formatters.py +++ b/piker/data/_formatters.py @@ -28,9 +28,7 @@ from msgspec import field import numpy as np from numpy.lib import recfunctions as rfn -from ._sharedmem import ( - ShmArray, -) +from tractor.ipc._shm import ShmArray from ._pathops import ( path_arrays_from_ohlc, ) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 74ecf114..02420bb0 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -55,9 +55,7 @@ from ._util import ( from ..service import maybe_spawn_daemon if TYPE_CHECKING: - from ._sharedmem import ( - ShmArray, - ) + from tractor.ipc._shm import ShmArray from .feed import ( _FeedsBus, Sub, @@ -378,16 +376,16 @@ async def register_with_sampler( # feed_is_live.is_set() # ^TODO? pass it in instead? ): - from ._sharedmem import ( - attach_shm_array, - _Token, + from tractor.ipc._shm import ( + attach_shm_ndarray, + NDToken, ) for period in shms_by_period: # load and register shm handles shm_token_msg = shms_by_period[period] - shm = attach_shm_array( - _Token.from_msg(shm_token_msg), + shm = attach_shm_ndarray( + NDToken.from_msg(shm_token_msg), readonly=False, ) shms_by_period[period] = shm diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 3c7857b0..d894d392 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -18,31 +18,23 @@ # . ''' -NumPy shared memory buffers for real-time IPC streaming. +Piker-specific shared memory helpers. -Thin shim over ``tractor.ipc._shm`` providing -backward-compatible aliases for piker's historical API. +Thin shim providing piker-only wrappers around +``tractor.ipc._shm``; all core types and functions +are now imported directly from tractor throughout +the codebase. ''' -from typing import Optional - import numpy as np from tractor.ipc._shm import ( - SharedInt, + NDToken, ShmArray, - ShmList, - - NDToken as _Token, - - open_shm_ndarray as open_shm_array, - attach_shm_ndarray as attach_shm_array, - open_shm_list, - attach_shm_list, - - get_shm_token, _known_tokens, _make_token as _tractor_make_token, + open_shm_ndarray, + attach_shm_ndarray, ) from ._util import log @@ -51,8 +43,8 @@ from ._util import log def _make_token( key: str, size: int, - dtype: Optional[np.dtype] = None, -) -> _Token: + dtype: np.dtype|None = None, +) -> NDToken: ''' Wrap tractor's ``_make_token()`` with piker's default dtype fallback to ``def_iohlcv_fields``. @@ -90,15 +82,16 @@ def maybe_open_shm_array( historical defaults (``readonly=False``, ``append_start_index=None``). - If you know the explicit ``_Token`` for your - memory segment instead use ``attach_shm_array``. + If you know the explicit ``NDToken`` for your + memory segment instead use + ``tractor.ipc._shm.attach_shm_ndarray()``. ''' try: # see if we already know this key token = _known_tokens[key] return ( - attach_shm_array( + attach_shm_ndarray( token=token, readonly=readonly, ), @@ -116,7 +109,7 @@ def maybe_open_shm_array( ) try: return ( - attach_shm_array( + attach_shm_ndarray( token=token, **kwargs, ), @@ -134,7 +127,7 @@ def maybe_open_shm_array( # to fail if a block has been allocated # on the OS by someone else. return ( - open_shm_array( + open_shm_ndarray( key=key, size=size, dtype=dtype, @@ -147,7 +140,7 @@ def maybe_open_shm_array( def try_read( array: np.ndarray, -) -> Optional[np.ndarray]: +) -> np.ndarray|None: ''' Try to read the last row from a shared mem array or ``None`` if the array read returns diff --git a/piker/data/flows.py b/piker/data/flows.py index 573180b9..83b7460c 100644 --- a/piker/data/flows.py +++ b/piker/data/flows.py @@ -31,10 +31,10 @@ import pendulum import numpy as np from piker.types import Struct -from ._sharedmem import ( - attach_shm_array, +from tractor.ipc._shm import ( ShmArray, - _Token, + NDToken, + attach_shm_ndarray, ) from piker.accounting import MktPair @@ -64,11 +64,11 @@ class Flume(Struct): ''' mkt: MktPair first_quote: dict - _rt_shm_token: _Token + _rt_shm_token: NDToken # optional since some data flows won't have a "downsampled" history # buffer/stream (eg. FSPs). - _hist_shm_token: _Token | None = None + _hist_shm_token: NDToken|None = None # private shm refs loaded dynamically from tokens _hist_shm: ShmArray | None = None @@ -88,7 +88,7 @@ class Flume(Struct): def rt_shm(self) -> ShmArray: if self._rt_shm is None: - self._rt_shm = attach_shm_array( + self._rt_shm = attach_shm_ndarray( token=self._rt_shm_token, readonly=self._readonly, ) @@ -104,7 +104,7 @@ class Flume(Struct): ) if self._hist_shm is None: - self._hist_shm = attach_shm_array( + self._hist_shm = attach_shm_ndarray( token=self._hist_shm_token, readonly=self._readonly, ) diff --git a/piker/fsp/_api.py b/piker/fsp/_api.py index bb2dea50..c42391fc 100644 --- a/piker/fsp/_api.py +++ b/piker/fsp/_api.py @@ -37,12 +37,12 @@ import numpy as np import tractor from tractor.msg import NamespacePath -from ..data._sharedmem import ( +from tractor.ipc._shm import ( ShmArray, - maybe_open_shm_array, - attach_shm_array, - _Token, + NDToken, + attach_shm_ndarray, ) +from ..data._sharedmem import maybe_open_shm_array from ..log import get_logger log = get_logger(__name__) @@ -78,8 +78,8 @@ class Fsp: # + the consuming fsp *to* the consumers output # shm flow. _flow_registry: dict[ - tuple[_Token, str], - tuple[_Token, Optional[ShmArray]], + tuple[NDToken, str], + tuple[NDToken, Optional[ShmArray]], ] = {} def __init__( @@ -148,7 +148,7 @@ class Fsp: # times as possible as per: # - https://github.com/pikers/piker/issues/359 # - https://github.com/pikers/piker/issues/332 - maybe_array := attach_shm_array(dst_token) + maybe_array := attach_shm_ndarray(dst_token) ) return maybe_array diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index b7806719..8795a1fb 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -40,7 +40,7 @@ from ..log import ( ) from .. import data from ..data.flows import Flume -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray from ..data._sampling import ( _default_delay_s, open_sample_stream, @@ -49,7 +49,7 @@ from ..accounting import MktPair from ._api import ( Fsp, _load_builtins, - _Token, + NDToken, ) from ..toolz import Profiler @@ -414,7 +414,7 @@ async def cascade( dst_flume_addr: dict, ns_path: NamespacePath, - shm_registry: dict[str, _Token], + shm_registry: dict[str, NDToken], zero_on_step: bool = False, loglevel: str|None = None, @@ -465,9 +465,9 @@ async def cascade( # not sure how else to do it. for (token, fsp_name, dst_token) in shm_registry: Fsp._flow_registry[( - _Token.from_msg(token), + NDToken.from_msg(token), fsp_name, - )] = _Token.from_msg(dst_token), None + )] = NDToken.from_msg(dst_token), None fsp: Fsp = reg.get( NamespacePath(ns_path) diff --git a/piker/fsp/_momo.py b/piker/fsp/_momo.py index d1463c22..829f5f45 100644 --- a/piker/fsp/_momo.py +++ b/piker/fsp/_momo.py @@ -25,7 +25,7 @@ from numba import jit, float64, optional, int64 from ._api import fsp from ..data import iterticks -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray @jit( diff --git a/piker/fsp/_volume.py b/piker/fsp/_volume.py index 594e80e4..d0edfeb3 100644 --- a/piker/fsp/_volume.py +++ b/piker/fsp/_volume.py @@ -21,7 +21,7 @@ from tractor.trionics._broadcast import AsyncReceiver from ._api import fsp from ..data import iterticks -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray from ._momo import _wma from ..log import get_logger diff --git a/piker/storage/cli.py b/piker/storage/cli.py index c73d3b6d..b8015904 100644 --- a/piker/storage/cli.py +++ b/piker/storage/cli.py @@ -37,9 +37,7 @@ import typer from piker.service import open_piker_runtime from piker.cli import cli -from piker.data import ( - ShmArray, -) +from tractor.ipc._shm import ShmArray from piker import tsp from . import log from . import ( diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py index 8a948cab..edb3a362 100644 --- a/piker/storage/nativedb.py +++ b/piker/storage/nativedb.py @@ -64,10 +64,8 @@ from pendulum import ( from piker import config from piker import tsp -from piker.data import ( - def_iohlcv_fields, - ShmArray, -) +from tractor.ipc._shm import ShmArray +from piker.data import def_iohlcv_fields from piker.log import get_logger from . import TimeseriesNotFound diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index 0e75162f..fcc1d331 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -59,11 +59,11 @@ from piker.brokers import NoData from piker.accounting import ( MktPair, ) -from piker.log import get_logger -from ..data._sharedmem import ( - maybe_open_shm_array, - ShmArray, +from piker.log import ( + get_logger, ) +from tractor.ipc._shm import ShmArray +from ..data._sharedmem import maybe_open_shm_array from piker.data._source import ( def_iohlcv_fields, ) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index e6dbd69f..fc24570e 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -49,7 +49,7 @@ from ._cursor import ( Cursor, ContentsLabel, ) -from ..data._sharedmem import ShmArray +from tractor.ipc._shm import ShmArray from ._ohlc import BarItems from ._curve import ( Curve, diff --git a/piker/ui/_dataviz.py b/piker/ui/_dataviz.py index cc4529be..eeece1fa 100644 --- a/piker/ui/_dataviz.py +++ b/piker/ui/_dataviz.py @@ -42,9 +42,7 @@ from numpy import ( import pyqtgraph as pg from piker.ui.qt import QLineF -from ..data._sharedmem import ( - ShmArray, -) +from tractor.ipc._shm import ShmArray from ..data.flows import Flume from ..data._formatters import ( IncrementalFormatter, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index 7a2df5e6..8d18beee 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -44,14 +44,12 @@ from piker.fsp import ( dolla_vlm, flow_rates, ) -from piker.data import ( - Flume, +from tractor.ipc._shm import ( ShmArray, + NDToken, ) -from piker.data._sharedmem import ( - _Token, - try_read, -) +from piker.data import Flume +from piker.data._sharedmem import try_read from piker.log import get_logger from piker.toolz import Profiler from piker.types import Struct @@ -382,7 +380,7 @@ class FspAdmin: tuple, tuple[tractor.MsgStream, ShmArray] ] = {} - self._flow_registry: dict[_Token, str] = {} + self._flow_registry: dict[NDToken, str] = {} # TODO: make this a `.src_flume` and add # a `dst_flume`? -- 2.34.1 From e7a867126bfa1bdddc288866e88885d0c45358ce Mon Sep 17 00:00:00 2001 From: goodboy Date: Thu, 26 Feb 2026 18:00:35 -0500 Subject: [PATCH 3/3] Add a console log in `.tsp._history.manage_history()` Set the passed down `loglevel`. --- piker/tsp/_history.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/piker/tsp/_history.py b/piker/tsp/_history.py index fcc1d331..1c8940b5 100644 --- a/piker/tsp/_history.py +++ b/piker/tsp/_history.py @@ -61,6 +61,7 @@ from piker.accounting import ( ) from piker.log import ( get_logger, + get_console_log, ) from tractor.ipc._shm import ShmArray from ..data._sharedmem import maybe_open_shm_array @@ -1386,6 +1387,10 @@ async def manage_history( engages. ''' + get_console_log( + name=__name__, + level=loglevel, + ) # TODO: is there a way to make each shm file key # actor-tree-discovery-addr unique so we avoid collisions # when doing tests which also allocate shms for certain instruments -- 2.34.1