diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 802dcf46..944f933b 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -30,8 +30,8 @@ import trio import tractor from dataclasses import dataclass -from .. import data from ..data._source import Symbol +from ..data.feed import open_feed from ..pp import Position from ..data._normalize import iterticks from ..data._source import unpack_fqsn @@ -441,14 +441,11 @@ async def trades_dialogue( ) -> None: tractor.log.get_console_log(loglevel) - async with ( + async with open_feed( + [fqsn], + loglevel=loglevel, + ) as feed: - data.open_feed( - [fqsn], - loglevel=loglevel, - ) as feed, - - ): # TODO: load paper positions per broker from .toml config file # and pass as symbol to position data mapping: ``dict[str, dict]`` # await ctx.started(all_positions) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index e98195b4..55dca991 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -30,19 +30,19 @@ from ._sharedmem import ( get_shm_token, ShmArray, ) -from .feed import ( - open_feed, - _setup_persistent_brokerd, -) +# from .feed import ( +# # open_feed, +# _setup_persistent_brokerd, +# ) __all__ = [ - 'open_feed', + # 'open_feed', 'ShmArray', 'iterticks', 'maybe_open_shm_array', 'attach_shm_array', 'open_shm_array', 'get_shm_token', - '_setup_persistent_brokerd', + # '_setup_persistent_brokerd', ] diff --git a/piker/data/_sharedmem.py b/piker/data/_sharedmem.py index 82f61e79..8ea51eb7 100644 --- a/piker/data/_sharedmem.py +++ b/piker/data/_sharedmem.py @@ -128,12 +128,15 @@ class _Token(Struct, frozen=True): @classmethod def from_msg(cls, msg: dict) -> _Token: - if isinstance(msg, _Token): - return msg # TODO: native struct decoding # return _token_dec.decode(msg) + if isinstance(msg, _Token): + return msg + + # assert 0 + msg['dtype_descr'] = tuple(map(tuple, msg['dtype_descr'])) return _Token(**msg)