diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 6cc8ecb1..7af09d38 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -25,6 +25,7 @@ import numpy as np import pyqtgraph as pg import tractor import trio +from trio_typing import TaskStatus from ._axes import ( DynamicDateAxis, @@ -407,10 +408,6 @@ class ChartPlotWidget(pg.PlotWidget): self.default_view() - # TODO: stick in config - # use cross-hair for cursor? - # self.setCursor(QtCore.Qt.CrossCursor) - # Assign callback for rescaling y-axis automatically # based on data contents and ``ViewBox`` state. self.sigXRangeChanged.connect(self._set_yrange) @@ -821,6 +818,73 @@ class ChartPlotWidget(pg.PlotWidget): self.scene().leaveEvent(ev) +_to_router: trio.abc.SendChannel = None +_from_ui: trio.abc.ReceiveChannel = None + + +# TODO: make this a ``tractor.msg.pub`` +async def stream_orders(): + """Order streaming task: deliver orders transmitted from UI + to downstream consumers. + + This is run in the UI actor (usually the one running Qt). + The UI simply delivers order messages to the above ``_to_router`` + send channel (from sync code using ``.send_nowait()``), these values + are pulled from the channel here and send to any consumer(s). + + """ + global _from_ui + + async for order in _from_ui: + yield order + + +async def stream_and_route(ui_name): + """Order router actor entrypoint. + + """ + actor = tractor.current_actor() + + # new router entry point + async with tractor.wait_for_actor(ui_name) as portal: + async for order in await portal.run(stream_orders): + print(f'order {order} received in {actor.uid}') + + # push order back to parent as an "alert" + # (mocking for eg. a "fill") + yield order + + +async def spawn_router_stream_alerts( + ident: str, + task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, +) -> None: + + # setup local ui event streaming channels + global _from_ui, _to_router + _to_router, _from_ui = trio.open_memory_channel(100) + + actor = tractor.current_actor() + subactor_name = ident + '.router' + + async with tractor.open_nursery() as n: + + portal = await n.start_actor( + subactor_name, + rpc_module_paths=[__name__], + ) + stream = await portal.run( + stream_and_route, + ui_name=actor.name + ) + + # let parent task continue + task_status.started(subactor_name) + + async for alert in stream: + print(f'alert {alert} received in {actor.uid}') + + async def _async_main( sym: str, brokername: str, @@ -906,6 +970,17 @@ async def _async_main( async with trio.open_nursery() as n: + router_name = await n.start( + spawn_router_stream_alerts, + sym, + ) + + # wait for router to come up before setting + # enabling send channel on chart + async with tractor.wait_for_actor(router_name): + global _to_router + linked_charts._to_router = _to_router + # load initial fsp chain (otherwise known as "indicators") n.start_soon( spawn_fsps, diff --git a/piker/ui/cli.py b/piker/ui/cli.py index e14ef3f6..0adeaf5a 100644 --- a/piker/ui/cli.py +++ b/piker/ui/cli.py @@ -150,5 +150,6 @@ def chart(config, symbol, date, rate, test, profile): tractor_kwargs={ 'debug_mode': True, 'loglevel': tractorloglevel, + 'rpc_module_paths': ['piker.ui._chart'], }, )