diff --git a/piker/service/marketstore.py b/piker/service/marketstore.py index f2174ad2..4ca496b5 100644 --- a/piker/service/marketstore.py +++ b/piker/service/marketstore.py @@ -59,6 +59,7 @@ from ._util import ( ) from ..data.feed import maybe_open_feed from .._profile import Profiler +from .. import config # ahabd-supervisor and container level config @@ -332,8 +333,8 @@ def quote_to_marketstore_structarray( @acm async def get_client( - host: str = 'localhost', - port: int = _config['grpc_listen_port'], + host: str | None, + port: int | None, ) -> MarketstoreClient: ''' @@ -342,8 +343,8 @@ async def get_client( ''' async with open_marketstore_client( - host, - port + host or 'localhost', + port or _config['grpc_listen_port'], ) as client: yield client @@ -407,7 +408,7 @@ class Storage: async def load( self, - fqsn: str, + fqme: str, timeframe: int, ) -> tuple[ @@ -418,7 +419,7 @@ class Storage: first_tsdb_dt, last_tsdb_dt = None, None hist = await self.read_ohlcv( - fqsn, + fqme, # on first load we don't need to pull the max # history per request size worth. limit=3000, @@ -441,7 +442,7 @@ class Storage: async def read_ohlcv( self, - fqsn: str, + fqme: str, timeframe: int | str, end: int | None = None, limit: int = int(800e3), @@ -451,14 +452,14 @@ class Storage: client = self.client syms = await client.list_symbols() - if fqsn not in syms: + if fqme not in syms: return {} # use the provided timeframe or 1s by default tfstr = tf_in_1s.get(timeframe, tf_in_1s[1]) params = Params( - symbols=fqsn, + symbols=fqme, timeframe=tfstr, attrgroup='OHLCV', end=end, @@ -488,7 +489,7 @@ class Storage: # TODO: it turns out column access on recarrays is actually slower: # https://jakevdp.github.io/PythonDataScienceHandbook/02.09-structured-data-numpy.html#RecordArrays:-Structured-Arrays-with-a-Twist # it might make sense to make these structured arrays? - data_set = result.by_symbols()[fqsn] + data_set = result.by_symbols()[fqme] array = data_set.array # XXX: ensure sample rate is as expected @@ -503,11 +504,11 @@ class Storage: 'YOUR DATABASE LIKELY CONTAINS BAD DATA FROM AN OLD BUG' f'WIPING HISTORY FOR {ts}s' ) - await self.delete_ts(fqsn, timeframe) + await self.delete_ts(fqme, timeframe) # try reading again.. return await self.read_ohlcv( - fqsn, + fqme, timeframe, end, limit, @@ -537,7 +538,7 @@ class Storage: async def write_ohlcv( self, - fqsn: str, + fqme: str, ohlcv: np.ndarray, timeframe: int, append_and_duplicate: bool = True, @@ -570,7 +571,7 @@ class Storage: # write to db resp = await self.client.write( to_push, - tbk=f'{fqsn}/{tfkey}/OHLCV', + tbk=f'{fqme}/{tfkey}/OHLCV', # NOTE: will will append duplicates # for the same timestamp-index. @@ -593,7 +594,7 @@ class Storage: # write to db resp = await self.client.write( to_push, - tbk=f'{fqsn}/{tfkey}/OHLCV', + tbk=f'{fqme}/{tfkey}/OHLCV', # NOTE: will will append duplicates # for the same timestamp-index. @@ -625,8 +626,8 @@ class Storage: @acm async def open_storage_client( - fqsn: str, - period: Union[int, str | None] = None, # in seconds + host: str, + grpc_port: int, ) -> tuple[Storage, dict[str, np.ndarray]]: ''' @@ -635,7 +636,10 @@ async def open_storage_client( ''' async with ( # eventually a storage backend endpoint - get_client() as client, + get_client( + host=host, + port=grpc_port, + ) as client, ): # slap on our wrapper api yield Storage(client) @@ -643,7 +647,7 @@ async def open_storage_client( @acm async def open_tsdb_client( - fqsn: str, + fqme: str, ) -> Storage: # TODO: real-time dedicated task for ensuring @@ -677,25 +681,31 @@ async def open_tsdb_client( delayed=False, ) + # load any user service settings for connecting to tsdb + conf, path = config.load('conf') + tsdbconf = conf['network'].get('tsdb') + backend = tsdbconf.pop('backend') async with ( - open_storage_client(fqsn) as storage, + open_storage_client( + **tsdbconf, + ) as storage, maybe_open_feed( - [fqsn], + [fqme], start_stream=False, ) as feed, ): - profiler(f'opened feed for {fqsn}') + profiler(f'opened feed for {fqme}') # to_append = feed.hist_shm.array # to_prepend = None - if fqsn: - flume = feed.flumes[fqsn] + if fqme: + flume = feed.flumes[fqme] symbol = flume.symbol if symbol: - fqsn = symbol.fqsn + fqme = symbol.fqme # diff db history with shm and only write the missing portions # ohlcv = flume.hist_shm.array @@ -703,7 +713,7 @@ async def open_tsdb_client( # TODO: use pg profiler # for secs in (1, 60): # tsdb_array = await storage.read_ohlcv( - # fqsn, + # fqme, # timeframe=timeframe, # ) # # hist diffing: @@ -726,7 +736,7 @@ async def open_tsdb_client( # log.info( # f'Writing datums {array.size} -> to tsdb from shm\n' # ) - # await storage.write_ohlcv(fqsn, array) + # await storage.write_ohlcv(fqme, array) # profiler('Finished db writes')