From 7b653fe4f44587e42c28375fbed46b3c3be4b9f4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sun, 28 Aug 2022 20:28:42 -0400 Subject: [PATCH 1/2] Store shm array size in token schema, use for loading --- piker/data/_sharedmem.py | 20 +++++++++++++++----- 1 file changed, 15 insertions(+), 5 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 82f61e79..8cf09aa5 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -118,6 +118,7 @@ class _Token(Struct, frozen=True): shm_first_index_name: str shm_last_index_name: str dtype_descr: tuple + size: int @property def dtype(self) -> np.dtype: @@ -158,6 +159,7 @@ def get_shm_token(key: str) -> _Token: def _make_token( key: str, + size: int, dtype: Optional[np.dtype] = None, ) -> _Token: ''' @@ -170,7 +172,8 @@ def _make_token( shm_name=key, shm_first_index_name=key + "_first", shm_last_index_name=key + "_last", - dtype_descr=tuple(np.dtype(dtype).descr) + dtype_descr=tuple(np.dtype(dtype).descr), + size=size, ) @@ -222,6 +225,7 @@ class ShmArray: shm_first_index_name=self._first._shm.name, shm_last_index_name=self._last._shm.name, dtype_descr=tuple(self._array.dtype.descr), + size=self._len, ) @property @@ -467,7 +471,8 @@ def open_shm_array( token = _make_token( key=key, - dtype=dtype + size=size, + dtype=dtype, ) # create single entry arrays for storing an first and last indices @@ -527,7 +532,7 @@ def open_shm_array( def attach_shm_array( token: tuple[str, str, tuple[str, str]], - size: int = _default_size, + # size: int = _default_size, readonly: bool = True, ) -> ShmArray: @@ -566,7 +571,7 @@ def attach_shm_array( raise _err shmarr = np.ndarray( - (size,), + (token.size,), dtype=token.dtype, buffer=shm.buf ) @@ -634,6 +639,7 @@ def maybe_open_shm_array( use ``attach_shm_array``. ''' + size = kwargs.pop('size', _default_size) try: # see if we already know this key token = _known_tokens[key] @@ -641,7 +647,11 @@ def maybe_open_shm_array( except KeyError: log.warning(f"Could not find {key} in shms cache") if dtype: - token = _make_token(key, dtype) + token = _make_token( + key, + size=size, + dtype=dtype, + ) try: return attach_shm_array(token=token, **kwargs), False except FileNotFoundError: From 7dfa4c3cde4e1e430827f62ed7e4598cad39eb97 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Aug 2022 13:56:26 -0400 Subject: [PATCH 2/2] Better comment on the `size`'s purpose/units --- piker/data/_sharedmem.py | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 8cf09aa5..3794c46f 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -50,7 +50,11 @@ _rt_buffer_start = int((_days_worth - 1) * _secs_in_day) def cuckoff_mantracker(): + ''' + Disable all ``multiprocessing``` "resource tracking" machinery since + it's an absolute multi-threaded mess of non-SC madness. + ''' from multiprocessing import resource_tracker as mantracker # Tell the "resource tracker" thing to fuck off. @@ -118,7 +122,7 @@ class _Token(Struct, frozen=True): shm_first_index_name: str shm_last_index_name: str dtype_descr: tuple - size: int + size: int # in struct-array index / row terms @property def dtype(self) -> np.dtype: @@ -440,7 +444,7 @@ class ShmArray: def open_shm_array( key: Optional[str] = None, - size: int = _default_size, + size: int = _default_size, # see above dtype: Optional[np.dtype] = None, readonly: bool = False, @@ -524,6 +528,7 @@ def open_shm_array( # "unlink" created shm on process teardown by # pushing teardown calls onto actor context stack + # TODO: make this a public API in ``tractor``.. tractor._actor._lifetime_stack.callback(shmarr.close) tractor._actor._lifetime_stack.callback(shmarr.destroy) @@ -532,7 +537,6 @@ def open_shm_array( def attach_shm_array( token: tuple[str, str, tuple[str, str]], - # size: int = _default_size, readonly: bool = True, ) -> ShmArray: