From a7cee86feade5b66571fbf4deba884433650829b Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 14 Apr 2021 11:01:00 -0400
Subject: [PATCH 01/16] Just de-lint imports

---
 piker/cli/__init__.py | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py
index 7dab2a28..2ca5a1a6 100644
--- a/piker/cli/__init__.py
+++ b/piker/cli/__init__.py
@@ -37,7 +37,7 @@ _context_defaults = dict(
 def pikerd(loglevel, host, tl, pdb):
     """Spawn the piker broker-daemon.
     """
-    from .._daemon import _data_mods, open_pikerd
+    from .._daemon import open_pikerd
     log = get_console_log(loglevel)
 
     if pdb:
@@ -112,11 +112,11 @@ def services(config, tl, names):
 
 
 def _load_clis() -> None:
-    from ..data import marketstore as _
-    from ..data import cli as _
-    from ..brokers import cli as _  # noqa
-    from ..ui import cli as _  # noqa
-    from ..watchlists import cli as _  # noqa
+    from ..data import marketstore  # noqa
+    from ..data import cli  # noqa
+    from ..brokers import cli  # noqa
+    from ..ui import cli  # noqa
+    from ..watchlists import cli  # noqa
 
 
 # load downstream cli modules

From 9c3f8ff0504f3a0737a91c9540d218d31807aa31 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Wed, 14 Apr 2021 11:01:43 -0400
Subject: [PATCH 02/16] Only do context unsubs in main feed bus path

---
 piker/data/_sampling.py | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)

diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py
index 40951697..c0f47d8e 100644
--- a/piker/data/_sampling.py
+++ b/piker/data/_sampling.py
@@ -236,5 +236,8 @@ async def sample_and_broadcast(
                 trio.BrokenResourceError,
                 trio.ClosedResourceError
             ):
-                subs.remove(ctx)
+                # XXX: do we need to deregister here
+                # if it's done in the feed bus code?
+                # so far seems like no since this should all
+                # be single-threaded.
                 log.error(f'{ctx.chan.uid} dropped connection')

From 7b8c48271982f8b4778bc718b942b6ac065de2f8 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 23 Apr 2021 11:12:54 -0400
Subject: [PATCH 03/16] Add reconnect logic help link

---
 piker/brokers/kraken.py | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py
index 824b3710..8f83c960 100644
--- a/piker/brokers/kraken.py
+++ b/piker/brokers/kraken.py
@@ -341,6 +341,10 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]:
 class AutoReconWs:
     """Make ``trio_websocketw` sockets stay up no matter the bs.
 
+    TODO:
+    apply any more insights from this:
+    https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds
+
     """
     recon_errors = (
         ConnectionClosed,

From ecd94ce24d9d17b65883fa5dcde15aaffce8493b Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Fri, 23 Apr 2021 11:14:08 -0400
Subject: [PATCH 04/16] Better indexing logic?
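
A quick illustrative sketch of why the old expression was wrong:
`indexes` is a boolean mask, so `indexes[-1]` grabs the mask's *last
boolean*, not the position of the last match (hypothetical values
below, not from a real feed):

    import numpy as np

    ohlc = np.array(
        [(0, 100.0), (1, 101.0), (2, 102.0)],
        dtype=[('index', 'i8'), ('time', 'f8')],
    )
    indexes = ohlc['time'] >= 101.0  # -> [False, True, True]

    # buggy: mask[-1] is just the final bool, not an array position
    wrong = ohlc['index'][indexes[-1]]

    # fixed: apply the mask first, *then* take the last matching index
    right = ohlc['index'][indexes][-1]  # -> 2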
---
 piker/ui/_style.py     | 2 +-
 piker/ui/order_mode.py | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/piker/ui/_style.py b/piker/ui/_style.py
index 7284a9d1..322247ec 100644
--- a/piker/ui/_style.py
+++ b/piker/ui/_style.py
@@ -79,7 +79,7 @@ class DpiAwareFont:
         return self._qfont
 
     @property
-    def px_size(self):
+    def px_size(self) -> int:
         return self._qfont.pixelSize()
 
     def configure_to_dpi(self, screen: Optional[QtGui.QScreen] = None):
diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py
index 29efcd0f..a3f01e9f 100644
--- a/piker/ui/order_mode.py
+++ b/piker/ui/order_mode.py
@@ -337,7 +337,7 @@ async def start_order_mode(
             indexes = ohlc['time'] >= time
 
             if any(indexes):
-                return ohlc['index'][indexes[-1]]
+                return ohlc['index'][indexes][-1]
             else:
                 return ohlc['index'][-1]

From 4d03d626412ac281a6a531ac63f59d2e75c3d81f Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 29 Apr 2021 08:17:40 -0400
Subject: [PATCH 05/16] Don't submit limits on alerts

---
 piker/clearing/_ems.py | 49 +++++++++++++++++++++++++-----------------
 1 file changed, 29 insertions(+), 20 deletions(-)

diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 73ca9ee1..87d41bd3 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -179,31 +179,40 @@ async def execute_triggers(
             # majority of iterations will be non-matches
             continue
 
-            # submit_price = price + price*percent_away
-            submit_price = price + abs_diff_away
+            action = cmd['action']
 
-            log.info(
-                f'Dark order triggered for price {price}\n'
-                f'Submitting order @ price {submit_price}')
+            if action != 'alert':
+                # executable order submission
 
-            reqid = await client.submit_limit(
-                oid=oid,
+                # submit_price = price + price*percent_away
+                submit_price = price + abs_diff_away
 
-                # this is a brand new order request for the
-                # underlying broker so we set out "broker request
-                # id" (brid) as nothing so that the broker
-                # client knows that we aren't trying to modify
-                # an existing order.
-                brid=None,
+                log.info(
+                    f'Dark order triggered for price {price}\n'
+                    f'Submitting order @ price {submit_price}')
 
-                symbol=sym,
-                action=cmd['action'],
-                price=submit_price,
-                size=cmd['size'],
-            )
+                reqid = await client.submit_limit(
+                    oid=oid,
 
-            # register broker request id to ems id
-            book._broker2ems_ids[reqid] = oid
+                    # this is a brand new order request for the
+                    # underlying broker so we set our "broker request
+                    # id" (brid) as nothing so that the broker
+                    # client knows that we aren't trying to modify
+                    # an existing order.
+ brid=None, + + symbol=sym, + action=cmd['action'], + price=submit_price, + size=cmd['size'], + ) + + # register broker request id to ems id + book._broker2ems_ids[reqid] = oid + + else: + # alerts have no broker request id + reqid = '' resp = { 'resp': 'dark_executed', From 6e70bc4fa7ab24b585ce73066b308d49bfb9cd7a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 29 Apr 2021 09:04:10 -0400 Subject: [PATCH 06/16] New tractor non-default port --- piker/_daemon.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/_daemon.py b/piker/_daemon.py index a894f1a5..bdf7a7fb 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -76,6 +76,7 @@ async def open_pikerd( # XXX: this may open a root actor as well async with tractor.open_root_actor( # passed through to ``open_root_actor`` + arbiter_addr=('127.0.0.1', 6116), name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, From 9b59471dc1b982c48825ec5a9b079e9f5f0171cb Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 10 May 2021 10:15:29 -0400 Subject: [PATCH 07/16] Pass debug flag down to any daemons --- piker/_daemon.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/_daemon.py b/piker/_daemon.py index bdf7a7fb..1d98eee4 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -141,6 +141,7 @@ async def maybe_open_pikerd( # presume pikerd role async with open_pikerd( loglevel=loglevel, + debug_mode=kwargs.get('debug_mode', False), ) as _: # in the case where we're starting up the # tractor-piker runtime stack in **this** process From 2635ade90864f040052fb192a52ddeeb15e7d80e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 12 May 2021 08:58:04 -0400 Subject: [PATCH 08/16] Reminder to remove msgpack-numpy --- piker/__init__.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/piker/__init__.py b/piker/__init__.py index 75ec8ded..c84bfac2 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -19,6 +19,8 @@ piker: trading gear for hackers. """ import msgpack # noqa + +# TODO: remove this now right? import msgpack_numpy # patch msgpack for numpy arrays From 0b36bacfb6b2ae6f10afb812f7a177cdf65d1534 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 19 May 2021 16:39:59 -0400 Subject: [PATCH 09/16] Avoid weird `pydantic` runtime warning --- piker/data/_sampling.py | 2 +- piker/data/feed.py | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index c0f47d8e..2079eb71 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -227,7 +227,7 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus.subscribers[sym] + subs = bus._subscribers[sym] for ctx in subs: # print(f'sub is {ctx.chan.uid}') try: diff --git a/piker/data/feed.py b/piker/data/feed.py index 9455f7ae..5d39c7c4 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -67,11 +67,19 @@ class _FeedsBus(BaseModel): brokername: str nursery: trio.Nursery feeds: Dict[str, trio.CancelScope] = {} - subscribers: Dict[str, List[tractor.Context]] = {} + task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() + # XXX: so weird but, apparently without this being `._` private + # pydantic will complain about private `tractor.Context` instance + # vars (namely `._portal` and `._cancel_scope`) at import time. 
+    # Reported this bug:
+    # https://github.com/samuelcolvin/pydantic/issues/2816
+    _subscribers: Dict[str, List[tractor.Context]] = {}
+
     class Config:
         arbitrary_types_allowed = True
+        underscore_attrs_are_private = False
 
     async def cancel_all(self) -> None:
         for sym, (cs, msg, quote) in self.feeds.items():
@@ -256,7 +264,7 @@ async def attach_feed_bus(
                 loglevel=loglevel,
             )
         )
-        bus.subscribers.setdefault(symbol, []).append(ctx)
+        bus._subscribers.setdefault(symbol, []).append(ctx)
 
     else:
         sub_only = True
@@ -269,12 +277,12 @@ async def attach_feed_bus(
     await ctx.send_yield(first_quote)
 
     if sub_only:
-        bus.subscribers[symbol].append(ctx)
+        bus._subscribers[symbol].append(ctx)
 
     try:
         await trio.sleep_forever()
     finally:
-        bus.subscribers[symbol].remove(ctx)
+        bus._subscribers[symbol].remove(ctx)
 
 
 @dataclass

From 435e005d6e9209b75840539509e4158fcd6bf9c7 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Thu, 20 May 2021 16:08:24 -0400
Subject: [PATCH 10/16] Drop quantdom legend item

---
 piker/ui/quantdom/charts.py | 36 +-----------------------------------
 1 file changed, 1 insertion(+), 35 deletions(-)

diff --git a/piker/ui/quantdom/charts.py b/piker/ui/quantdom/charts.py
index e2da8586..d35f0742 100644
--- a/piker/ui/quantdom/charts.py
+++ b/piker/ui/quantdom/charts.py
@@ -5,41 +5,7 @@ import pyqtgraph as pg
 from PyQt5 import QtCore, QtGui
 
 
-class SampleLegendItem(pg.graphicsItems.LegendItem.ItemSample):
-
-    def paint(self, p, *args):
-        p.setRenderHint(p.Antialiasing)
-        if isinstance(self.item, tuple):
-            positive = self.item[0].opts
-            negative = self.item[1].opts
-            p.setPen(pg.mkPen(positive['pen']))
-            p.setBrush(pg.mkBrush(positive['brush']))
-            p.drawPolygon(
-                QtGui.QPolygonF(
-                    [
-                        QtCore.QPointF(0, 0),
-                        QtCore.QPointF(18, 0),
-                        QtCore.QPointF(18, 18),
-                    ]
-                )
-            )
-            p.setPen(pg.mkPen(negative['pen']))
-            p.setBrush(pg.mkBrush(negative['brush']))
-            p.drawPolygon(
-                QtGui.QPolygonF(
-                    [
-                        QtCore.QPointF(0, 0),
-                        QtCore.QPointF(0, 18),
-                        QtCore.QPointF(18, 18),
-                    ]
-                )
-            )
-        else:
-            opts = self.item.opts
-            p.setPen(pg.mkPen(opts['pen']))
-            p.drawRect(0, 10, 18, 0.5)
-
-
+# TODO: test this out as our help menu
 class CenteredTextItem(QtGui.QGraphicsTextItem):
     def __init__(

From dcc60524cb8660eb4fff540696e796d22bbffbc9 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Sun, 23 May 2021 10:53:57 -0400
Subject: [PATCH 11/16] Add remote context allocation api to service daemon

This allows for more deterministic management of long-running sub-daemon
services under `pikerd` using the new context api from `tractor`. The
contexts are allocated in an async exit stack and torn down at root
daemon termination. Brokerds are now spawned using this method by
changing the persistence entry point to be a `@tractor.context`.
---
 piker/_daemon.py   | 72 +++++++++++++++++++++++++++++++++++-----------
 piker/data/feed.py | 13 +++++++--
 2 files changed, 66 insertions(+), 19 deletions(-)

diff --git a/piker/_daemon.py b/piker/_daemon.py
index 1d98eee4..bf3f62a2 100644
--- a/piker/_daemon.py
+++ b/piker/_daemon.py
@@ -19,8 +19,9 @@ Structured, daemon tree service management.
""" from functools import partial -from typing import Optional, Union -from contextlib import asynccontextmanager +from typing import Optional, Union, Callable +from contextlib import asynccontextmanager, AsyncExitStack +from collections import defaultdict from pydantic import BaseModel import trio @@ -44,10 +45,34 @@ class Services(BaseModel): actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag + ctx_stack: AsyncExitStack class Config: arbitrary_types_allowed = True + async def open_remote_ctx( + self, + portal: tractor.Portal, + target: Callable, + **kwargs, + + ) -> tractor.Context: + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` tearodwn. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + ctx, first = await self.ctx_stack.enter_async_context( + portal.open_context( + target, + **kwargs, + ) + ) + return ctx + _services: Optional[Services] = None @@ -62,14 +87,14 @@ async def open_pikerd( debug_mode: bool = False, ) -> Optional[tractor._portal.Portal]: - """ + ''' Start a root piker daemon who's lifetime extends indefinitely until cancelled. A root actor nursery is created which can be used to create and keep alive underling services (see below). - """ + ''' global _services assert _services is None @@ -91,14 +116,18 @@ async def open_pikerd( ) as _, tractor.open_nursery() as actor_nursery: async with trio.open_nursery() as service_nursery: - # assign globally for future daemon/task creation - _services = Services( - actor_n=actor_nursery, - service_n=service_nursery, - debug_mode=debug_mode, - ) + # setup service mngr singleton instance + async with AsyncExitStack() as stack: - yield _services + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery, + debug_mode=debug_mode, + ctx_stack=stack, + ) + + yield _services @asynccontextmanager @@ -195,17 +224,19 @@ async def spawn_brokerd( # call with and then have the ability to unwind the call whenevs. 
# non-blocking setup of brokerd service nursery - _services.service_n.start_soon( - partial( - portal.run, - _setup_persistent_brokerd, - brokername=brokername, - ) + await _services.open_remote_ctx( + portal, + _setup_persistent_brokerd, + brokername=brokername, ) return dname +class Brokerd: + locks = defaultdict(trio.Lock) + + @asynccontextmanager async def maybe_spawn_brokerd( @@ -224,9 +255,15 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Brokerd.locks[brokername] + await lock.acquire() + # attach to existing brokerd if possible async with tractor.find_actor(dname) as portal: if portal is not None: + lock.release() yield portal return @@ -251,6 +288,7 @@ async def maybe_spawn_brokerd( ) async with tractor.wait_for_actor(dname) as portal: + lock.release() yield portal diff --git a/piker/data/feed.py b/piker/data/feed.py index 5d39c7c4..76c2bc23 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -116,7 +116,11 @@ def get_feed_bus( return _bus -async def _setup_persistent_brokerd(brokername: str) -> None: +@tractor.context +async def _setup_persistent_brokerd( + ctx: tractor.Context, + brokername: str +) -> None: """Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. @@ -129,6 +133,9 @@ async def _setup_persistent_brokerd(brokername: str) -> None: # background tasks from clients bus = get_feed_bus(brokername, service_nursery) + # unblock caller + await ctx.started() + # we pin this task to keep the feeds manager active until the # parent actor decides to tear it down await trio.sleep_forever() @@ -232,7 +239,7 @@ async def attach_feed_bus( brokername: str, symbol: str, loglevel: str, -): +) -> None: # try: if loglevel is None: @@ -274,6 +281,8 @@ async def attach_feed_bus( # send this even to subscribers to existing feed? await ctx.send_yield(init_msg) + + # deliver a first quote asap await ctx.send_yield(first_quote) if sub_only: From 74b4b4b1308d8186710d9a676feaf348298c82e8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 May 2021 08:32:25 -0400 Subject: [PATCH 12/16] Expose registry port as dict --- piker/_daemon.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index bf3f62a2..30fdfec2 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,7 +19,7 @@ Structured, daemon tree service management. 
""" from functools import partial -from typing import Optional, Union, Callable +from typing import Optional, Union, Callable, Any from contextlib import asynccontextmanager, AsyncExitStack from collections import defaultdict @@ -34,6 +34,10 @@ from .brokers import get_brokermod log = get_logger(__name__) _root_dname = 'pikerd' +_tractor_kwargs: dict[str, Any] = { + # use a different registry addr then tractor's default + 'arbiter_addr': ('127.0.0.1', 6116), +} _root_modules = [ __name__, 'piker.clearing._ems', @@ -101,7 +105,7 @@ async def open_pikerd( # XXX: this may open a root actor as well async with tractor.open_root_actor( # passed through to ``open_root_actor`` - arbiter_addr=('127.0.0.1', 6116), + arbiter_addr=_tractor_kwargs['arbiter_addr'], name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, From f6f4a0cd8daf804442acd9a97f77340ed450ed61 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 May 2021 08:32:49 -0400 Subject: [PATCH 13/16] Use tractor settings for qtractor --- piker/ui/_exec.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 8a48943d..53de8554 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -42,7 +42,7 @@ import trio import tractor from outcome import Error -from .._daemon import maybe_open_pikerd +from .._daemon import maybe_open_pikerd, _tractor_kwargs from ..log import get_logger from ._pg_overrides import _do_overrides @@ -196,6 +196,10 @@ def run_qtractor( 'main': instance, } + + # override tractor's defaults + tractor_kwargs.update(_tractor_kwargs) + # define tractor entrypoint async def main(): From efd93d058a69cf44c24c8459ba85122f003d2811 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 24 May 2021 08:47:30 -0400 Subject: [PATCH 14/16] Breakout fsp rt loop as non-closure for readability --- piker/fsp/__init__.py | 184 +++++++++++++++++++++++++----------------- 1 file changed, 109 insertions(+), 75 deletions(-) diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 2345b516..312a0cef 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -17,6 +17,7 @@ """ Financial signal processing for the peeps. """ +from functools import partial from typing import AsyncIterator, Callable, Tuple import trio @@ -29,6 +30,8 @@ from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap from ..data import attach_shm_array +from ..data.feed import Feed +from ..data._sharedmem import ShmArray log = get_logger(__name__) @@ -62,6 +65,97 @@ async def latency( yield value +async def fsp_compute( + ctx: tractor.Context, + symbol: str, + feed: Feed, + + src: ShmArray, + dst: ShmArray, + + fsp_func_name: str, + func: Callable, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> None: + + # TODO: load appropriate fsp with input args + + async def filter_by_sym( + sym: str, + stream, + ): + + # TODO: make this the actualy first quote from feed + # XXX: this allows for a single iteration to run for history + # processing without waiting on the real-time feed for a new quote + yield {} + + # task cancellation won't kill the channel + with stream.shield(): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes + + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) + + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. 
+    # This likely needs to be fixed either by,
+    # - manually assigning the index and historical data
+    # separately to the shm array (i.e. not using .push())
+    # - developing some system on top of the shared mem array that
+    # is `index` aware such that historical data can be indexed
+    # relative to the true first datum? Not sure if this is sane
+    # for incremental computations.
+    dst._first.value = src._first.value
+    dst._last.value = src._first.value
+
+    # Conduct a single iteration of fsp with historical bars input
+    # and get historical output
+    history_output = await out_stream.__anext__()
+
+    # build a struct array which includes an 'index' field to push
+    # as history
+    history = np.array(
+        np.arange(len(history_output)),
+        dtype=dst.array.dtype
+    )
+    history[fsp_func_name] = history_output
+
+    # check for data length mis-alignment and fill missing values
+    diff = len(src.array) - len(history)
+    if diff >= 0:
+        print(f"WTF DIFF SIGNAL to HISTORY {diff}")
+        for _ in range(diff):
+            dst.push(history[:1])
+
+    # compare with source signal and time align
+    index = dst.push(history)
+
+    await ctx.send_yield(index)
+
+    # setup a respawn handle
+    with trio.CancelScope() as cs:
+        task_status.started(cs)
+
+        # rt stream
+        async for processed in out_stream:
+            log.debug(f"{fsp_func_name}: {processed}")
+            index = src.index
+            dst.array[-1][fsp_func_name] = processed
+
+            # stream latest shm array index entry
+            await ctx.send_yield(index)
+
+
 @tractor.stream
 async def cascade(
     ctx: tractor.Context,
@@ -85,84 +179,24 @@ async def cascade(
 
     assert src.token == feed.shm.token
 
-    async def fsp_compute(
-        task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED,
-    ) -> None:
-
-        # TODO: load appropriate fsp with input args
-
-        async def filter_by_sym(
-            sym: str,
-            stream,
-        ):
-            # task cancellation won't kill the channel
-            with stream.shield():
-                async for quotes in stream:
-                    for symbol, quotes in quotes.items():
-                        if symbol == sym:
-                            yield quotes
-
-        out_stream = func(
-            filter_by_sym(symbol, feed.stream),
-            feed.shm,
-        )
-
-        # TODO: XXX:
-        # THERE'S A BIG BUG HERE WITH THE `index` field since we're
-        # prepending a copy of the first value a few times to make
-        # sub-curves align with the parent bar chart.
-        # This likely needs to be fixed either by,
-        # - manually assigning the index and historical data
-        # seperately to the shm array (i.e. not using .push())
-        # - developing some system on top of the shared mem array that
-        # is `index` aware such that historical data can be indexed
-        # relative to the true first datum? Not sure if this is sane
-        # for incremental compuations.
-        dst._first.value = src._first.value
-        dst._last.value = src._first.value
-
-        # Conduct a single iteration of fsp with historical bars input
-        # and get historical output
-        history_output = await out_stream.__anext__()
-
-        # build a struct array which includes an 'index' field to push
-        # as history
-        history = np.array(
-            np.arange(len(history_output)),
-            dtype=dst.array.dtype
-        )
-        history[fsp_func_name] = history_output
-
-        # check for data length mis-allignment and fill missing values
-        diff = len(src.array) - len(history)
-        if diff >= 0:
-            print(f"WTF DIFF SIGNAL to HISTORY {diff}")
-            for _ in range(diff):
-                dst.push(history[:1])
-
-        # compare with source signal and time align
-        index = dst.push(history)
-
-        await ctx.send_yield(index)
-
-        # setup a respawn handle
-        with trio.CancelScope() as cs:
-            task_status.started(cs)
-
-            # rt stream
-            async for processed in out_stream:
-                log.debug(f"{fsp_func_name}: {processed}")
-                index = src.index
-                dst.array[-1][fsp_func_name] = processed
-
-                # stream latest shm array index entry
-                await ctx.send_yield(index)
-
     last_len = new_len = len(src.array)
 
+    fsp_target = partial(
+        fsp_compute,
+        ctx=ctx,
+        symbol=symbol,
+        feed=feed,
+
+        src=src,
+        dst=dst,
+
+        fsp_func_name=fsp_func_name,
+        func=func
+    )
+
     async with trio.open_nursery() as n:
-        cs = await n.start(fsp_compute)
+        cs = await n.start(fsp_target)
 
         # Increment the underlying shared memory buffer on every
         # "increment" msg received from the underlying data feed.
         async for msg in await feed.index_stream():
 
             if new_len > last_len + 1:
                 # respawn the signal compute task if the source
                 # signal has been updated
                 cs.cancel()
-                cs = await n.start(fsp_compute)
+                cs = await n.start(fsp_target)
 
                 # TODO: adopt an incremental update engine/approach
                 # where possible here eventually!

From 319eacd8c1c8ae774359c914b2645c8a5c41a97c Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 24 May 2021 10:09:54 -0400
Subject: [PATCH 15/16] Use compact async with tuple syntax from py3.9

---
 piker/ui/order_mode.py | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)

diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py
index a3f01e9f..07f2b281 100644
--- a/piker/ui/order_mode.py
+++ b/piker/ui/order_mode.py
@@ -319,14 +319,10 @@ async def start_order_mode(
 
 ) -> None:
 
     # spawn EMS actor-service
-    async with open_ems(
-        brokername,
-        symbol,
-    ) as (book, trades_stream), open_order_mode(
-        symbol,
-        chart,
-        book,
-    ) as order_mode:
+    async with (
+        open_ems(brokername, symbol) as (book, trades_stream),
+        open_order_mode(symbol, chart, book) as order_mode
+    ):
 
         def get_index(time: float):

From f51e12819a056561ad0f3ddc05b312884acdf777 Mon Sep 17 00:00:00 2001
From: Tyler Goodlet
Date: Mon, 24 May 2021 12:09:03 -0400
Subject: [PATCH 16/16] Attach to order client *after* feed connection to
 speed up the startup time

---
 piker/clearing/_ems.py | 60 ++++++++++++++++++++++--------------------
 1 file changed, 31 insertions(+), 29 deletions(-)

diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 87d41bd3..dbb0ff51 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -242,19 +242,20 @@ async def execute_triggers(
 
 
 async def exec_loop(
+
     ctx: tractor.Context,
     feed: 'Feed',  # noqa
     broker: str,
     symbol: str,
     _exec_mode: str,
     task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED,
+
 ) -> AsyncIterator[dict]:
     """Main scan loop for order execution conditions and submission
     to brokers.
""" - - # TODO: get initial price quote from target broker + # XXX: this should be initial price quote from target provider first_quote = await feed.receive() book = get_dark_book(broker) @@ -351,6 +352,7 @@ async def process_broker_trades( # TODO: make this a context # in the paper engine case this is just a mem receive channel async with feed.receive_trades_data() as trades_stream: + first = await trades_stream.__anext__() # startup msg expected as first from broker backend @@ -651,35 +653,18 @@ async def _emsd_main( dark_book = get_dark_book(broker) - # get a portal back to the client - async with tractor.wait_for_actor(client_actor_name) as portal: + # spawn one task per broker feed + async with trio.open_nursery() as n: - # spawn one task per broker feed - async with trio.open_nursery() as n: + # TODO: eventually support N-brokers + async with data.open_feed( + broker, + [symbol], + loglevel='info', + ) as feed: - # TODO: eventually support N-brokers - async with data.open_feed( - broker, - [symbol], - loglevel='info', - ) as feed: - - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, - broker, - symbol, - _mode, - ) - - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # get a portal back to the client + async with tractor.wait_for_actor(client_actor_name) as portal: # connect back to the calling actor (the one that is # acting as an EMS client and will submit orders) to @@ -690,6 +675,23 @@ async def _emsd_main( symbol_key=symbol, ) as order_stream: + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) + + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) + # start inbound order request processing await process_order_cmds( ctx,