From 1bd0ee87467859a702e728a680fd893e3d2a67b7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 18 May 2021 11:22:29 -0400 Subject: [PATCH] Support loading multi-brokerds search at startup --- piker/data/feed.py | 62 ++++++++++++++++++++++++-------- piker/ui/_chart.py | 89 +++++++++++++++++++++++++++++++++++----------- 2 files changed, 115 insertions(+), 36 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index ce26d6a2..5a78c740 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -384,6 +384,32 @@ def sym_to_shm_key( return f'{broker}.{symbol}' +@asynccontextmanager +async def install_brokerd_search( + portal: tractor._portal.Portal, + brokermod: ModuleType, +) -> None: + async with portal.open_context( + brokermod.open_symbol_search + ) as (ctx, cache): + + # shield here since we expect the search rpc to be + # cancellable by the user as they see fit. + async with ctx.open_stream() as stream: + + async def search(text: str) -> Dict[str, Any]: + await stream.send(text) + return await stream.receive() + + async with _search.register_symbol_search( + provider_name=brokermod.name, + search_routine=search, + pause_period=brokermod._search_conf.get('pause_period'), + + ): + yield + + @asynccontextmanager async def open_feed( brokername: str, @@ -475,22 +501,28 @@ async def open_feed( yield feed else: - async with feed._brokerd_portal.open_context( - mod.open_symbol_search - ) as (ctx, cache): + async with install_brokerd_search( + feed._brokerd_portal, + mod, + ): + yield feed - # shield here since we expect the search rpc to be - # cancellable by the user as they see fit. - async with ctx.open_stream() as stream: + # async with feed._brokerd_portal.open_context( + # mod.open_symbol_search + # ) as (ctx, cache): - async def search(text: str) -> Dict[str, Any]: - await stream.send(text) - return await stream.receive() + # # shield here since we expect the search rpc to be + # # cancellable by the user as they see fit. + # async with ctx.open_stream() as stream: - async with _search.register_symbol_search( - provider_name=brokername, - search_routine=search, - pause_period=mod._search_conf.get('pause_period'), + # async def search(text: str) -> Dict[str, Any]: + # await stream.send(text) + # return await stream.receive() - ): - yield feed + # async with _search.register_symbol_search( + # provider_name=brokername, + # search_routine=search, + # pause_period=mod._search_conf.get('pause_period'), + + # ): + # yield feed diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index a0e2628e..427add22 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -22,6 +22,7 @@ import time from typing import Tuple, Dict, Any, Optional, Callable from types import ModuleType from functools import partial +from contextlib import AsyncExitStack from PyQt5 import QtCore, QtGui from PyQt5.QtCore import Qt @@ -30,6 +31,10 @@ import pyqtgraph as pg import tractor import trio +from .._daemon import ( + maybe_spawn_brokerd, +) +from ..brokers import get_brokermod from ._axes import ( DynamicDateAxis, PriceAxis, @@ -67,6 +72,7 @@ from ._exec import run_qtractor, current_screen from ._interaction import ChartView from .order_mode import start_order_mode from .. import fsp +from ..data import feed log = get_logger(__name__) @@ -104,6 +110,19 @@ class ChartSpace(QtGui.QWidget): self._root_n: Optional[trio.Nursery] = None + def set_chart_symbol( + self, + symbol_key: str, # of form . + linked_charts: 'LinkedSplitCharts', # type: ignore + ) -> None: + self._chart_cache[symbol_key] = linked_charts + + def get_chart_symbol( + self, + symbol_key: str, + ) -> 'LinkedSplitCharts': # type: ignore + return self._chart_cache.get(symbol_key) + def init_timeframes_ui(self): self.tf_layout = QtGui.QHBoxLayout() self.tf_layout.setSpacing(0) @@ -128,7 +147,7 @@ class ChartSpace(QtGui.QWidget): def load_symbol( self, - brokername: str, + providername: str, symbol_key: str, loglevel: str, ohlc: bool = True, @@ -142,7 +161,10 @@ class ChartSpace(QtGui.QWidget): # our symbol key style is always lower case symbol_key = symbol_key.lower() - linkedcharts = self._chart_cache.get(symbol_key) + # fully qualified symbol name (SNS i guess is what we're making?) + fqsn = '.'.join([symbol_key, providername]) + + linkedcharts = self.get_chart_symbol(fqsn) if not self.vbox.isEmpty(): # XXX: this is CRITICAL especially with pixel buffer caching @@ -162,13 +184,13 @@ class ChartSpace(QtGui.QWidget): self._root_n.start_soon( chart_symbol, self, - brokername, + providername, symbol_key, loglevel, ) self.vbox.addWidget(linkedcharts) - self._chart_cache[symbol_key] = linkedcharts + self.set_chart_symbol(fqsn, linkedcharts) # chart is already in memory so just focus it if self.linkedcharts: @@ -1619,28 +1641,53 @@ async def _async_main( # this internally starts a ``chart_symbol()`` task above chart_app.load_symbol(brokernames[0], sym, loglevel) - async with _search.register_symbol_search( + # TODO: seems like our incentive for brokerd caching lelel + backends = {} - provider_name='cache', - search_routine=partial( - _search.search_simple_dict, - source=chart_app._chart_cache, - ), + async with AsyncExitStack() as stack: - ): - async with open_key_stream( - search.bar, - ) as key_stream: + # TODO: spawn these async in nursery. - # start kb handling task for searcher - root_n.start_soon( - _search.handle_keyboard_input, - # chart_app, - search, - key_stream, + # load all requested brokerd's at startup and load their + # search engines. + for broker in brokernames: + portal = await stack.enter_async_context( + maybe_spawn_brokerd( + broker, + loglevel=loglevel + ) ) - await trio.sleep_forever() + backends[broker] = portal + await stack.enter_async_context( + feed.install_brokerd_search( + portal, + get_brokermod(broker), + ) + ) + + async with _search.register_symbol_search( + + provider_name='cache', + search_routine=partial( + _search.search_simple_dict, + source=chart_app._chart_cache, + ), + + ): + async with open_key_stream( + search.bar, + ) as key_stream: + + # start kb handling task for searcher + root_n.start_soon( + _search.handle_keyboard_input, + # chart_app, + search, + key_stream, + ) + + await trio.sleep_forever() def _main(