diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index aeabea0b..1db80015 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -39,6 +39,10 @@ from tractor.trionics import ( import trio from trio_typing import TaskStatus +from .ticktools import ( + frame_ticks, + _tick_groups, +) from ._util import ( log, get_console_log, @@ -742,77 +746,6 @@ async def sample_and_broadcast( ) -# tick-type-classes template for all possible "lowest level" events -# that can can be emitted by the "top of book" L1 queues and -# price-matching (with eventual clearing) in a double auction -# market (queuing) system. -_tick_groups = { - 'clears': {'trade', 'dark_trade', 'last'}, - 'bids': {'bid', 'bsize'}, - 'asks': {'ask', 'asize'}, -} - -# XXX alo define the flattened set of all such "fundamental ticks" -# so that it can be used as filter, eg. in the graphics display -# loop to compute running windowed y-ranges B) -_auction_ticks: set[str] = set.union(*_tick_groups.values()) - - -def frame_ticks( - quote: dict[str, Any], - - ticks_by_type: dict[str, list[dict[str, Any]]] = {}, - ticks_in_order: list[dict[str, Any]] | None = None - -) -> dict: - - # append quotes since last iteration into the last quote's - # tick array/buffer. - # TODO: once we decide to get fancy really we should - # have a shared mem tick buffer that is just - # continually filled and the UI just ready from it - # at it's display rate. - - if ticks := quote.get('ticks'): - - # XXX: build a tick-by-type table of lists - # of tick messages. This allows for less - # iteration on the receiver side by allowing for - # a single "latest tick event" look up by - # indexing the last entry in each sub-list. - # tbt = { - # 'types': ['bid', 'asize', 'last', .. ''], - - # 'bid': [tick0, tick1, tick2, .., tickn], - # 'asize': [tick0, tick1, tick2, .., tickn], - # 'last': [tick0, tick1, tick2, .., tickn], - # ... - # '': [tick0, tick1, tick2, .., tickn], - # } - - # append in reverse FIFO order for in-order iteration on - # receiver side. - tick: dict[str, Any] - for tick in ticks: - ticks_by_type.setdefault( - tick['type'], - [], - ).append(tick) - - # TODO: do we need this any more or can we just - # expect the receiver to unwind the below - # `ticks_by_type: dict`? - # => undwinding would potentially require a - # `dict[str, set | list]` instead with an - # included `'types' field which is an (ordered) - # set of tick type fields in the order which - # types arrived? - if ticks_in_order: - ticks_in_order.extend(ticks) - - return ticks_by_type - - async def uniform_rate_send( rate: float, diff --git a/piker/data/ticktools.py b/piker/data/ticktools.py index 13708252..bc3543f8 100644 --- a/piker/data/ticktools.py +++ b/piker/data/ticktools.py @@ -19,7 +19,25 @@ Stream format enforcement. ''' from itertools import chain -from typing import AsyncIterator +from typing import ( + Any, + AsyncIterator, +) + +# tick-type-classes template for all possible "lowest level" events +# that can can be emitted by the "top of book" L1 queues and +# price-matching (with eventual clearing) in a double auction +# market (queuing) system. +_tick_groups: dict[str, set[str]] = { + 'clears': {'trade', 'dark_trade', 'last'}, + 'bids': {'bid', 'bsize'}, + 'asks': {'ask', 'asize'}, +} + +# XXX alo define the flattened set of all such "fundamental ticks" +# so that it can be used as filter, eg. in the graphics display +# loop to compute running windowed y-ranges B) +_auction_ticks: set[str] = set.union(*_tick_groups.values()) def iterticks( @@ -80,3 +98,58 @@ def iterticks( ttype = tick.get('type') if ttype in types: yield tick + + +def frame_ticks( + quote: dict[str, Any], + + ticks_by_type: dict[str, list[dict[str, Any]]] = {}, + ticks_in_order: list[dict[str, Any]] | None = None + +) -> dict: + + # append quotes since last iteration into the last quote's + # tick array/buffer. + # TODO: once we decide to get fancy really we should + # have a shared mem tick buffer that is just + # continually filled and the UI just ready from it + # at it's display rate. + + if ticks := quote.get('ticks'): + + # XXX: build a tick-by-type table of lists + # of tick messages. This allows for less + # iteration on the receiver side by allowing for + # a single "latest tick event" look up by + # indexing the last entry in each sub-list. + # tbt = { + # 'types': ['bid', 'asize', 'last', .. ''], + + # 'bid': [tick0, tick1, tick2, .., tickn], + # 'asize': [tick0, tick1, tick2, .., tickn], + # 'last': [tick0, tick1, tick2, .., tickn], + # ... + # '': [tick0, tick1, tick2, .., tickn], + # } + + # append in reverse FIFO order for in-order iteration on + # receiver side. + tick: dict[str, Any] + for tick in ticks: + ticks_by_type.setdefault( + tick['type'], + [], + ).append(tick) + + # TODO: do we need this any more or can we just + # expect the receiver to unwind the below + # `ticks_by_type: dict`? + # => undwinding would potentially require a + # `dict[str, set | list]` instead with an + # included `'types' field which is an (ordered) + # set of tick type fields in the order which + # types arrived? + if ticks_in_order: + ticks_in_order.extend(ticks) + + return ticks_by_type diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 1884d018..610b38f3 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -39,20 +39,23 @@ from msgspec import field from ..accounting import ( MktPair, ) -from ..data.feed import ( +from ..data import ( open_feed, Feed, Flume, ) +from ..data.ticktools import ( + _tick_groups, + _auction_ticks, +) from ..data.types import Struct from ..data._sharedmem import ( ShmArray, ) from ..data._sampling import ( - _tick_groups, - _auction_ticks, open_sample_stream, ) +# from ..data._source import tf_in_1s from ._axes import YAxisLabel from ._chart import ( ChartPlotWidget, @@ -72,7 +75,6 @@ from ._forms import ( mk_order_pane_layout, ) from . import _pg_overrides as pgo -# from ..data._source import tf_in_1s from .order_mode import ( open_order_mode, OrderMode,