diff --git a/piker/__init__.py b/piker/__init__.py index 75ec8ded..c84bfac2 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -19,6 +19,8 @@ piker: trading gear for hackers. """ import msgpack # noqa + +# TODO: remove this now right? import msgpack_numpy # patch msgpack for numpy arrays diff --git a/piker/_daemon.py b/piker/_daemon.py index a894f1a5..30fdfec2 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -19,8 +19,9 @@ Structured, daemon tree service management. """ from functools import partial -from typing import Optional, Union -from contextlib import asynccontextmanager +from typing import Optional, Union, Callable, Any +from contextlib import asynccontextmanager, AsyncExitStack +from collections import defaultdict from pydantic import BaseModel import trio @@ -33,6 +34,10 @@ from .brokers import get_brokermod log = get_logger(__name__) _root_dname = 'pikerd' +_tractor_kwargs: dict[str, Any] = { + # use a different registry addr then tractor's default + 'arbiter_addr': ('127.0.0.1', 6116), +} _root_modules = [ __name__, 'piker.clearing._ems', @@ -44,10 +49,34 @@ class Services(BaseModel): actor_n: tractor._trionics.ActorNursery service_n: trio.Nursery debug_mode: bool # tractor sub-actor debug mode flag + ctx_stack: AsyncExitStack class Config: arbitrary_types_allowed = True + async def open_remote_ctx( + self, + portal: tractor.Portal, + target: Callable, + **kwargs, + + ) -> tractor.Context: + ''' + Open a context in a service sub-actor, add to a stack + that gets unwound at ``pikerd`` tearodwn. + + This allows for allocating long-running sub-services in our main + daemon and explicitly controlling their lifetimes. + + ''' + ctx, first = await self.ctx_stack.enter_async_context( + portal.open_context( + target, + **kwargs, + ) + ) + return ctx + _services: Optional[Services] = None @@ -62,20 +91,21 @@ async def open_pikerd( debug_mode: bool = False, ) -> Optional[tractor._portal.Portal]: - """ + ''' Start a root piker daemon who's lifetime extends indefinitely until cancelled. A root actor nursery is created which can be used to create and keep alive underling services (see below). - """ + ''' global _services assert _services is None # XXX: this may open a root actor as well async with tractor.open_root_actor( # passed through to ``open_root_actor`` + arbiter_addr=_tractor_kwargs['arbiter_addr'], name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, @@ -90,14 +120,18 @@ async def open_pikerd( ) as _, tractor.open_nursery() as actor_nursery: async with trio.open_nursery() as service_nursery: - # assign globally for future daemon/task creation - _services = Services( - actor_n=actor_nursery, - service_n=service_nursery, - debug_mode=debug_mode, - ) + # setup service mngr singleton instance + async with AsyncExitStack() as stack: - yield _services + # assign globally for future daemon/task creation + _services = Services( + actor_n=actor_nursery, + service_n=service_nursery, + debug_mode=debug_mode, + ctx_stack=stack, + ) + + yield _services @asynccontextmanager @@ -140,6 +174,7 @@ async def maybe_open_pikerd( # presume pikerd role async with open_pikerd( loglevel=loglevel, + debug_mode=kwargs.get('debug_mode', False), ) as _: # in the case where we're starting up the # tractor-piker runtime stack in **this** process @@ -193,17 +228,19 @@ async def spawn_brokerd( # call with and then have the ability to unwind the call whenevs. # non-blocking setup of brokerd service nursery - _services.service_n.start_soon( - partial( - portal.run, - _setup_persistent_brokerd, - brokername=brokername, - ) + await _services.open_remote_ctx( + portal, + _setup_persistent_brokerd, + brokername=brokername, ) return dname +class Brokerd: + locks = defaultdict(trio.Lock) + + @asynccontextmanager async def maybe_spawn_brokerd( @@ -222,9 +259,15 @@ async def maybe_spawn_brokerd( dname = f'brokerd.{brokername}' + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Brokerd.locks[brokername] + await lock.acquire() + # attach to existing brokerd if possible async with tractor.find_actor(dname) as portal: if portal is not None: + lock.release() yield portal return @@ -249,6 +292,7 @@ async def maybe_spawn_brokerd( ) async with tractor.wait_for_actor(dname) as portal: + lock.release() yield portal diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 824b3710..8f83c960 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -341,6 +341,10 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: class AutoReconWs: """Make ``trio_websocketw` sockets stay up no matter the bs. + TODO: + apply any more insights from this: + https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + """ recon_errors = ( ConnectionClosed, diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 73ca9ee1..dbb0ff51 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -179,31 +179,40 @@ async def execute_triggers( # majority of iterations will be non-matches continue - # submit_price = price + price*percent_away - submit_price = price + abs_diff_away + action = cmd['action'] - log.info( - f'Dark order triggered for price {price}\n' - f'Submitting order @ price {submit_price}') + if action != 'alert': + # executable order submission - reqid = await client.submit_limit( - oid=oid, + # submit_price = price + price*percent_away + submit_price = price + abs_diff_away - # this is a brand new order request for the - # underlying broker so we set out "broker request - # id" (brid) as nothing so that the broker - # client knows that we aren't trying to modify - # an existing order. - brid=None, + log.info( + f'Dark order triggered for price {price}\n' + f'Submitting order @ price {submit_price}') - symbol=sym, - action=cmd['action'], - price=submit_price, - size=cmd['size'], - ) + reqid = await client.submit_limit( + oid=oid, - # register broker request id to ems id - book._broker2ems_ids[reqid] = oid + # this is a brand new order request for the + # underlying broker so we set out "broker request + # id" (brid) as nothing so that the broker + # client knows that we aren't trying to modify + # an existing order. + brid=None, + + symbol=sym, + action=cmd['action'], + price=submit_price, + size=cmd['size'], + ) + + # register broker request id to ems id + book._broker2ems_ids[reqid] = oid + + else: + # alerts have no broker request id + reqid = '' resp = { 'resp': 'dark_executed', @@ -233,19 +242,20 @@ async def execute_triggers( async def exec_loop( + ctx: tractor.Context, feed: 'Feed', # noqa broker: str, symbol: str, _exec_mode: str, task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, + ) -> AsyncIterator[dict]: """Main scan loop for order execution conditions and submission to brokers. """ - - # TODO: get initial price quote from target broker + # XXX: this should be initial price quote from target provider first_quote = await feed.receive() book = get_dark_book(broker) @@ -342,6 +352,7 @@ async def process_broker_trades( # TODO: make this a context # in the paper engine case this is just a mem receive channel async with feed.receive_trades_data() as trades_stream: + first = await trades_stream.__anext__() # startup msg expected as first from broker backend @@ -642,35 +653,18 @@ async def _emsd_main( dark_book = get_dark_book(broker) - # get a portal back to the client - async with tractor.wait_for_actor(client_actor_name) as portal: + # spawn one task per broker feed + async with trio.open_nursery() as n: - # spawn one task per broker feed - async with trio.open_nursery() as n: + # TODO: eventually support N-brokers + async with data.open_feed( + broker, + [symbol], + loglevel='info', + ) as feed: - # TODO: eventually support N-brokers - async with data.open_feed( - broker, - [symbol], - loglevel='info', - ) as feed: - - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, - broker, - symbol, - _mode, - ) - - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # get a portal back to the client + async with tractor.wait_for_actor(client_actor_name) as portal: # connect back to the calling actor (the one that is # acting as an EMS client and will submit orders) to @@ -681,6 +675,23 @@ async def _emsd_main( symbol_key=symbol, ) as order_stream: + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) + + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) + # start inbound order request processing await process_order_cmds( ctx, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 7dab2a28..2ca5a1a6 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -37,7 +37,7 @@ _context_defaults = dict( def pikerd(loglevel, host, tl, pdb): """Spawn the piker broker-daemon. """ - from .._daemon import _data_mods, open_pikerd + from .._daemon import open_pikerd log = get_console_log(loglevel) if pdb: @@ -112,11 +112,11 @@ def services(config, tl, names): def _load_clis() -> None: - from ..data import marketstore as _ - from ..data import cli as _ - from ..brokers import cli as _ # noqa - from ..ui import cli as _ # noqa - from ..watchlists import cli as _ # noqa + from ..data import marketstore # noqa + from ..data import cli # noqa + from ..brokers import cli # noqa + from ..ui import cli # noqa + from ..watchlists import cli # noqa # load downstream cli modules diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 40951697..2079eb71 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -227,7 +227,7 @@ async def sample_and_broadcast( # end up triggering backpressure which which will # eventually block this producer end of the feed and # thus other consumers still attached. - subs = bus.subscribers[sym] + subs = bus._subscribers[sym] for ctx in subs: # print(f'sub is {ctx.chan.uid}') try: @@ -236,5 +236,8 @@ async def sample_and_broadcast( trio.BrokenResourceError, trio.ClosedResourceError ): - subs.remove(ctx) + # XXX: do we need to deregister here + # if it's done in the fee bus code? + # so far seems like no since this should all + # be single-threaded. log.error(f'{ctx.chan.uid} dropped connection') diff --git a/piker/data/feed.py b/piker/data/feed.py index 9455f7ae..76c2bc23 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -67,11 +67,19 @@ class _FeedsBus(BaseModel): brokername: str nursery: trio.Nursery feeds: Dict[str, trio.CancelScope] = {} - subscribers: Dict[str, List[tractor.Context]] = {} + task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() + # XXX: so weird but, apparently without this being `._` private + # pydantic will complain about private `tractor.Context` instance + # vars (namely `._portal` and `._cancel_scope`) at import time. + # Reported this bug: + # https://github.com/samuelcolvin/pydantic/issues/2816 + _subscribers: Dict[str, List[tractor.Context]] = {} + class Config: arbitrary_types_allowed = True + underscore_attrs_are_private = False async def cancel_all(self) -> None: for sym, (cs, msg, quote) in self.feeds.items(): @@ -108,7 +116,11 @@ def get_feed_bus( return _bus -async def _setup_persistent_brokerd(brokername: str) -> None: +@tractor.context +async def _setup_persistent_brokerd( + ctx: tractor.Context, + brokername: str +) -> None: """Allocate a actor-wide service nursery in ``brokerd`` such that feeds can be run in the background persistently by the broker backend as needed. @@ -121,6 +133,9 @@ async def _setup_persistent_brokerd(brokername: str) -> None: # background tasks from clients bus = get_feed_bus(brokername, service_nursery) + # unblock caller + await ctx.started() + # we pin this task to keep the feeds manager active until the # parent actor decides to tear it down await trio.sleep_forever() @@ -224,7 +239,7 @@ async def attach_feed_bus( brokername: str, symbol: str, loglevel: str, -): +) -> None: # try: if loglevel is None: @@ -256,7 +271,7 @@ async def attach_feed_bus( loglevel=loglevel, ) ) - bus.subscribers.setdefault(symbol, []).append(ctx) + bus._subscribers.setdefault(symbol, []).append(ctx) else: sub_only = True @@ -266,15 +281,17 @@ async def attach_feed_bus( # send this even to subscribers to existing feed? await ctx.send_yield(init_msg) + + # deliver a first quote asap await ctx.send_yield(first_quote) if sub_only: - bus.subscribers[symbol].append(ctx) + bus._subscribers[symbol].append(ctx) try: await trio.sleep_forever() finally: - bus.subscribers[symbol].remove(ctx) + bus._subscribers[symbol].remove(ctx) @dataclass diff --git a/piker/fsp/__init__.py b/piker/fsp/__init__.py index 2345b516..312a0cef 100644 --- a/piker/fsp/__init__.py +++ b/piker/fsp/__init__.py @@ -17,6 +17,7 @@ """ Financial signal processing for the peeps. """ +from functools import partial from typing import AsyncIterator, Callable, Tuple import trio @@ -29,6 +30,8 @@ from .. import data from ._momo import _rsi, _wma from ._volume import _tina_vwap from ..data import attach_shm_array +from ..data.feed import Feed +from ..data._sharedmem import ShmArray log = get_logger(__name__) @@ -62,6 +65,97 @@ async def latency( yield value +async def fsp_compute( + ctx: tractor.Context, + symbol: str, + feed: Feed, + + src: ShmArray, + dst: ShmArray, + + fsp_func_name: str, + func: Callable, + + task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, + +) -> None: + + # TODO: load appropriate fsp with input args + + async def filter_by_sym( + sym: str, + stream, + ): + + # TODO: make this the actualy first quote from feed + # XXX: this allows for a single iteration to run for history + # processing without waiting on the real-time feed for a new quote + yield {} + + # task cancellation won't kill the channel + with stream.shield(): + async for quotes in stream: + for symbol, quotes in quotes.items(): + if symbol == sym: + yield quotes + + out_stream = func( + filter_by_sym(symbol, feed.stream), + feed.shm, + ) + + # TODO: XXX: + # THERE'S A BIG BUG HERE WITH THE `index` field since we're + # prepending a copy of the first value a few times to make + # sub-curves align with the parent bar chart. + # This likely needs to be fixed either by, + # - manually assigning the index and historical data + # seperately to the shm array (i.e. not using .push()) + # - developing some system on top of the shared mem array that + # is `index` aware such that historical data can be indexed + # relative to the true first datum? Not sure if this is sane + # for incremental compuations. + dst._first.value = src._first.value + dst._last.value = src._first.value + + # Conduct a single iteration of fsp with historical bars input + # and get historical output + history_output = await out_stream.__anext__() + + # build a struct array which includes an 'index' field to push + # as history + history = np.array( + np.arange(len(history_output)), + dtype=dst.array.dtype + ) + history[fsp_func_name] = history_output + + # check for data length mis-allignment and fill missing values + diff = len(src.array) - len(history) + if diff >= 0: + print(f"WTF DIFF SIGNAL to HISTORY {diff}") + for _ in range(diff): + dst.push(history[:1]) + + # compare with source signal and time align + index = dst.push(history) + + await ctx.send_yield(index) + + # setup a respawn handle + with trio.CancelScope() as cs: + task_status.started(cs) + + # rt stream + async for processed in out_stream: + log.debug(f"{fsp_func_name}: {processed}") + index = src.index + dst.array[-1][fsp_func_name] = processed + + # stream latest shm array index entry + await ctx.send_yield(index) + + @tractor.stream async def cascade( ctx: tractor.Context, @@ -85,84 +179,24 @@ async def cascade( assert src.token == feed.shm.token - async def fsp_compute( - task_status: TaskStatus[None] = trio.TASK_STATUS_IGNORED, - ) -> None: - - # TODO: load appropriate fsp with input args - - async def filter_by_sym( - sym: str, - stream, - ): - # task cancellation won't kill the channel - with stream.shield(): - async for quotes in stream: - for symbol, quotes in quotes.items(): - if symbol == sym: - yield quotes - - out_stream = func( - filter_by_sym(symbol, feed.stream), - feed.shm, - ) - - # TODO: XXX: - # THERE'S A BIG BUG HERE WITH THE `index` field since we're - # prepending a copy of the first value a few times to make - # sub-curves align with the parent bar chart. - # This likely needs to be fixed either by, - # - manually assigning the index and historical data - # seperately to the shm array (i.e. not using .push()) - # - developing some system on top of the shared mem array that - # is `index` aware such that historical data can be indexed - # relative to the true first datum? Not sure if this is sane - # for incremental compuations. - dst._first.value = src._first.value - dst._last.value = src._first.value - - # Conduct a single iteration of fsp with historical bars input - # and get historical output - history_output = await out_stream.__anext__() - - # build a struct array which includes an 'index' field to push - # as history - history = np.array( - np.arange(len(history_output)), - dtype=dst.array.dtype - ) - history[fsp_func_name] = history_output - - # check for data length mis-allignment and fill missing values - diff = len(src.array) - len(history) - if diff >= 0: - print(f"WTF DIFF SIGNAL to HISTORY {diff}") - for _ in range(diff): - dst.push(history[:1]) - - # compare with source signal and time align - index = dst.push(history) - - await ctx.send_yield(index) - - # setup a respawn handle - with trio.CancelScope() as cs: - task_status.started(cs) - - # rt stream - async for processed in out_stream: - log.debug(f"{fsp_func_name}: {processed}") - index = src.index - dst.array[-1][fsp_func_name] = processed - - # stream latest shm array index entry - await ctx.send_yield(index) - last_len = new_len = len(src.array) + fsp_target = partial( + fsp_compute, + ctx=ctx, + symbol=symbol, + feed=feed, + + src=src, + dst=dst, + + fsp_func_name=fsp_func_name, + func=func + ) + async with trio.open_nursery() as n: - cs = await n.start(fsp_compute) + cs = await n.start(fsp_target) # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. @@ -176,7 +210,7 @@ async def cascade( # respawn the signal compute task if the source # signal has been updated cs.cancel() - cs = await n.start(fsp_compute) + cs = await n.start(fsp_target) # TODO: adopt an incremental update engine/approach # where possible here eventually! diff --git a/piker/ui/_exec.py b/piker/ui/_exec.py index 8a48943d..53de8554 100644 --- a/piker/ui/_exec.py +++ b/piker/ui/_exec.py @@ -42,7 +42,7 @@ import trio import tractor from outcome import Error -from .._daemon import maybe_open_pikerd +from .._daemon import maybe_open_pikerd, _tractor_kwargs from ..log import get_logger from ._pg_overrides import _do_overrides @@ -196,6 +196,10 @@ def run_qtractor( 'main': instance, } + + # override tractor's defaults + tractor_kwargs.update(_tractor_kwargs) + # define tractor entrypoint async def main(): diff --git a/piker/ui/_style.py b/piker/ui/_style.py index 7284a9d1..322247ec 100644 --- a/piker/ui/_style.py +++ b/piker/ui/_style.py @@ -79,7 +79,7 @@ class DpiAwareFont: return self._qfont @property - def px_size(self): + def px_size(self) -> int: return self._qfont.pixelSize() def configure_to_dpi(self, screen: Optional[QtGui.QScreen] = None): diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 29efcd0f..07f2b281 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -319,14 +319,10 @@ async def start_order_mode( ) -> None: # spawn EMS actor-service - async with open_ems( - brokername, - symbol, - ) as (book, trades_stream), open_order_mode( - symbol, - chart, - book, - ) as order_mode: + async with ( + open_ems(brokername, symbol) as (book, trades_stream), + open_order_mode(symbol, chart, book) as order_mode + ): def get_index(time: float): @@ -337,7 +333,7 @@ async def start_order_mode( indexes = ohlc['time'] >= time if any(indexes): - return ohlc['index'][indexes[-1]] + return ohlc['index'][indexes][-1] else: return ohlc['index'][-1] diff --git a/piker/ui/quantdom/charts.py b/piker/ui/quantdom/charts.py index e2da8586..d35f0742 100644 --- a/piker/ui/quantdom/charts.py +++ b/piker/ui/quantdom/charts.py @@ -5,41 +5,7 @@ import pyqtgraph as pg from PyQt5 import QtCore, QtGui -class SampleLegendItem(pg.graphicsItems.LegendItem.ItemSample): - - def paint(self, p, *args): - p.setRenderHint(p.Antialiasing) - if isinstance(self.item, tuple): - positive = self.item[0].opts - negative = self.item[1].opts - p.setPen(pg.mkPen(positive['pen'])) - p.setBrush(pg.mkBrush(positive['brush'])) - p.drawPolygon( - QtGui.QPolygonF( - [ - QtCore.QPointF(0, 0), - QtCore.QPointF(18, 0), - QtCore.QPointF(18, 18), - ] - ) - ) - p.setPen(pg.mkPen(negative['pen'])) - p.setBrush(pg.mkBrush(negative['brush'])) - p.drawPolygon( - QtGui.QPolygonF( - [ - QtCore.QPointF(0, 0), - QtCore.QPointF(0, 18), - QtCore.QPointF(18, 18), - ] - ) - ) - else: - opts = self.item.opts - p.setPen(pg.mkPen(opts['pen'])) - p.drawRect(0, 10, 18, 0.5) - - +# TODO: test this out as our help menu class CenteredTextItem(QtGui.QGraphicsTextItem): def __init__( self,