diff --git a/piker/calc.py b/piker/calc.py index 0cf42cf8..d5d8d4e1 100644 --- a/piker/calc.py +++ b/piker/calc.py @@ -20,10 +20,17 @@ Handy financial calculations. import math import itertools +from bidict import bidict + + +_mag2suffix = bidict({3: 'k', 6: 'M', 9: 'B'}) + def humanize( + number: float, digits: int = 1 + ) -> str: '''Convert large numbers to something with at most ``digits`` and a letter suffix (eg. k: thousand, M: million, B: billion). @@ -36,19 +43,38 @@ def humanize( if not number or number <= 0: return round(number, ndigits=digits) - mag2suffix = {3: 'k', 6: 'M', 9: 'B'} mag = math.floor(math.log(number, 10)) if mag < 3: return round(number, ndigits=digits) - maxmag = max(itertools.takewhile(lambda key: mag >= key, mag2suffix)) + maxmag = max(itertools.takewhile(lambda key: mag >= key, _mag2suffix)) return "{value}{suffix}".format( value=round(number/10**maxmag, ndigits=digits), - suffix=mag2suffix[maxmag], + suffix=_mag2suffix[maxmag], ) +def puterize( + + text: str, + digits: int = 1, + +) -> float: + '''Inverse of ``humanize()`` above. + + ''' + try: + suffix = str(text)[-1] + mult = _mag2suffix.inverse[suffix] + value = text.rstrip(suffix) + return round(float(value) * 10**mult, ndigits=digits) + + except KeyError: + # no matching suffix try just the value + return float(text) + + def pnl( init: float, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 583e2509..8ff2b0e2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -373,7 +373,9 @@ async def open_brokerd_trades_dialogue( broker = feed.mod.name # TODO: make a `tractor` bug/test for this! - # portal = feed._brokerd_portal + # if only i could member what the problem was.. + # probably some GC of the portal thing? + # portal = feed.portal # XXX: we must have our own portal + channel otherwise # when the data feed closes it may result in a half-closed diff --git a/piker/data/feed.py b/piker/data/feed.py index 9bfe95a9..bf003b9a 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -393,18 +393,23 @@ class Feed: shm: ShmArray mod: ModuleType first_quotes: dict # symbol names to first quote dicts - stream: trio.abc.ReceiveChannel[dict[str, Any]] - _brokerd_portal: tractor._portal.Portal + _portal: tractor.Portal + + stream: trio.abc.ReceiveChannel[dict[str, Any]] + throttle_rate: Optional[int] = None + _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None _max_sample_rate: int = 0 - search: Callable[..., Awaitable] = None - # cache of symbol info messages received as first message when # a stream startsc. symbols: dict[str, Symbol] = field(default_factory=dict) + @property + def portal(self) -> tractor.Portal: + return self._portal + async def receive(self) -> dict: return await self.stream.receive() @@ -418,7 +423,7 @@ class Feed: delay_s = delay_s or self._max_sample_rate async with open_sample_step_stream( - self._brokerd_portal, + self.portal, delay_s, ) as istream: yield istream @@ -526,7 +531,8 @@ async def open_feed( mod=mod, first_quotes=first_quotes, stream=stream, - _brokerd_portal=portal, + _portal=portal, + throttle_rate=tick_throttle, ) ohlc_sample_rates = [] diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index a6324bb6..f3f2d5de 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -18,14 +18,14 @@ Financial signal processing for the peeps. """ from functools import partial -from typing import AsyncIterator, Callable, Tuple +from typing import AsyncIterator, Callable, Tuple, Optional import trio from trio_typing import TaskStatus import tractor import numpy as np -from ..log import get_logger +from ..log import get_logger, get_console_log from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap @@ -134,7 +134,7 @@ async def fsp_compute( # check for data length mis-allignment and fill missing values diff = len(src.array) - len(history) if diff >= 0: - print(f"WTF DIFF SIGNAL to HISTORY {diff}") + log.warning(f"WTF DIFF SIGNAL to HISTORY {diff}") for _ in range(diff): dst.push(history[:1]) @@ -149,6 +149,12 @@ async def fsp_compute( # rt stream async for processed in out_stream: + + # period = time.time() - last + # hz = 1/period if period else float('nan') + # if hz > 60: + # log.info(f'FSP quote too fast: {hz}') + log.debug(f"{fsp_func_name}: {processed}") index = src.index dst.array[-1][fsp_func_name] = processed @@ -165,12 +171,16 @@ async def cascade( dst_shm_token: Tuple[str, np.dtype], symbol: str, fsp_func_name: str, + loglevel: Optional[str] = None, ) -> None: - """Chain streaming signal processors and deliver output to + '''Chain streaming signal processors and deliver output to destination mem buf. - """ + ''' + if loglevel: + get_console_log(loglevel) + src = attach_shm_array(token=src_shm_token) dst = attach_shm_array(readonly=False, token=dst_shm_token) @@ -180,6 +190,10 @@ async def cascade( async with data.feed.maybe_open_feed( brokername, [symbol], + + # TODO: + # tick_throttle=60, + ) as (feed, stream): assert src.token == feed.shm.token diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index dd713a04..780262fc 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -276,6 +276,8 @@ class ChartnPane(QFrame): hbox.setContentsMargins(0, 0, 0, 0) hbox.setSpacing(3) + # self.setMaximumWidth() + class LinkedSplits(QWidget): ''' @@ -339,7 +341,8 @@ class LinkedSplits(QWidget): def set_split_sizes( self, - prop: float = 0.375 # proportion allocated to consumer subcharts + # prop: float = 0.375, # proportion allocated to consumer subcharts + prop: float = 5/8, ) -> None: '''Set the proportion of space allocated for linked subcharts. @@ -450,7 +453,6 @@ class LinkedSplits(QWidget): self.xaxis = xaxis qframe = ChartnPane(sidepane=sidepane, parent=self.splitter) - cpw = ChartPlotWidget( # this name will be used to register the primary @@ -522,10 +524,10 @@ class LinkedSplits(QWidget): # track by name self.subplots[name] = cpw - if sidepane: - # TODO: use a "panes" collection to manage this? - sidepane.setMinimumWidth(self.chart.sidepane.width()) - sidepane.setMaximumWidth(self.chart.sidepane.width()) + # if sidepane: + # # TODO: use a "panes" collection to manage this? + # qframe.setMaximumWidth(self.chart.sidepane.width()) + # qframe.setMinimumWidth(self.chart.sidepane.width()) self.splitter.addWidget(qframe) @@ -537,6 +539,16 @@ class LinkedSplits(QWidget): return cpw + def resize_sidepanes( + self, + ) -> None: + '''Size all sidepanes based on the OHLC "main" plot. + + ''' + for name, cpw in self.subplots.items(): + cpw.sidepane.setMinimumWidth(self.chart.sidepane.width()) + cpw.sidepane.setMaximumWidth(self.chart.sidepane.width()) + class ChartPlotWidget(pg.PlotWidget): ''' @@ -681,9 +693,9 @@ class ChartPlotWidget(pg.PlotWidget): """Return a range tuple for the bars present in view. """ l, r = self.view_range() - a = self._arrays['ohlc'] - lbar = max(l, a[0]['index']) - rbar = min(r, a[-1]['index']) + array = self._arrays['ohlc'] + lbar = max(l, array[0]['index']) + rbar = min(r, array[-1]['index']) return l, lbar, rbar, r def default_view( @@ -991,22 +1003,19 @@ class ChartPlotWidget(pg.PlotWidget): a = self._arrays['ohlc'] ifirst = a[0]['index'] bars = a[lbar - ifirst:rbar - ifirst + 1] - if not len(bars): # likely no data loaded yet or extreme scrolling? log.error(f"WTF bars_range = {lbar}:{rbar}") return - # TODO: should probably just have some kinda attr mark - # that determines this behavior based on array type - try: + if self.data_key != self.linked.symbol.key: + bars = a[self.data_key] + ylow = np.nanmin(bars) + yhigh = np.nanmax((bars)) + else: + # just the std ohlc bars ylow = np.nanmin(bars['low']) yhigh = np.nanmax(bars['high']) - except (IndexError, ValueError): - # likely non-ohlc array? - bars = bars[self.name] - ylow = np.nanmin(bars) - yhigh = np.nanmax(bars) if set_range: # view margins: stay within a % of the "true range" diff --git a/piker/ui/_display.py b/piker/ui/_display.py index dcc04f47..254fee76 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -18,6 +18,7 @@ Real-time display tasks for charting / graphics. ''' +from contextlib import asynccontextmanager import time from typing import Any from types import ModuleType @@ -264,7 +265,7 @@ async def chart_from_quotes( last_mx, last_mn = mx, mn -async def spawn_fsps( +async def fan_out_spawn_fsp_daemons( linkedsplits: LinkedSplits, fsps: dict[str, str], @@ -275,109 +276,93 @@ async def spawn_fsps( loglevel: str, ) -> None: - """Start financial signal processing in subactor. + '''Create financial signal processing sub-actors (under flat tree) + for each entry in config and attach to local graphics update tasks. Pass target entrypoint and historical data. - """ - + ''' linkedsplits.focus() uid = tractor.current_actor().uid # spawns sub-processes which execute cpu bound FSP code - async with tractor.open_nursery(loglevel=loglevel) as n: + async with ( + tractor.open_nursery() as n, + trio.open_nursery() as ln, + ): - # spawns local task that consume and chart data streams from - # sub-procs - async with trio.open_nursery() as ln: + # Currently we spawn an actor per fsp chain but + # likely we'll want to pool them eventually to + # scale horizonatlly once cores are used up. + for display_name, conf in fsps.items(): - # Currently we spawn an actor per fsp chain but - # likely we'll want to pool them eventually to - # scale horizonatlly once cores are used up. - for display_name, conf in fsps.items(): + fsp_func_name = conf['fsp_func_name'] - fsp_func_name = conf['fsp_func_name'] + # TODO: load function here and introspect + # return stream type(s) - # TODO: load function here and introspect - # return stream type(s) + # TODO: should `index` be a required internal field? + fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) - # TODO: should `index` be a required internal field? - fsp_dtype = np.dtype([('index', int), (fsp_func_name, float)]) + key = f'{sym}.fsp.{display_name}.{".".join(uid)}' - key = f'{sym}.fsp.{display_name}.{".".join(uid)}' + # this is all sync currently + shm, opened = maybe_open_shm_array( + key, + # TODO: create entry for each time frame + dtype=fsp_dtype, + readonly=True, + ) - # this is all sync currently - shm, opened = maybe_open_shm_array( - key, - # TODO: create entry for each time frame - dtype=fsp_dtype, - readonly=True, - ) + # XXX: fsp may have been opened by a duplicate chart. + # Error for now until we figure out how to wrap fsps as + # "feeds". assert opened, f"A chart for {key} likely + # already exists?" - # XXX: fsp may have been opened by a duplicate chart. - # Error for now until we figure out how to wrap fsps as - # "feeds". assert opened, f"A chart for {key} likely - # already exists?" + conf['shm'] = shm - conf['shm'] = shm + portal = await n.start_actor( + enable_modules=['piker.fsp'], + name='fsp.' + display_name, + ) - portal = await n.start_actor( - enable_modules=['piker.fsp'], - name='fsp.' + display_name, - ) + # init async + ln.start_soon( + run_fsp, + portal, + linkedsplits, + brokermod, + sym, + src_shm, + fsp_func_name, + display_name, + conf, + group_status_key, + loglevel, + ) - # init async - ln.start_soon( - run_fsp, - portal, - linkedsplits, - brokermod, - sym, - src_shm, - fsp_func_name, - display_name, - conf, - group_status_key, - ) - - # blocks here until all fsp actors complete + # blocks here until all fsp actors complete -async def run_fsp( +class FspConfig(BaseModel): + class Config: + validate_assignment = True - portal: tractor._portal.Portal, - linkedsplits: LinkedSplits, - brokermod: ModuleType, - sym: str, - src_shm: ShmArray, - fsp_func_name: str, + name: str + period: int + + +@asynccontextmanager +async def open_sidepane( + + linked: LinkedSplits, display_name: str, - conf: dict[str, Any], - group_status_key: str, -) -> None: - """FSP stream chart update loop. - - This is called once for each entry in the fsp - config map. - """ - done = linkedsplits.window().status_bar.open_status( - f'loading fsp, {display_name}..', - group_key=group_status_key, - ) - - # make sidepane config widget - class FspConfig(BaseModel): - - class Config: - validate_assignment = True - - name: str - period: int +) -> FspConfig: sidepane: FieldsForm = mk_form( - parent=linkedsplits.godwidget, + parent=linked.godwidget, fields_schema={ 'name': { 'label': '**fsp**:', @@ -386,6 +371,8 @@ async def run_fsp( f'{display_name}' ], }, + + # TODO: generate this from input map 'period': { 'label': '**period**:', 'type': 'edit', @@ -403,10 +390,46 @@ async def run_fsp( print(f'{key}: {value}') return True + # TODO: + async with ( + open_form_input_handling( + sidepane, + focus_next=linked.godwidget, + on_value_change=settings_change, + ) + ): + yield sidepane + + +async def run_fsp( + + portal: tractor._portal.Portal, + linkedsplits: LinkedSplits, + brokermod: ModuleType, + sym: str, + src_shm: ShmArray, + fsp_func_name: str, + display_name: str, + conf: dict[str, Any], + group_status_key: str, + loglevel: str, + +) -> None: + '''FSP stream chart update loop. + + This is called once for each entry in the fsp + config map. + + ''' + done = linkedsplits.window().status_bar.open_status( + f'loading fsp, {display_name}..', + group_key=group_status_key, + ) + async with ( portal.open_stream_from( - # subactor entrypoint + # chaining entrypoint fsp.cascade, # name as title of sub-chart @@ -415,15 +438,14 @@ async def run_fsp( dst_shm_token=conf['shm'].token, symbol=sym, fsp_func_name=fsp_func_name, + loglevel=loglevel, ) as stream, - # TODO: - open_form_input_handling( - sidepane, - focus_next=linkedsplits.godwidget, - on_value_change=settings_change, - ), + open_sidepane( + linkedsplits, + display_name, + ) as sidepane, ): # receive last index for processed historical @@ -472,7 +494,7 @@ async def run_fsp( # read from last calculated value array = shm.array - # XXX: fsp func names are unique meaning we don't have + # XXX: fsp func names must be unique meaning we don't have # duplicates of the underlying data even if multiple # sub-charts reference it under different 'named charts'. value = array[fsp_func_name][-1] @@ -489,6 +511,8 @@ async def run_fsp( array_key=fsp_func_name ) + chart.linked.resize_sidepanes() + # TODO: figure out if we can roll our own `FillToThreshold` to # get brush filled polygons for OS/OB conditions. # ``pg.FillBetweenItems`` seems to be one technique using @@ -622,6 +646,73 @@ async def check_for_new_bars(feed, ohlcv, linkedsplits): price_chart.increment_view() +def has_vlm(ohlcv: ShmArray) -> bool: + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + volm = ohlcv.array['volume'] + return not bool(np.all(np.isin(volm, -1)) or np.all(np.isnan(volm))) + + +@asynccontextmanager +async def maybe_open_vlm_display( + + linked: LinkedSplits, + ohlcv: ShmArray, + +) -> ChartPlotWidget: + + # make sure that the instrument supports volume history + # (sometimes this is not the case for some commodities and + # derivatives) + # volm = ohlcv.array['volume'] + # if ( + # np.all(np.isin(volm, -1)) or + # np.all(np.isnan(volm)) + # ): + if not has_vlm(ohlcv): + log.warning(f"{linked.symbol.key} does not seem to have volume info") + else: + async with open_sidepane(linked, 'volume') as sidepane: + # built-in $vlm + shm = ohlcv + chart = linked.add_plot( + name='vlm', + array=shm.array, + + array_key='volume', + sidepane=sidepane, + + # curve by default + ohlc=False, + + # vertical bars + # stepMode=True, + # static_yrange=(0, 100), + ) + + # XXX: ONLY for sub-chart fsps, overlays have their + # data looked up from the chart's internal array set. + # TODO: we must get a data view api going STAT!! + chart._shm = shm + + # should **not** be the same sub-chart widget + assert chart.name != linked.chart.name + + # sticky only on sub-charts atm + last_val_sticky = chart._ysticks[chart.name] + + # read from last calculated value + value = shm.array['volume'][-1] + + last_val_sticky.update_from_data(-1, value) + + # size view to data once at outset + chart._set_yrange() + + yield chart + + async def display_symbol_data( godwidget: GodWidget, @@ -686,6 +777,7 @@ async def display_symbol_data( # add as next-to-y-axis singleton pane godwidget.pp_pane = pp_pane + # create main OHLC chart chart = linkedsplits.plot_ohlc_main( symbol, bars, @@ -722,7 +814,7 @@ async def display_symbol_data( 'static_yrange': (0, 100), }, }, - # test for duplicate fsps on same chart + # # test for duplicate fsps on same chart # 'rsi2': { # 'fsp_func_name': 'rsi', # 'period': 14, @@ -733,18 +825,8 @@ async def display_symbol_data( } - # make sure that the instrument supports volume history - # (sometimes this is not the case for some commodities and - # derivatives) - volm = ohlcv.array['volume'] - if ( - np.all(np.isin(volm, -1)) or - np.all(np.isnan(volm)) - ): - log.warning( - f"{sym} does not seem to have volume info," - " dropping volume signals") - else: + if has_vlm(ohlcv): + # add VWAP to fsp config for downstream loading fsp_conf.update({ 'vwap': { 'fsp_func_name': 'vwap', @@ -756,11 +838,10 @@ async def display_symbol_data( async with ( trio.open_nursery() as ln, - ): # load initial fsp chain (otherwise known as "indicators") ln.start_soon( - spawn_fsps, + fan_out_spawn_fsp_daemons, linkedsplits, fsp_conf, sym, @@ -787,6 +868,7 @@ async def display_symbol_data( ) async with ( + maybe_open_vlm_display(linkedsplits, ohlcv), open_order_mode( feed, diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index 5aed6f9c..38184ca4 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -21,7 +21,6 @@ Text entry "forms" widgets (mostly for configuration and UI user input). from __future__ import annotations from contextlib import asynccontextmanager from functools import partial -from textwrap import dedent from typing import ( Optional, Any, Callable, Awaitable ) @@ -320,14 +319,14 @@ class FieldsForm(QWidget): self.vbox = QVBoxLayout(self) # self.vbox.setAlignment(Qt.AlignVCenter) self.vbox.setAlignment(Qt.AlignBottom) - self.vbox.setContentsMargins(0, 4, 3, 6) + self.vbox.setContentsMargins(3, 6, 3, 6) self.vbox.setSpacing(0) # split layout for the (