piker/piker/data/_sharedmem.py

178 lines
4.7 KiB
Python

# piker: trading gear for hackers
# Copyright (C) Tyler Goodlet (in stewardship for pikers)
# This program is free software: you can redistribute it
# and/or modify it under the terms of the GNU Affero General
# Public License as published by the Free Software
# Foundation, either version 3 of the License, or (at your
# option) any later version.
# This program is distributed in the hope that it will be
# useful, but WITHOUT ANY WARRANTY; without even the implied
# warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR
# PURPOSE. See the GNU Affero General Public License for
# more details.
# You should have received a copy of the GNU Affero General
# Public License along with this program. If not, see
# <https://www.gnu.org/licenses/>.
'''
NumPy shared memory buffers for real-time IPC streaming.
Thin shim over ``tractor.ipc._shm`` providing
backward-compatible aliases for piker's historical API.
'''
from typing import Optional
import numpy as np
from tractor.ipc._shm import (
SharedInt,
ShmArray,
ShmList,
NDToken as _Token,
open_shm_ndarray as open_shm_array,
attach_shm_ndarray as attach_shm_array,
open_shm_list,
attach_shm_list,
get_shm_token,
_known_tokens,
_make_token as _tractor_make_token,
)
from ._util import log
def _make_token(
key: str,
size: int,
dtype: Optional[np.dtype] = None,
) -> _Token:
'''
Wrap tractor's ``_make_token()`` with piker's
default dtype fallback to ``def_iohlcv_fields``.
'''
from ._source import def_iohlcv_fields
dtype = (
def_iohlcv_fields
if dtype is None
else dtype
)
return _tractor_make_token(
key=key,
size=size,
dtype=dtype,
)
def maybe_open_shm_array(
key: str,
size: int,
dtype: np.dtype|None = None,
append_start_index: int|None = None,
readonly: bool = False,
**kwargs,
) -> tuple[ShmArray, bool]:
'''
Attempt to attach to a shared memory block
using a "key" lookup to registered blocks in
the user's overall "system" registry (presumes
you don't have the block's explicit token).
This is a thin wrapper around tractor's
``maybe_open_shm_ndarray()`` preserving piker's
historical defaults (``readonly=False``,
``append_start_index=None``).
If you know the explicit ``_Token`` for your
memory segment instead use ``attach_shm_array``.
'''
try:
# see if we already know this key
token = _known_tokens[key]
return (
attach_shm_array(
token=token,
readonly=readonly,
),
False,
)
except KeyError:
log.debug(
f'Could not find {key} in shms cache'
)
if dtype:
token = _make_token(
key,
size=size,
dtype=dtype,
)
try:
return (
attach_shm_array(
token=token,
**kwargs,
),
False,
)
except FileNotFoundError:
log.debug(
f'Could not attach to shm'
f' with token {token}'
)
# This actor does not know about memory
# associated with the provided "key".
# Attempt to open a block and expect
# to fail if a block has been allocated
# on the OS by someone else.
return (
open_shm_array(
key=key,
size=size,
dtype=dtype,
append_start_index=append_start_index,
readonly=readonly,
),
True,
)
def try_read(
array: np.ndarray,
) -> Optional[np.ndarray]:
'''
Try to read the last row from a shared mem
array or ``None`` if the array read returns
a zero-length array result.
Can be used to check for backfilling race
conditions where an array is currently being
(re-)written by a writer actor but the reader
is unaware and reads during the window where
the first and last indexes are being updated.
'''
try:
return array[-1]
except IndexError:
# XXX: race condition with backfilling shm.
#
# the underlying issue is that a backfill
# (aka prepend) and subsequent shm array
# first/last index update could result in an
# empty array read here since the indices may
# be updated in such a way that a read delivers
# an empty array (though it seems like we
# *should* be able to prevent that?).
# the array read was empty
return None