NOTE(review): this patch was re-flowed from a whitespace-collapsed copy, so
the indentation and blank-line placement of context lines are a best-effort
reconstruction and the blob ids on the `index` lines are stale; verify with
`git apply --check` against the target tree before applying.

diff --git a/piker/data/tsp.py b/piker/data/tsp.py
index 3f293d83..9c327725 100644
--- a/piker/data/tsp.py
+++ b/piker/data/tsp.py
@@ -28,6 +28,7 @@ from math import (
     ceil,
     floor,
 )
+import time
 from typing import Literal
 
 import numpy as np
@@ -408,3 +409,56 @@ def dedupe(src_df: pl.DataFrame) -> tuple[
         deduped,
         was_deduped,
     )
+
+
+# NOTE: thanks to this SO answer for the below conversion routines
+# to go from numpy struct-arrays to polars dataframes and back:
+# https://stackoverflow.com/a/72054819
+def np2pl(array: np.ndarray) -> pl.DataFrame:
+    '''
+    Convert a numpy struct-array into a polars dataframe with one
+    column per struct field, logging how long the conversion took.
+
+    '''
+    start = time.time()
+    df = pl.DataFrame({
+        field_name: array[field_name]
+        for field_name in array.dtype.fields
+    })
+    delay: float = round(
+        time.time() - start,
+        ndigits=6,
+    )
+    log.info(
+        f'numpy -> polars conversion took {delay} secs\n'
+        f'polars df: {df}'
+    )
+    return df
+
+
+def pl2np(
+    df: pl.DataFrame,
+    dtype: np.dtype,
+
+) -> np.ndarray:
+    '''
+    Convert a polars dataframe into a numpy struct-array of the
+    given `dtype`.
+
+    Fields are paired with columns by position (not by name), so
+    the frame's column order must match the dtype's field order.
+
+    '''
+    # Create numpy struct array of the correct size and dtype
+    # and loop through df columns to fill in array fields.
+    array = np.empty(
+        df.height,
+        dtype,
+    )
+    for field, col in zip(
+        dtype.fields,
+        df.columns,
+    ):
+        array[field] = df.get_column(col).to_numpy()
+
+    return array
diff --git a/piker/storage/cli.py b/piker/storage/cli.py
index 34d46046..54e7570e 100644
--- a/piker/storage/cli.py
+++ b/piker/storage/cli.py
@@ -260,22 +260,8 @@ def iter_dfs_from_shms(fqme: str) -> Generator[
         assert not opened
         ohlcv = shm.array
 
-        start = time.time()
-
-        # XXX: thanks to this SO answer for this conversion tip:
-        # https://stackoverflow.com/a/72054819
-        df = pl.DataFrame({
-            field_name: ohlcv[field_name]
-            for field_name in ohlcv.dtype.fields
-        })
-        delay: float = round(
-            time.time() - start,
-            ndigits=6,
-        )
-        log.info(
-            f'numpy -> polars conversion took {delay} secs\n'
-            f'polars df: {df}'
-        )
+        from piker.data import tsp
+        df: pl.DataFrame = tsp.np2pl(ohlcv)
 
         yield (
             shmfile,
@@ -316,7 +302,6 @@ def ldshm(
                         f'Something is wrong with time period for {shm}:\n{times}'
                     )
 
-                # over-write back to shm?
                 df: pl.DataFrame  # with dts
                 deduped: pl.DataFrame  # deduplicated dts
 
diff --git a/piker/storage/nativedb.py b/piker/storage/nativedb.py
index 0b15d4d7..04c4935b 100644
--- a/piker/storage/nativedb.py
+++ b/piker/storage/nativedb.py
@@ -65,8 +65,11 @@ from pendulum import (
 )
 
 from piker import config
-from piker.data import def_iohlcv_fields
-from piker.data import ShmArray
+from piker.data import (
+    def_iohlcv_fields,
+    ShmArray,
+    tsp,
+)
 from piker.log import get_logger
 
 from . import TimeseriesNotFound
@@ -74,37 +77,6 @@ from . import TimeseriesNotFound
 log = get_logger('storage.nativedb')
 
 
-# NOTE: thanks to this SO answer for the below conversion routines
-# to go from numpy struct-arrays to polars dataframes and back:
-# https://stackoverflow.com/a/72054819
-def np2pl(array: np.ndarray) -> pl.DataFrame:
-    return pl.DataFrame({
-        field_name: array[field_name]
-        for field_name in array.dtype.fields
-    })
-
-
-def pl2np(
-    df: pl.DataFrame,
-    dtype: np.dtype,
-
-) -> np.ndarray:
-
-    # Create numpy struct array of the correct size and dtype
-    # and loop through df columns to fill in array fields.
-    array = np.empty(
-        df.height,
-        dtype,
-    )
-    for field, col in zip(
-        dtype.fields,
-        df.columns,
-    ):
-        array[field] = df.get_column(col).to_numpy()
-
-    return array
-
-
 def detect_period(shm: ShmArray) -> float:
     '''
     Attempt to detect the series time step sampling period
@@ -290,7 +262,7 @@ class NativeStorageClient:
 
         # TODO: filter by end and limit inputs
         # times: pl.Series = df['time']
-        array: np.ndarray = pl2np(
+        array: np.ndarray = tsp.pl2np(
             df,
             dtype=np.dtype(def_iohlcv_fields),
         )
@@ -326,7 +298,7 @@ class NativeStorageClient:
             datadir=self._datadir,
         )
         if isinstance(ohlcv, np.ndarray):
-            df: pl.DataFrame = np2pl(ohlcv)
+            df: pl.DataFrame = tsp.np2pl(ohlcv)
         else:
             df = ohlcv
 