diff --git a/piker/storage/marketstore/__init__.py b/piker/storage/marketstore/__init__.py index d1a3d67f..d435fb66 100644 --- a/piker/storage/marketstore/__init__.py +++ b/piker/storage/marketstore/__init__.py @@ -46,7 +46,7 @@ from anyio_marketstore import ( import pendulum import purerpc -from ..service.marketstore import ( +from piker.service.marketstore import ( MarketstoreClient, tf_in_1s, mk_tbk, @@ -58,7 +58,7 @@ from anyio_marketstore import ( # noqa MarketstoreClient, Params, ) -from ..log import get_logger +from piker.log import get_logger # from .._profile import Profiler @@ -107,7 +107,6 @@ class MktsStorageClient: datetime | None, # first dt datetime | None, # last dt ]: - first_tsdb_dt, last_tsdb_dt = None, None hist = await self.read_ohlcv( fqme, @@ -119,10 +118,13 @@ class MktsStorageClient: log.info(f'Loaded tsdb history {hist}') if len(hist): - times = hist['Epoch'] + # breakpoint() + times: np.ndarray = hist['Epoch'] + first, last = times[0], times[-1] first_tsdb_dt, last_tsdb_dt = map( - pendulum.from_timestamp, [first, last] + pendulum.from_timestamp, + [first, last] ) return ( @@ -135,53 +137,82 @@ class MktsStorageClient: self, fqme: str, timeframe: int | str, - end: int | None = None, - limit: int = int(800e3), + end: float | None = None, # epoch or none + limit: int = int(200e3), ) -> np.ndarray: client = self.client syms = await client.list_symbols() - if fqme not in syms: return {} + # ensure end time is in correct int format! + if ( + end + and not isinstance(end, float) + ): + end = int(float(end)) + # breakpoint() + # use the provided timeframe or 1s by default tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) - params = Params( + import pymarketstore as pymkts + sync_client = pymkts.Client() + param = pymkts.Params( symbols=fqme, timeframe=tfstr, attrgroup='OHLCV', end=end, - # limit_from_start=True, - # TODO: figure the max limit here given the - # ``purepc`` msg size limit of purerpc: 33554432 limit=limit, + # limit_from_start=True, ) + try: + reply = sync_client.query(param) + except Exception as err: + if 'no files returned from query parse: None' in err.args: + return [] - for i in range(3): - try: - result = await client.query(params) - break - except purerpc.grpclib.exceptions.UnknownError as err: - if 'snappy' in err.args: - await tractor.breakpoint() + raise - # indicate there is no history for this timeframe - log.exception( - f'Unknown mkts QUERY error: {params}\n' - f'{err.args}' - ) - else: - return {} + data_set: pymkts.results.DataSet = reply.first() + array: np.ndarray = data_set.array - # TODO: it turns out column access on recarrays is actually slower: - # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist - # it might make sense to make these structured arrays? - data_set = result.by_symbols()[fqme] - array = data_set.array + # params = Params( + # symbols=fqme, + # timeframe=tfstr, + # attrgroup='OHLCV', + # end=end, + # # limit_from_start=True, + + # # TODO: figure the max limit here given the + # # ``purepc`` msg size limit of purerpc: 33554432 + # limit=limit, + # ) + + # for i in range(3): + # try: + # result = await client.query(params) + # break + # except purerpc.grpclib.exceptions.UnknownError as err: + # if 'snappy' in err.args: + # await tractor.breakpoint() + + # # indicate there is no history for this timeframe + # log.exception( + # f'Unknown mkts QUERY error: {params}\n' + # f'{err.args}' + # ) + # else: + # return {} + + # # TODO: it turns out column access on recarrays is actually slower: + # # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist + # # it might make sense to make these structured arrays? + # data_set = result.by_symbols()[fqme] + # array = data_set.array # XXX: ensure sample rate is as expected time = data_set.array['Epoch'] @@ -191,19 +222,20 @@ class MktsStorageClient: if time_step != ts: log.warning( - f'MKTS BUG: wrong timeframe loaded: {time_step}' - 'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG' + f'MKTS BUG: wrong timeframe loaded: {time_step}\n' + 'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG ' f'WIPING HISTORY FOR {ts}s' ) - await self.delete_ts(fqme, timeframe) + await tractor.breakpoint() + # await self.delete_ts(fqme, timeframe) # try reading again.. - return await self.read_ohlcv( - fqme, - timeframe, - end, - limit, - ) + # return await self.read_ohlcv( + # fqme, + # timeframe, + # end, + # limit, + # ) return array