Port `_sharedmem` to thin shim over `tractor.ipc._shm`

Replace the ~716 line `piker.data._sharedmem` mod with a thin re-export
shim consuming `tractor.ipc._shm` types directly, since the `tractor`
version is the refined factoring of piker's original impl.

Deats,
- Re-export `SharedInt`, `ShmArray`, `ShmList`, `get_shm_token`,
  `_known_tokens` directly
- Alias renames: `NDToken as _Token`, `open_shm_ndarray as
  open_shm_array`, `attach_shm_ndarray as attach_shm_array`
- Keep `_make_token()` wrapper for piker's default dtype fallback to
  `def_iohlcv_fields`
- Keep `maybe_open_shm_array()` wrapper preserving piker's historical
  defaults (`readonly=False`, `append_start_index=None`)
- Keep `try_read()` race-condition guard (not in `tractor`)

All 13 import sites across piker continue to work unchanged.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
shm_from_tractor
Gud Boi 2026-03-14 16:49:55 -04:00
parent c23d034935
commit b71d0533b2
1 changed files with 88 additions and 626 deletions

View File

@ -1,151 +1,51 @@
# piker: trading gear for hackers # piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers) # Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it and/or modify # This program is free software: you can redistribute it
# it under the terms of the GNU Affero General Public License as published by # and/or modify it under the terms of the GNU Affero General
# the Free Software Foundation, either version 3 of the License, or # Public License as published by the Free Software
# (at your option) any later version. # Foundation, either version 3 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be useful, # This program is distributed in the hope that it will be
# but WITHOUT ANY WARRANTY; without even the implied warranty of # useful, but WITHOUT ANY WARRANTY; without even the implied
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# GNU Affero General Public License for more details. # PURPOSE. See the GNU Affero General Public License for
# more details.
# You should have received a copy of the GNU Affero General Public License # You should have received a copy of the GNU Affero General
# along with this program. If not, see <https://www.gnu.org/licenses/>. # Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
""" '''
NumPy compatible shared memory buffers for real-time IPC streaming. NumPy shared memory buffers for real-time IPC streaming.
""" Thin shim over ``tractor.ipc._shm`` providing
from __future__ import annotations backward-compatible aliases for piker's historical API.
from sys import byteorder
import time '''
from typing import Optional from typing import Optional
from multiprocessing.shared_memory import SharedMemory, _USE_POSIX
if _USE_POSIX:
from _posixshmem import shm_unlink
# import msgspec
import numpy as np import numpy as np
from numpy.lib import recfunctions as rfn
import tractor from tractor.ipc._shm import (
SharedInt,
ShmArray,
ShmList,
NDToken as _Token,
open_shm_ndarray as open_shm_array,
attach_shm_ndarray as attach_shm_array,
open_shm_list,
attach_shm_list,
get_shm_token,
_known_tokens,
_make_token as _tractor_make_token,
)
from ._util import log from ._util import log
from ._source import def_iohlcv_fields
from piker.types import Struct
def cuckoff_mantracker():
    '''
    Neutralize ``multiprocessing``'s "resource tracking" machinery,
    a multi-threaded mess of non-SC madness we never want running.

    '''
    from multiprocessing import resource_tracker as mantracker

    # Subclass with every tracking hook stubbed to a no-op.
    class ManTracker(mantracker.ResourceTracker):
        def register(self, name, rtype):
            pass

        def unregister(self, name, rtype):
            pass

        def ensure_running(self):
            pass

    # "know your land and know your prey"
    # https://www.dailymotion.com/video/x6ozzco
    # Swap in the inert tracker and rebind the module-level helpers
    # to its (now no-op) bound methods.
    tracker = ManTracker()
    mantracker._resource_tracker = tracker
    mantracker.register = tracker.register
    mantracker.ensure_running = tracker.ensure_running
    mantracker.unregister = tracker.unregister
    mantracker.getfd = tracker.getfd


cuckoff_mantracker()
class SharedInt:
    '''
    Thin wrapper over a single-entry shared memory segment holding
    one ``int``, used as a process-shared index counter.

    '''
    def __init__(
        self,
        shm: SharedMemory,
    ) -> None:
        self._shm = shm

    @property
    def value(self) -> int:
        # decode the raw buffer using the host's native byte order
        return int.from_bytes(self._shm.buf, byteorder)

    @value.setter
    def value(self, value) -> None:
        encoded = value.to_bytes(self._shm.size, byteorder)
        self._shm.buf[:] = encoded

    def destroy(self) -> None:
        if not _USE_POSIX:
            return

        # Unlink by hand so the (disabled) "resource tracker"
        # machinery for non-SC systems never gets involved.
        name = self._shm.name
        try:
            shm_unlink(name)
        except FileNotFoundError:
            # might be a teardown race here?
            log.warning(f'Shm for {name} already unlinked?')
class _Token(Struct, frozen=True):
    '''
    Internal representation of a shared memory "token"
    which can be used to key a system wide post shm entry.

    Frozen (immutable) so instances are safely comparable and
    serializable across actor/process boundaries.

    '''
    shm_name: str  # this serves as a "key" value
    shm_first_index_name: str
    shm_last_index_name: str
    dtype_descr: tuple
    size: int  # in struct-array index / row terms

    @property
    def dtype(self) -> np.dtype:
        # NOTE(review): this returns the ``.descr`` (list-of-tuples)
        # form, NOT an ``np.dtype`` instance, despite the annotation;
        # numpy constructors accept either, but confirm before "fixing".
        return np.dtype(list(map(tuple, self.dtype_descr))).descr

    def as_msg(self):
        # wire-encodable ``dict`` form of this token
        return self.to_dict()

    @classmethod
    def from_msg(cls, msg: dict) -> _Token:
        # pass-through when already decoded
        if isinstance(msg, _Token):
            return msg

        # TODO: native struct decoding
        # return _token_dec.decode(msg)

        # normalize the dtype descr to nested tuples so equality
        # checks against locally-built tokens hold
        msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr']))
        return _Token(**msg)
# _token_dec = msgspec.msgpack.Decoder(_Token)
# TODO: this api?
# _known_tokens = tractor.ActorVar('_shm_tokens', {})
# _known_tokens = tractor.ContextStack('_known_tokens', )
# _known_tokens = trio.RunVar('shms', {})
# process-local store of keys to tokens
_known_tokens = {}
def get_shm_token(key: str) -> _Token:
"""Convenience func to check if a token
for the provided key is known by this process.
"""
return _known_tokens.get(key)
def _make_token( def _make_token(
@ -154,494 +54,44 @@ def _make_token(
dtype: Optional[np.dtype] = None, dtype: Optional[np.dtype] = None,
) -> _Token: ) -> _Token:
''' '''
Create a serializable token that can be used Wrap tractor's ``_make_token()`` with piker's
to access a shared array. default dtype fallback to ``def_iohlcv_fields``.
''' '''
dtype = def_iohlcv_fields if dtype is None else dtype from ._source import def_iohlcv_fields
return _Token( dtype = (
shm_name=key, def_iohlcv_fields
shm_first_index_name=key + "_first", if dtype is None
shm_last_index_name=key + "_last", else dtype
dtype_descr=tuple(np.dtype(dtype).descr),
size=size,
) )
return _tractor_make_token(
class ShmArray:
    '''
    A shared memory ``numpy`` (compatible) array API.

    An underlying shared memory buffer is allocated based on
    a user specified ``numpy.ndarray``. This fixed size array
    can be read and written to by pushing data both onto the "front"
    or "back" of a set index range. The indexes for the "first" and
    "last" index are themselves stored in shared memory (accessed via
    ``SharedInt`` interfaces) values such that multiple processes can
    interact with the same array using a synchronized-index.

    '''
    def __init__(
        self,
        shmarr: np.ndarray,
        first: SharedInt,
        last: SharedInt,
        shm: SharedMemory,
        # readonly: bool = True,
    ) -> None:
        self._array = shmarr

        # indexes for first and last indices corresponding
        # to filled data
        self._first = first
        self._last = last

        self._len = len(shmarr)
        self._shm = shm
        # flipped to ``True`` on the first successful ``.push()``;
        # used by ``.array`` to detect an empty-read index race.
        self._post_init: bool = False

        # pushing data does not write the index (aka primary key),
        # so the first (index) field is excluded from the default
        # write-field list.
        dtype = shmarr.dtype
        if dtype.fields:
            self._write_fields = list(shmarr.dtype.fields.keys())[1:]
        else:
            self._write_fields = None

    # TODO: ringbuf api?

    @property
    def _token(self) -> _Token:
        # token (re)constructed from the live shm segment names
        return _Token(
            shm_name=self._shm.name,
            shm_first_index_name=self._first._shm.name,
            shm_last_index_name=self._last._shm.name,
            dtype_descr=tuple(self._array.dtype.descr),
            size=self._len,
        )

    @property
    def token(self) -> dict:
        """Shared memory token that can be serialized and used by
        another process to attach to this array.
        """
        return self._token.as_msg()

    @property
    def index(self) -> int:
        # current "last written" position, wrapped to the buffer length
        return self._last.value % self._len

    @property
    def array(self) -> np.ndarray:
        '''
        Return an up-to-date ``np.ndarray`` view of the
        so-far-written data to the underlying shm buffer.

        '''
        a = self._array[self._first.value:self._last.value]

        # first, last = self._first.value, self._last.value
        # a = self._array[first:last]

        # TODO: eventually comment this once we've not seen it in the
        # wild in a long time..
        # XXX: race where first/last indexes cause a reader
        # to load an empty array..
        if len(a) == 0 and self._post_init:
            raise RuntimeError('Empty array race condition hit!?')

        return a

    def ustruct(
        self,
        fields: Optional[list[str]] = None,

        # type that all field values will be cast to
        # in the returned view.
        # NOTE(review): currently unused by the impl below — confirm
        # whether it should be forwarded to
        # ``structured_to_unstructured()``.
        common_dtype: np.dtype = float,

    ) -> np.ndarray:
        '''
        Return an "unstructured" (2d) copy of (a subset of) this
        array's fields via ``numpy.lib.recfunctions``.

        '''
        array = self._array

        if fields:
            selection = array[fields]
            # fcount = len(fields)
        else:
            selection = array
            # fcount = len(array.dtype.fields)

        # XXX: manual ``.view()`` attempt that also doesn't work.
        # uview = selection.view(
        #     dtype='<f16',
        # ).reshape(-1, 4, order='A')
        # assert len(selection) == len(uview)

        u = rfn.structured_to_unstructured(
            selection,
            # dtype=float,
            copy=True,
        )
        # unstruct = np.ndarray(u.shape, dtype=a.dtype, buffer=shm.buf)
        # array[:] = a[:]
        return u
        # return ShmArray(
        #     shmarr=u,
        #     first=self._first,
        #     last=self._last,
        #     shm=self._shm
        # )

    def last(
        self,
        length: int = 1,
    ) -> np.ndarray:
        '''
        Return the last ``length``'s worth of ("row") entries from the
        array.

        '''
        return self.array[-length:]

    def push(
        self,
        data: np.ndarray,

        field_map: Optional[dict[str, str]] = None,
        prepend: bool = False,
        update_first: bool = True,
        start: int | None = None,

    ) -> int:
        '''
        Ring buffer like "push" to append data
        into the buffer and return updated "last" index.

        NB: no actual ring logic yet to give a "loop around" on overflow
        condition, lel.

        '''
        length = len(data)

        if prepend:
            # writing "backwards" from the current (or given) first index
            index = (start or self._first.value) - length

            if index < 0:
                raise ValueError(
                    f'Array size of {self._len} was overrun during prepend.\n'
                    f'You have passed {abs(index)} too many datums.'
                )

        else:
            index = start if start is not None else self._last.value

        end = index + length

        # optional src -> dst field-name remap for the incoming data
        if field_map:
            src_names, dst_names = zip(*field_map.items())
        else:
            dst_names = src_names = self._write_fields

        try:
            self._array[
                list(dst_names)
            ][index:end] = data[list(src_names)][:]

            # NOTE: there was a race here between updating
            # the first and last indices and when the next reader
            # tries to access ``.array`` (which due to the index
            # overlap will be empty). Pretty sure we've fixed it now
            # but leaving this here as a reminder.
            if (
                prepend
                and update_first
                and length
            ):
                assert index < self._first.value

            if (
                index < self._first.value
                and update_first
            ):
                assert prepend, 'prepend=True not passed but index decreased?'
                self._first.value = index

            elif not prepend:
                self._last.value = end

            self._post_init = True
            return end

        except ValueError as err:
            if field_map:
                raise

            # should raise if a dtype field mismatch is detected,
            # otherwise re-raise the original error below
            self.diff_err_fields(data)
            raise err

    def diff_err_fields(
        self,
        data: np.ndarray,
    ) -> None:
        '''
        Raise a ``TypeError`` describing any field-name discrepancy
        between ``data``'s dtype and this array's dtype.

        '''
        # reraise with any field discrepancy
        our_fields, their_fields = (
            set(self._array.dtype.fields),
            set(data.dtype.fields),
        )

        only_in_ours = our_fields - their_fields
        only_in_theirs = their_fields - our_fields

        if only_in_ours:
            raise TypeError(
                f"Input array is missing field(s): {only_in_ours}"
            )
        elif only_in_theirs:
            raise TypeError(
                f"Input array has unknown field(s): {only_in_theirs}"
            )

    # TODO: support "silent" prepends that don't update ._first.value?
    def prepend(
        self,
        data: np.ndarray,
    ) -> int:
        # NOTE(review): despite the ``int`` annotation nothing is
        # returned here (the ``assert`` is the final statement) —
        # confirm whether callers expect the new index back.
        end = self.push(data, prepend=True)
        assert end

    def close(self) -> None:
        # close this process' view of all (3) underlying shm
        # segments; does NOT unlink/destroy them.
        self._first._shm.close()
        self._last._shm.close()
        self._shm.close()

    def destroy(self) -> None:
        if _USE_POSIX:
            # We manually unlink to bypass all the "resource tracker"
            # nonsense meant for non-SC systems.
            shm_unlink(self._shm.name)

        self._first.destroy()
        self._last.destroy()

    def flush(self) -> None:
        # TODO: flush to storage backend like markestore?
        ...
def open_shm_array(
    size: int,
    key: str | None = None,
    dtype: np.dtype | None = None,
    append_start_index: int | None = None,
    readonly: bool = False,

) -> ShmArray:
    '''Open a memory shared ``numpy`` using the standard library.

    This call unlinks (aka permanently destroys) the buffer on teardown
    and thus should be used from the parent-most accessor (process).

    '''
    # create new shared mem segment for which we
    # have write permission
    a = np.zeros(size, dtype=dtype)
    # NOTE: presumes the dtype's first field is named 'index'
    a['index'] = np.arange(len(a))

    shm = SharedMemory(
        name=key,
        create=True,
        size=a.nbytes
    )
    # re-view the shm buffer as an ndarray of the requested dtype
    # and copy in the zeroed/indexed template
    array = np.ndarray(
        a.shape,
        dtype=a.dtype,
        buffer=shm.buf
    )
    array[:] = a[:]
    array.setflags(write=int(not readonly))

    token = _make_token(
        key=key,
        size=size,
        dtype=dtype,
    )

    # create single entry arrays for storing the first and last indices
    first = SharedInt(
        shm=SharedMemory(
            name=token.shm_first_index_name,
            create=True,
            size=4,  # std int
        )
    )
    last = SharedInt(
        shm=SharedMemory(
            name=token.shm_last_index_name,
            create=True,
            size=4,  # std int
        )
    )

    # start the "real-time" updated section after 3-days worth of 1s
    # sampled OHLC. this allows appending up to a days worth from
    # tick/quote feeds before having to flush to a (tsdb) storage
    # backend, and looks something like,
    #   -------------------------
    #   |              |        i
    #   _________________________
    #   <-------------> <------->
    #       history         real-time
    #
    # Once fully "prepended", the history section will leave the
    # ``ShmArray._start.value: int = 0`` and the yet-to-be written
    # real-time section will start at ``ShmArray.index: int``.

    # this sets the index to nearly 2/3rds into the the length of
    # the buffer leaving at least a "days worth of second samples"
    # for the real-time section.
    if append_start_index is None:
        append_start_index = round(size * 0.616)

    last.value = first.value = append_start_index

    shmarr = ShmArray(
        array,
        first,
        last,
        shm,
    )

    assert shmarr._token == token
    _known_tokens[key] = shmarr.token

    # "unlink" created shm on process teardown by
    # pushing teardown calls onto actor context stack
    # NOTE(review): unlike ``attach_shm_array`` below this does not
    # guard against ``current_actor()`` returning ``None`` before
    # the ``.lifetime_stack`` access — confirm intended.
    stack = tractor.current_actor(
        err_on_no_runtime=False,
    ).lifetime_stack
    if stack:
        stack.callback(shmarr.close)
        stack.callback(shmarr.destroy)

    return shmarr
def attach_shm_array(
    token: tuple[str, str, tuple[str, str]],
    readonly: bool = True,

) -> ShmArray:
    '''
    Attach to an existing shared memory array previously
    created by another process using ``open_shared_array``.

    No new shared mem is allocated but wrapper types for read/write
    access are constructed.

    '''
    token = _Token.from_msg(token)
    key = token.shm_name

    # sanity check any pre-registered token for this key
    if key in _known_tokens:
        assert _Token.from_msg(_known_tokens[key]) == token, "WTF"

    # XXX: ugh, looks like due to the ``shm_open()`` C api we can't
    # actually place files in a subdir, see discussion here:
    # https://stackoverflow.com/a/11103289

    # attach to array buffer and view as per dtype;
    # retry a few times to paper over open/attach races with the
    # segment's creator process.
    _err: Optional[Exception] = None
    for _ in range(3):
        try:
            shm = SharedMemory(
                name=key,
                create=False,
            )
            break
        except OSError as oserr:
            _err = oserr
            time.sleep(0.1)
    else:
        if _err:
            raise _err

    shmarr = np.ndarray(
        (token.size,),
        dtype=token.dtype,
        buffer=shm.buf
    )
    shmarr.setflags(write=int(not readonly))

    first = SharedInt(
        shm=SharedMemory(
            name=token.shm_first_index_name,
            create=False,
            size=4,  # std int
        ),
    )
    last = SharedInt(
        shm=SharedMemory(
            name=token.shm_last_index_name,
            create=False,
            size=4,  # std int
        ),
    )

    # make sure we can read
    first.value

    sha = ShmArray(
        shmarr,
        first,
        last,
        shm,
    )
    # read test
    sha.array

    # Stash key -> token knowledge for future queries
    # via `maybe_open_shm_array()` but only after we know
    # we can attach.
    if key not in _known_tokens:
        _known_tokens[key] = token

    # "close" attached shm on actor teardown
    if (actor := tractor.current_actor(
        err_on_no_runtime=False,
    )):
        actor.lifetime_stack.callback(sha.close)

    return sha
def maybe_open_shm_array( def maybe_open_shm_array(
key: str, key: str,
size: int, size: int,
dtype: np.dtype | None = None, dtype: np.dtype|None = None,
append_start_index: int | None = None, append_start_index: int|None = None,
readonly: bool = False, readonly: bool = False,
**kwargs, **kwargs,
) -> tuple[ShmArray, bool]: ) -> tuple[ShmArray, bool]:
''' '''
Attempt to attach to a shared memory block using a "key" lookup Attempt to attach to a shared memory block
to registered blocks in the users overall "system" registry using a "key" lookup to registered blocks in
(presumes you don't have the block's explicit token). the user's overall "system" registry (presumes
you don't have the block's explicit token).
This function is meant to solve the problem of discovering whether This is a thin wrapper around tractor's
a shared array token has been allocated or discovered by the actor ``maybe_open_shm_ndarray()`` preserving piker's
running in **this** process. Systems where multiple actors may seek historical defaults (``readonly=False``,
to access a common block can use this function to attempt to acquire ``append_start_index=None``).
a token as discovered by the actors who have previously stored
a "key" -> ``_Token`` map in an actor local (aka python global)
variable.
If you know the explicit ``_Token`` for your memory segment instead If you know the explicit ``_Token`` for your
use ``attach_shm_array``. memory segment instead use ``attach_shm_array``.
''' '''
try: try:
@ -655,7 +105,9 @@ def maybe_open_shm_array(
False, False,
) )
except KeyError: except KeyError:
log.debug(f"Could not find {key} in shms cache") log.debug(
f'Could not find {key} in shms cache'
)
if dtype: if dtype:
token = _make_token( token = _make_token(
key, key,
@ -663,9 +115,18 @@ def maybe_open_shm_array(
dtype=dtype, dtype=dtype,
) )
try: try:
return attach_shm_array(token=token, **kwargs), False return (
attach_shm_array(
token=token,
**kwargs,
),
False,
)
except FileNotFoundError: except FileNotFoundError:
log.debug(f"Could not attach to shm with token {token}") log.debug(
f'Could not attach to shm'
f' with token {token}'
)
# This actor does not know about memory # This actor does not know about memory
# associated with the provided "key". # associated with the provided "key".
@ -683,18 +144,20 @@ def maybe_open_shm_array(
True, True,
) )
def try_read(
array: np.ndarray
def try_read(
array: np.ndarray,
) -> Optional[np.ndarray]: ) -> Optional[np.ndarray]:
''' '''
Try to read the last row from a shared mem array or ``None`` Try to read the last row from a shared mem
if the array read returns a zero-length array result. array or ``None`` if the array read returns
a zero-length array result.
Can be used to check for backfilling race conditions where an array Can be used to check for backfilling race
is currently being (re-)written by a writer actor but the reader is conditions where an array is currently being
unaware and reads during the window where the first and last indexes (re-)written by a writer actor but the reader
are being updated. is unaware and reads during the window where
the first and last indexes are being updated.
''' '''
try: try:
@ -702,14 +165,13 @@ def try_read(
except IndexError: except IndexError:
# XXX: race condition with backfilling shm. # XXX: race condition with backfilling shm.
# #
# the underlying issue is that a backfill (aka prepend) and subsequent # the underlying issue is that a backfill
# shm array first/last index update could result in an empty array # (aka prepend) and subsequent shm array
# read here since the indices may be updated in such a way that # first/last index update could result in an
# a read delivers an empty array (though it seems like we # empty array read here since the indices may
# *should* be able to prevent that?). also, as and alt and # be updated in such a way that a read delivers
# something we need anyway, maybe there should be some kind of # an empty array (though it seems like we
# signal that a prepend is taking place and this consumer can # *should* be able to prevent that?).
# respond (eg. redrawing graphics) accordingly.
# the array read was emtpy # the array read was empty
return None return None