diff --git a/.gitignore b/.gitignore index bdbd200a..70826d07 100644 --- a/.gitignore +++ b/.gitignore @@ -97,6 +97,9 @@ ENV/ # mkdocs documentation /site +# extra scripts dir +/snippets + # mypy .mypy_cache/ .vscode/settings.json diff --git a/piker/__init__.py b/piker/__init__.py index c84bfac2..a6437f88 100644 --- a/piker/__init__.py +++ b/piker/__init__.py @@ -18,10 +18,3 @@ piker: trading gear for hackers. """ -import msgpack # noqa - -# TODO: remove this now right? -import msgpack_numpy - -# patch msgpack for numpy arrays -msgpack_numpy.patch() diff --git a/piker/_daemon.py b/piker/_daemon.py index 77462d35..b4eed03d 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -34,9 +34,11 @@ from .brokers import get_brokermod log = get_logger(__name__) _root_dname = 'pikerd' + +_registry_addr = ('127.0.0.1', 6116) _tractor_kwargs: dict[str, Any] = { # use a different registry addr then tractor's default - 'arbiter_addr': ('127.0.0.1', 6116), + 'arbiter_addr': _registry_addr } _root_modules = [ __name__, @@ -78,7 +80,6 @@ class Services(BaseModel): ) -> Any: with trio.CancelScope() as cs: - async with portal.open_context( target, **kwargs, @@ -87,19 +88,17 @@ class Services(BaseModel): # unblock once the remote context has started task_status.started((cs, first)) - + log.info( + f'`pikerd` service {name} started with value {first}' + ) # wait on any context's return value ctx_res = await ctx.result() - log.info( - f'`pikerd` service {name} started with value {ctx_res}' - ) # wait on any error from the sub-actor # NOTE: this will block indefinitely until cancelled - # either by error from the target context function or - # by being cancelled here by the surroundingn cancel - # scope - return await (portal.result(), ctx_res) + # either by error from the target context function or by + # being cancelled here by the surrounding cancel scope + return (await portal.result(), ctx_res) cs, first = await self.service_n.start(open_context_in_task) @@ -109,16 +108,16 @@ class Services(BaseModel): return cs, first - async def cancel_service( - self, - name: str, - - ) -> Any: - - log.info(f'Cancelling `pikerd` service {name}') - cs, portal = self.service_tasks[name] - cs.cancel() - return await portal.cancel_actor() + # TODO: per service cancellation by scope, we aren't using this + # anywhere right? + # async def cancel_service( + # self, + # name: str, + # ) -> Any: + # log.info(f'Cancelling `pikerd` service {name}') + # cs, portal = self.service_tasks[name] + # cs.cancel() + # return await portal.cancel_actor() _services: Optional[Services] = None @@ -150,7 +149,7 @@ async def open_pikerd( tractor.open_root_actor( # passed through to ``open_root_actor`` - arbiter_addr=_tractor_kwargs['arbiter_addr'], + arbiter_addr=_registry_addr, name=_root_dname, loglevel=loglevel, debug_mode=debug_mode, @@ -179,6 +178,47 @@ async def open_pikerd( yield _services +@asynccontextmanager +async def open_piker_runtime( + name: str, + enable_modules: list[str] = [], + start_method: str = 'trio', + loglevel: Optional[str] = None, + + # XXX: you should pretty much never want debug mode + # for data daemons when running in production. + debug_mode: bool = False, + +) -> Optional[tractor._portal.Portal]: + ''' + Start a piker actor who's runtime will automatically + sync with existing piker actors in local network + based on configuration. 
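+
+    A minimal usage sketch (hypothetical caller code; the actor
+    name is a placeholder, only the parameters defined above are
+    assumed):
+
+        async with open_piker_runtime('my_actor') as actor:
+            # the yielded value is the (now booted) ``tractor`` actor
+            assert actor is tractor.current_actor()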
+ + ''' + global _services + assert _services is None + + # XXX: this may open a root actor as well + async with ( + tractor.open_root_actor( + + # passed through to ``open_root_actor`` + arbiter_addr=_registry_addr, + name=name, + loglevel=loglevel, + debug_mode=debug_mode, + start_method=start_method, + + # TODO: eventually we should be able to avoid + # having the root have more then permissions to + # spawn other specialized daemons I think? + enable_modules=_root_modules, + ) as _, + ): + yield tractor.current_actor() + + @asynccontextmanager async def maybe_open_runtime( loglevel: Optional[str] = None, @@ -283,13 +323,20 @@ async def maybe_spawn_daemon( lock = Brokerd.locks[service_name] await lock.acquire() + log.info(f'Scanning for existing {service_name}') # attach to existing daemon by name if possible - async with tractor.find_actor(service_name) as portal: + async with tractor.find_actor( + service_name, + arbiter_sockaddr=_registry_addr, + + ) as portal: if portal is not None: lock.release() yield portal return + log.warning(f"Couldn't find any existing {service_name}") + # ask root ``pikerd`` daemon to spawn the daemon we need if # pikerd is not live we now become the root of the # process tree @@ -447,3 +494,25 @@ async def maybe_open_emsd( ) as portal: yield portal + + +# TODO: ideally we can start the tsdb "on demand" but it's +# probably going to require "rootless" docker, at least if we don't +# want to expect the user to start ``pikerd`` with root perms all the +# time. +# async def maybe_open_marketstored( +# loglevel: Optional[str] = None, +# **kwargs, + +# ) -> tractor._portal.Portal: # noqa + +# async with maybe_spawn_daemon( + +# 'marketstored', +# service_task_target=spawn_emsd, +# spawn_args={'loglevel': loglevel}, +# loglevel=loglevel, +# **kwargs, + +# ) as portal: +# yield portal diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index 4d82474b..f4732e54 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -386,7 +386,6 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: List[str], - shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 5cf7d2f0..9f1bd49b 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -517,11 +517,11 @@ class Client: contract, ticker, details = await self.get_sym_details(symbol) # ensure a last price gets filled in before we deliver quote - for _ in range(2): + for _ in range(1): if isnan(ticker.last): + await asyncio.sleep(0.1) log.warning(f'Quote for {symbol} timed out: market is closed?') ticker = await ticker.updateEvent - await asyncio.sleep(0.1) else: log.info(f'Got first quote for {symbol}') break @@ -1201,12 +1201,13 @@ async def backfill_bars( task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: - """Fill historical bars into shared mem / storage afap. + ''' + Fill historical bars into shared mem / storage afap. 
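+
+    This is the required ``backfill_bars()`` backend endpoint: it must
+    block until shm is filled with a first set of ohlc bars (see
+    ``manage_history()`` in ``piker.data.feed``).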
TODO: avoid pacing constraints: https://github.com/pikers/piker/issues/128 - """ + ''' if platform.system() == 'Windows': log.warning( 'Decreasing history query count to 4 since, windows...') @@ -1411,7 +1412,6 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: list[str], - shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 24d2dab3..0d899428 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -406,7 +406,6 @@ async def stream_quotes( send_chan: trio.abc.SendChannel, symbols: List[str], - shm: ShmArray, feed_is_live: trio.Event, loglevel: str = None, diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 97b6ebc3..7eb7b5d1 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -109,13 +109,11 @@ def services(config, tl, names): ) as portal: registry = await portal.run_from_ns('self', 'get_registry') json_d = {} - for uid, socket in registry.items(): - name, uuid = uid + for key, socket in registry.items(): + # name, uuid = uid host, port = socket - json_d[f'{name}.{uuid}'] = f'{host}:{port}' - click.echo( - f"Available `piker` services:\n{colorize_json(json_d)}" - ) + json_d[key] = f'{host}:{port}' + click.echo(f"{colorize_json(json_d)}") tractor.run( list_services, diff --git a/piker/data/_sampling.py b/piker/data/_sampling.py index 669f624e..ecad241d 100644 --- a/piker/data/_sampling.py +++ b/piker/data/_sampling.py @@ -15,10 +15,11 @@ # along with this program. If not, see . """ -Data buffers for fast shared humpy. +Sampling and broadcast machinery for (soft) real-time delivery of +financial data flows. + """ import time -from typing import Dict, List import tractor import trio @@ -31,24 +32,35 @@ from ..log import get_logger log = get_logger(__name__) -# TODO: we could stick these in a composed type to avoid -# angering the "i hate module scoped variables crowd" (yawn). -_shms: Dict[int, List[ShmArray]] = {} -_start_increment: Dict[str, trio.Event] = {} -_incrementers: Dict[int, trio.CancelScope] = {} -_subscribers: Dict[str, tractor.Context] = {} +class sampler: + ''' + Global sampling engine registry. + Manages state for sampling events, shm incrementing and + sample period logic. -def shm_incrementing(shm_token_name: str) -> trio.Event: - global _start_increment - return _start_increment.setdefault(shm_token_name, trio.Event()) + ''' + # TODO: we could stick these in a composed type to avoid + # angering the "i hate module scoped variables crowd" (yawn). + ohlcv_shms: dict[int, list[ShmArray]] = {} + + # holds one-task-per-sample-period tasks which are spawned as-needed by + # data feed requests with a given detected time step usually from + # history loading. + incrementers: dict[int, trio.CancelScope] = {} + + # holds all the ``tractor.Context`` remote subscriptions for + # a particular sample period increment event: all subscribers are + # notified on a step. + subscribers: dict[int, tractor.Context] = {} async def increment_ohlc_buffer( delay_s: int, task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ): - """Task which inserts new bars into the provide shared memory array + ''' + Task which inserts new bars into the provide shared memory array every ``delay_s`` seconds. This task fulfills 2 purposes: @@ -59,8 +71,8 @@ async def increment_ohlc_buffer( Note that if **no** actor has initiated this task then **none** of the underlying buffers will actually be incremented. 
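+
+    A sketch of how a feed's history task registers its buffer and
+    ensures an incrementer task exists for the detected sample step
+    (mirroring ``manage_history()`` in ``piker.data.feed``):
+
+        sampler.ohlcv_shms.setdefault(delay_s, []).append(shm)
+        if sampler.incrementers.get(delay_s) is None:
+            await bus.start_task(increment_ohlc_buffer, delay_s)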
- """ + ''' # # wait for brokerd to signal we should start sampling # await shm_incrementing(shm_token['shm_name']).wait() @@ -69,19 +81,17 @@ async def increment_ohlc_buffer( # to solve this is to make this task aware of the instrument's # tradable hours? - global _incrementers - # adjust delay to compensate for trio processing time - ad = min(_shms.keys()) - 0.001 + ad = min(sampler.ohlcv_shms.keys()) - 0.001 total_s = 0 # total seconds counted - lowest = min(_shms.keys()) + lowest = min(sampler.ohlcv_shms.keys()) ad = lowest - 0.001 with trio.CancelScope() as cs: # register this time period step as active - _incrementers[delay_s] = cs + sampler.incrementers[delay_s] = cs task_status.started(cs) while True: @@ -91,8 +101,10 @@ async def increment_ohlc_buffer( total_s += lowest # increment all subscribed shm arrays - # TODO: this in ``numba`` - for delay_s, shms in _shms.items(): + # TODO: + # - this in ``numba`` + # - just lookup shms for this step instead of iterating? + for delay_s, shms in sampler.ohlcv_shms.items(): if total_s % delay_s != 0: continue @@ -117,18 +129,19 @@ async def increment_ohlc_buffer( # write to the buffer shm.push(last) - # broadcast the buffer index step - subs = _subscribers.get(delay_s, ()) + # broadcast the buffer index step to any subscribers for + # a given sample period. + subs = sampler.subscribers.get(delay_s, ()) - for ctx in subs: - try: - await ctx.send_yield({'index': shm._last.value}) - except ( - trio.BrokenResourceError, - trio.ClosedResourceError - ): - log.error(f'{ctx.chan.uid} dropped connection') - subs.remove(ctx) + for ctx in subs: + try: + await ctx.send_yield({'index': shm._last.value}) + except ( + trio.BrokenResourceError, + trio.ClosedResourceError + ): + log.error(f'{ctx.chan.uid} dropped connection') + subs.remove(ctx) @tractor.stream @@ -137,15 +150,14 @@ async def iter_ohlc_periods( delay_s: int, ) -> None: - """ + ''' Subscribe to OHLC sampling "step" events: when the time aggregation period increments, this event stream emits an index event. - """ + ''' # add our subscription - global _subscribers - subs = _subscribers.setdefault(delay_s, []) + subs = sampler.subscribers.setdefault(delay_s, []) subs.append(ctx) try: @@ -290,7 +302,10 @@ async def sample_and_broadcast( # so far seems like no since this should all # be single-threaded. Doing it anyway though # since there seems to be some kinda race.. - subs.remove((stream, tick_throttle)) + try: + subs.remove((stream, tick_throttle)) + except ValueError: + log.error(f'{stream} was already removed from subs!?') # TODO: a less naive throttler, here's some snippets: @@ -303,6 +318,8 @@ async def uniform_rate_send( quote_stream: trio.abc.ReceiveChannel, stream: tractor.MsgStream, + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + ) -> None: # TODO: compute the approx overhead latency per cycle @@ -313,6 +330,8 @@ async def uniform_rate_send( last_send = time.time() diff = 0 + task_status.started() + while True: # compute the remaining time to sleep for this throttled cycle diff --git a/piker/data/_source.py b/piker/data/_source.py index 9b9b323d..dfa48453 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -17,7 +17,7 @@ """ numpy data source coversion helpers. 
""" -from typing import Dict, Any, List +from typing import Any import decimal import numpy as np @@ -59,6 +59,19 @@ tf_in_1m = { } +def mk_fqsn( + provider: str, + symbol: str, + +) -> str: + ''' + Generate a "fully qualified symbol name" which is + a reverse-hierarchical cross broker/provider symbol + + ''' + return '.'.join([symbol, provider]).lower() + + def float_digits( value: float, ) -> int: @@ -90,13 +103,13 @@ class Symbol(BaseModel): lot_tick_size: float # "volume" precision as min step value tick_size_digits: int lot_size_digits: int - broker_info: Dict[str, Dict[str, Any]] = {} + broker_info: dict[str, dict[str, Any]] = {} # specifies a "class" of financial instrument # ex. stock, futer, option, bond etc. @property - def brokers(self) -> List[str]: + def brokers(self) -> list[str]: return list(self.broker_info.keys()) def nearest_tick(self, value: float) -> float: @@ -118,6 +131,12 @@ class Symbol(BaseModel): self.key, ) + def iterfqsns(self) -> list[str]: + return [ + mk_fqsn(self.key, broker) + for broker in self.broker_info.keys() + ] + @validate_arguments def mk_symbol( @@ -129,7 +148,8 @@ def mk_symbol( broker_info: dict[str, Any] = {}, ) -> Symbol: - '''Create and return an instrument description for the + ''' + Create and return an instrument description for the "symbol" named as ``key``. ''' diff --git a/piker/data/feed.py b/piker/data/feed.py index 55f8b9b9..e2e91d7b 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Tyler Goodlet (in stewardship for piker0) +# Copyright (C) Tyler Goodlet (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -25,8 +25,9 @@ from contextlib import asynccontextmanager from functools import partial from types import ModuleType from typing import ( - Any, Sequence, + Any, AsyncIterator, Optional, + Awaitable, ) import trio @@ -47,11 +48,15 @@ from ._sharedmem import ( ShmArray, ) from .ingest import get_ingestormod -from ._source import base_iohlc_dtype, mk_symbol, Symbol +from ._source import ( + base_iohlc_dtype, + mk_symbol, + Symbol, + mk_fqsn, +) from ..ui import _search from ._sampling import ( - _shms, - _incrementers, + sampler, increment_ohlc_buffer, iter_ohlc_periods, sample_and_broadcast, @@ -67,12 +72,24 @@ class _FeedsBus(BaseModel): Data feeds broadcaster and persistence management. This is a brokerd side api used to manager persistent real-time - streams that can be allocated and left alive indefinitely. + streams that can be allocated and left alive indefinitely. A bus is + associated one-to-one with a particular broker backend where the + "bus" refers so a multi-symbol bus where quotes are interleaved in + time. + + Each "entry" in the bus includes: + - a stream used to push real time quotes (up to tick rates) + which is executed as a lone task that is cancellable via + a dedicated cancel scope. 
''' + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + brokername: str nursery: trio.Nursery - feeds: dict[str, tuple[trio.CancelScope, dict, dict]] = {} + feeds: dict[str, tuple[dict, dict]] = {} task_lock: trio.StrictFIFOLock = trio.StrictFIFOLock() @@ -86,14 +103,31 @@ class _FeedsBus(BaseModel): list[tuple[tractor.MsgStream, Optional[float]]] ] = {} - class Config: - arbitrary_types_allowed = True - underscore_attrs_are_private = False + async def start_task( + self, + target: Awaitable, + *args, - async def cancel_all(self) -> None: - for sym, (cs, msg, quote) in self.feeds.items(): - log.debug(f'Cancelling cached feed for {self.brokername}:{sym}') - cs.cancel() + ) -> None: + + async def start_with_cs( + task_status: TaskStatus[ + trio.CancelScope] = trio.TASK_STATUS_IGNORED, + ) -> None: + with trio.CancelScope() as cs: + await self.nursery.start( + target, + *args, + ) + task_status.started(cs) + + return await self.nursery.start(start_with_cs) + + # def cancel_task( + # self, + # task: trio.lowlevel.Task + # ) -> bool: + # ... _bus: _FeedsBus = None @@ -128,7 +162,8 @@ def get_feed_bus( @tractor.context async def _setup_persistent_brokerd( ctx: tractor.Context, - brokername: str + brokername: str, + ) -> None: ''' Allocate a actor-wide service nursery in ``brokerd`` @@ -136,44 +171,120 @@ async def _setup_persistent_brokerd( the broker backend as needed. ''' - try: - async with trio.open_nursery() as service_nursery: + get_console_log(tractor.current_actor().loglevel) - # assign a nursery to the feeds bus for spawning - # background tasks from clients - bus = get_feed_bus(brokername, service_nursery) + global _bus + assert not _bus - # unblock caller - await ctx.started() + async with trio.open_nursery() as service_nursery: + # assign a nursery to the feeds bus for spawning + # background tasks from clients + get_feed_bus(brokername, service_nursery) - # we pin this task to keep the feeds manager active until the - # parent actor decides to tear it down - await trio.sleep_forever() - finally: - # TODO: this needs to be shielded? - await bus.cancel_all() + # unblock caller + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() + + +async def manage_history( + mod: ModuleType, + shm: ShmArray, + bus: _FeedsBus, + symbol: str, + we_opened_shm: bool, + some_data_ready: trio.Event, + feed_is_live: trio.Event, + + task_status: TaskStatus = trio.TASK_STATUS_IGNORED, + +) -> None: + ''' + Load and manage historical data including the loading of any + available series from `marketstore` as well as conducting real-time + update of both that existing db and the allocated shared memory + buffer. + + ''' + task_status.started() + + opened = we_opened_shm + # TODO: history validation + # assert opened, f'Persistent shm for {symbol} was already open?!' + # if not opened: + # raise RuntimeError("Persistent shm for sym was already open?!") + + if opened: + # ask broker backend for new history + + # start history backfill task ``backfill_bars()`` is + # a required backend func this must block until shm is + # filled with first set of ohlc bars + cs = await bus.nursery.start(mod.backfill_bars, symbol, shm) + + # indicate to caller that feed can be delivered to + # remote requesting client since we've loaded history + # data that can be used. 
+ some_data_ready.set() + + # detect sample step size for sampled historical data + times = shm.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + + # begin real-time updates of shm and tsb once the feed + # goes live. + await feed_is_live.wait() + + if opened: + sampler.ohlcv_shms.setdefault(delay_s, []).append(shm) + + # start shm incrementing for OHLC sampling at the current + # detected sampling period if one dne. + if sampler.incrementers.get(delay_s) is None: + cs = await bus.start_task( + increment_ohlc_buffer, + delay_s, + ) + + await trio.sleep_forever() + cs.cancel() async def allocate_persistent_feed( - bus: _FeedsBus, brokername: str, symbol: str, loglevel: str, + task_status: TaskStatus[trio.CancelScope] = trio.TASK_STATUS_IGNORED, ) -> None: + ''' + Create and maintain a "feed bus" which allocates tasks for real-time + streaming and optional historical data storage per broker/data provider + backend; this normally task runs *in* a `brokerd` actor. + If none exists, this allocates a ``_FeedsBus`` which manages the + lifetimes of streaming tasks created for each requested symbol. + + + 2 tasks are created: + - a real-time streaming task which connec + + ''' try: mod = get_brokermod(brokername) except ImportError: mod = get_ingestormod(brokername) - # allocate shm array for this broker/symbol - # XXX: we should get an error here if one already exists + fqsn = mk_fqsn(brokername, symbol) + # (maybe) allocate shm array for this broker/symbol which will + # be used for fast near-term history capture and processing. shm, opened = maybe_open_shm_array( - key=sym_to_shm_key(brokername, symbol), + key=fqsn, # use any broker defined ohlc dtype: dtype=getattr(mod, '_ohlc_dtype', base_iohlc_dtype), @@ -182,69 +293,81 @@ async def allocate_persistent_feed( readonly=False, ) - # do history validation? - # assert opened, f'Persistent shm for {symbol} was already open?!' - # if not opened: - # raise RuntimeError("Persistent shm for sym was already open?!") - + # mem chan handed to broker backend so it can push real-time + # quotes to this task for sampling and history storage (see below). send, quote_stream = trio.open_memory_channel(10) + + # data sync signals for both history loading and market quotes + some_data_ready = trio.Event() feed_is_live = trio.Event() - # establish broker backend quote stream - # ``stream_quotes()`` is a required backend func + # run 2 tasks: + # - a history loader / maintainer + # - a real-time streamer which consumers and sends new data to any + # consumers as well as writes to storage backends (as configured). + + # XXX: neither of these will raise but will cause an inf hang due to: + # https://github.com/python-trio/trio/issues/2258 + # bus.nursery.start_soon( + # await bus.start_task( + + await bus.nursery.start( + manage_history, + mod, + shm, + bus, + symbol, + opened, + some_data_ready, + feed_is_live, + ) + + # establish broker backend quote stream by calling + # ``stream_quotes()``, which is a required broker backend endpoint. init_msg, first_quotes = await bus.nursery.start( partial( mod.stream_quotes, send_chan=send, feed_is_live=feed_is_live, symbols=[symbol], - shm=shm, loglevel=loglevel, ) ) + # we hand an IPC-msg compatible shm token to the caller so it + # can read directly from the memory which will be written by + # this task. 
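+    # (a feed consumer can then re-attach to the same buffer; as
+    # a rough sketch using the ``._sharedmem`` api:
+    #   shm = attach_shm_array(token=init_msg[symbol]['shm_token'])
+    # which is what the ``open_feed()`` client side does.)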
init_msg[symbol]['shm_token'] = shm.token - cs = bus.nursery.cancel_scope - - # TODO: make this into a composed type which also - # contains the backfiller cs for individual super-based - # resspawns when needed. - - # XXX: the ``symbol`` here is put into our native piker format (i.e. - # lower case). - bus.feeds[symbol.lower()] = (cs, init_msg, first_quotes) - - if opened: - # start history backfill task ``backfill_bars()`` is - # a required backend func this must block until shm is - # filled with first set of ohlc bars - await bus.nursery.start(mod.backfill_bars, symbol, shm) - - times = shm.array['time'] - delay_s = times[-1] - times[times != times[-1]][-1] + # TODO: pretty sure we don't need this? why not just leave 1s as + # the fastest "sample period" since we'll probably always want that + # for most purposes. # pass OHLC sample rate in seconds (be sure to use python int type) - init_msg[symbol]['sample_rate'] = int(delay_s) + # init_msg[symbol]['sample_rate'] = 1 #int(delay_s) - # yield back control to starting nursery + # yield back control to starting nursery once we receive either + # some history or a real-time quote. + log.info(f'waiting on history to load: {fqsn}') + await some_data_ready.wait() + + bus.feeds[symbol.lower()] = (init_msg, first_quotes) task_status.started((init_msg, first_quotes)) + # backend will indicate when real-time quotes have begun. await feed_is_live.wait() - if opened: - _shms.setdefault(delay_s, []).append(shm) - - # start shm incrementing for OHLC sampling - if _incrementers.get(delay_s) is None: - cs = await bus.nursery.start(increment_ohlc_buffer, delay_s) - sum_tick_vlm: bool = init_msg.get( 'shm_write_opts', {} ).get('sum_tick_vlm', True) # start sample loop try: - await sample_and_broadcast(bus, shm, quote_stream, sum_tick_vlm) + await sample_and_broadcast( + bus, + shm, + quote_stream, + sum_tick_vlm + ) finally: log.warning(f'{symbol}@{brokername} feed task terminated') @@ -257,29 +380,46 @@ async def open_feed_bus( symbol: str, loglevel: str, tick_throttle: Optional[float] = None, + start_stream: bool = True, ) -> None: + ''' + Open a data feed "bus": an actor-persistent per-broker task-oriented + data feed registry which allows managing real-time quote streams per + symbol. + ''' if loglevel is None: loglevel = tractor.current_actor().loglevel # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) + # local state sanity checks + # TODO: check for any stale shm entries for this symbol + # (after we also group them in a nice `/dev/shm/piker/` subdir). # ensure we are who we think we are assert 'brokerd' in tractor.current_actor().name bus = get_feed_bus(brokername) + bus._subscribers.setdefault(symbol, []) + fqsn = mk_fqsn(brokername, symbol) entry = bus.feeds.get(symbol) - bus._subscribers.setdefault(symbol, []) - # if no cached feed for this symbol has been created for this # brokerd yet, start persistent stream and shm writer task in # service nursery - async with bus.task_lock: - if entry is None: + if entry is None: + if not start_stream: + raise RuntimeError( + f'No stream feed exists for {fqsn}?\n' + f'You may need a `brokerd` started first.' + ) + + # allocate a new actor-local stream bus which will persist for + # this `brokerd`. + async with bus.task_lock: init_msg, first_quotes = await bus.nursery.start( partial( allocate_persistent_feed, @@ -295,21 +435,25 @@ async def open_feed_bus( loglevel=loglevel, ) ) + # TODO: we can remove this? 
assert isinstance(bus.feeds[symbol], tuple) # XXX: ``first_quotes`` may be outdated here if this is secondary # subscriber - cs, init_msg, first_quotes = bus.feeds[symbol] + init_msg, first_quotes = bus.feeds[symbol] # send this even to subscribers to existing feed? # deliver initial info message a first quote asap await ctx.started((init_msg, first_quotes)) + if not start_stream: + log.warning(f'Not opening real-time stream for {fqsn}') + await trio.sleep_forever() + + # real-time stream loop async with ( ctx.open_stream() as stream, - trio.open_nursery() as n, ): - if tick_throttle: # open a bg task which receives quotes over a mem chan @@ -317,7 +461,7 @@ async def open_feed_bus( # a max ``tick_throttle`` instantaneous rate. send, recv = trio.open_memory_channel(2**10) - n.start_soon( + cs = await bus.start_task( uniform_rate_send, tick_throttle, recv, @@ -333,7 +477,6 @@ async def open_feed_bus( try: uid = ctx.chan.uid - fqsn = f'{symbol}.{brokername}' async for msg in stream: @@ -353,8 +496,11 @@ async def open_feed_bus( finally: log.info( f'Stopping {symbol}.{brokername} feed for {ctx.chan.uid}') + if tick_throttle: - n.cancel_scope.cancel() + # TODO: a one-cancels-one nursery + # n.cancel_scope.cancel() + cs.cancel() try: bus._subscribers[symbol].remove(sub) except ValueError: @@ -367,6 +513,7 @@ async def open_sample_step_stream( delay_s: int, ) -> tractor.ReceiveMsgStream: + # XXX: this should be singleton on a host, # a lone broker-daemon per provider should be # created for all practical purposes @@ -375,8 +522,8 @@ async def open_sample_step_stream( portal.open_stream_from, iter_ohlc_periods, ), - kwargs={'delay_s': delay_s}, + ) as (cache_hit, istream): if cache_hit: # add a new broadcast subscription for the quote stream @@ -389,13 +536,15 @@ async def open_sample_step_stream( @dataclass class Feed: - """A data feed for client-side interaction with far-process# }}} - real-time data sources. + ''' + A data feed for client-side interaction with far-process real-time + data sources. This is an thin abstraction on top of ``tractor``'s portals for - interacting with IPC streams and conducting automatic - memory buffer orchestration. - """ + interacting with IPC streams and storage APIs (shm and time-series + db). + + ''' name: str shm: ShmArray mod: ModuleType @@ -407,7 +556,7 @@ class Feed: throttle_rate: Optional[int] = None _trade_stream: Optional[AsyncIterator[dict[str, Any]]] = None - _max_sample_rate: int = 0 + _max_sample_rate: int = 1 # cache of symbol info messages received as first message when # a stream startsc. @@ -442,13 +591,6 @@ class Feed: await self.stream.send('resume') -def sym_to_shm_key( - broker: str, - symbol: str, -) -> str: - return f'{broker}.{symbol}' - - @asynccontextmanager async def install_brokerd_search( @@ -485,11 +627,12 @@ async def install_brokerd_search( @asynccontextmanager async def open_feed( - brokername: str, - symbols: Sequence[str], + symbols: list[str], loglevel: Optional[str] = None, + backpressure: bool = True, + start_stream: bool = True, tick_throttle: Optional[float] = None, # Hz ) -> Feed: @@ -507,18 +650,20 @@ async def open_feed( # no feed for broker exists so maybe spawn a data brokerd async with ( + # if no `brokerd` for this backend exists yet we spawn + # and actor for one. 
maybe_spawn_brokerd( brokername, loglevel=loglevel ) as portal, + # (allocate and) connect to any feed bus for this broker portal.open_context( - open_feed_bus, brokername=brokername, symbol=sym, loglevel=loglevel, - + start_stream=start_stream, tick_throttle=tick_throttle, ) as (ctx, (init_msg, first_quotes)), @@ -527,7 +672,7 @@ async def open_feed( # XXX: be explicit about stream backpressure since we should # **never** overrun on feeds being too fast, which will # pretty much always happen with HFT XD - backpressure=True + backpressure=backpressure, ) as stream, ): @@ -546,12 +691,10 @@ async def open_feed( _portal=portal, throttle_rate=tick_throttle, ) - ohlc_sample_rates = [] for sym, data in init_msg.items(): si = data['symbol_info'] - ohlc_sample_rates.append(data['sample_rate']) symbol = mk_symbol( key=sym, @@ -572,9 +715,8 @@ async def open_feed( assert shm_token == shm.token # sanity - feed._max_sample_rate = max(ohlc_sample_rates) + feed._max_sample_rate = 1 - # yield feed try: yield feed finally: @@ -586,7 +728,7 @@ async def open_feed( async def maybe_open_feed( brokername: str, - symbols: Sequence[str], + symbols: list[str], loglevel: Optional[str] = None, **kwargs, @@ -607,12 +749,16 @@ async def maybe_open_feed( 'symbols': [sym], 'loglevel': loglevel, 'tick_throttle': kwargs.get('tick_throttle'), + + # XXX: super critical to have bool defaults here XD + 'backpressure': kwargs.get('backpressure', True), + 'start_stream': kwargs.get('start_stream', True), }, key=sym, ) as (cache_hit, feed): if cache_hit: - print('USING CACHED FEED') + log.info(f'Using cached feed for {brokername}.{sym}') # add a new broadcast subscription for the quote stream # if this feed is likely already in use async with feed.stream.subscribe() as bstream: diff --git a/piker/fsp/_engine.py b/piker/fsp/_engine.py index 1b853c60..f1dd49d7 100644 --- a/piker/fsp/_engine.py +++ b/piker/fsp/_engine.py @@ -123,7 +123,6 @@ async def fsp_compute( # TODO: push using a[['f0', 'f1', .., 'fn']] = .. syntax no? # if the output array is multi-field then push # each respective field. - # await tractor.breakpoint() fields = getattr(dst.array.dtype, 'fields', None).copy() fields.pop('index') # TODO: nptyping here! @@ -269,7 +268,7 @@ async def cascade( f'Registered FSP set:\n{lines}' ) - # update actor local flows table which registers + # update actorlocal flows table which registers # readonly "instances" of this fsp for symbol/source # so that consumer fsps can look it up by source + fsp. # TODO: ugh i hate this wind/unwind to list over the wire @@ -381,14 +380,19 @@ async def cascade( s, step, ld = is_synced(src, dst) + # detect sample period step for subscription to increment + # signal + times = src.array['time'] + delay_s = times[-1] - times[times != times[-1]][-1] + # Increment the underlying shared memory buffer on every # "increment" msg received from the underlying data feed. - async with feed.index_stream() as stream: + async with feed.index_stream(int(delay_s)) as istream: profiler(f'{func_name}: sample stream up') profiler.finish() - async for msg in stream: + async for _ in istream: # respawn the compute task if the source # array has been updated such that we compute diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 6048ca42..2a3689a3 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -818,11 +818,18 @@ class ChartPlotWidget(pg.PlotWidget): def default_view( self, index: int = -1, - ) -> None: - """Set the view box to the "default" startup view of the scene. 
- """ - xlast = self._arrays[self.name][index]['index'] + ) -> None: + ''' + Set the view box to the "default" startup view of the scene. + + ''' + try: + xlast = self._arrays[self.name][index]['index'] + except IndexError: + log.warning(f'array for {self.name} not loaded yet?') + return + begin = xlast - _bars_to_left_in_follow_mode end = xlast + _bars_from_right_in_follow_mode @@ -840,6 +847,8 @@ class ChartPlotWidget(pg.PlotWidget): def increment_view( self, + steps: int = 1, + ) -> None: """ Increment the data view one step to the right thus "following" @@ -848,8 +857,8 @@ class ChartPlotWidget(pg.PlotWidget): """ l, r = self.view_range() self.view.setXRange( - min=l + 1, - max=r + 1, + min=l + steps, + max=r + steps, # TODO: holy shit, wtf dude... why tf would this not be 0 by # default... speechless. @@ -858,7 +867,6 @@ class ChartPlotWidget(pg.PlotWidget): def draw_ohlc( self, - name: str, data: np.ndarray, diff --git a/piker/ui/_curve.py b/piker/ui/_curve.py index 7fc43e4e..f10f874c 100644 --- a/piker/ui/_curve.py +++ b/piker/ui/_curve.py @@ -108,7 +108,6 @@ class FastAppendCurve(pg.PlotCurveItem): path redraw. ''' - def __init__( self, *args, @@ -167,7 +166,13 @@ class FastAppendCurve(pg.PlotCurveItem): y: np.ndarray, ) -> QtGui.QPainterPath: + ''' + Update curve from input 2-d data. + Compare with a cached "x-range" state and (pre/a)ppend based on + a length diff. + + ''' profiler = pg.debug.Profiler(disabled=not pg_profile_enabled()) flip_cache = False @@ -316,12 +321,19 @@ class FastAppendCurve(pg.PlotCurveItem): self.setCacheMode(QtWidgets.QGraphicsItem.DeviceCoordinateCache) def disable_cache(self) -> None: + ''' + Disable the use of the pixel coordinate cache and trigger a geo event. + + ''' # XXX: pretty annoying but, without this there's little # artefacts on the append updates to the curve... self.setCacheMode(QtWidgets.QGraphicsItem.NoCache) self.prepareGeometryChange() def boundingRect(self): + ''' + Compute and then cache our rect. + ''' if self.path is None: return QtGui.QPainterPath().boundingRect() else: @@ -331,9 +343,10 @@ class FastAppendCurve(pg.PlotCurveItem): return self._br() def _br(self): - """Post init ``.boundingRect()```. + ''' + Post init ``.boundingRect()```. - """ + ''' hb = self.path.controlPointRect() hb_size = hb.size() # print(f'hb_size: {hb_size}') diff --git a/piker/ui/_display.py b/piker/ui/_display.py index 3bfd327a..c6d0f5aa 100644 --- a/piker/ui/_display.py +++ b/piker/ui/_display.py @@ -30,7 +30,7 @@ import tractor import trio from .. import brokers -from ..data.feed import open_feed, Feed +from ..data.feed import open_feed from ._chart import ( ChartPlotWidget, LinkedSplits, @@ -43,7 +43,7 @@ from ._fsp import ( has_vlm, open_vlm_displays, ) -from ..data._sharedmem import ShmArray, try_read +from ..data._sharedmem import ShmArray from ._forms import ( FieldsForm, mk_order_pane_layout, @@ -90,7 +90,10 @@ def chart_maxmin( l, lbar, rbar, r = last_bars_range in_view = array[lbar - ifirst:rbar - ifirst + 1] - assert in_view.size + if not in_view.size: + log.warning('Resetting chart to data') + chart.default_view() + return (last_bars_range, 0, 0, 0) mx, mn = np.nanmax(in_view['high']), np.nanmin(in_view['low']) @@ -107,6 +110,7 @@ def chart_maxmin( async def graphics_update_loop( + linked: LinkedSplits, stream: tractor.MsgStream, ohlcv: np.ndarray, @@ -131,6 +135,7 @@ async def graphics_update_loop( # of copying it from last bar's close # - 1-5 sec bar lookback-autocorrection like tws does? 
# (would require a background history checker task) + display_rate = linked.godwidget.window.current_screen().refreshRate() chart = linked.chart @@ -145,9 +150,8 @@ async def graphics_update_loop( vlm_view = vlm_chart.view maxmin = partial(chart_maxmin, chart, vlm_chart) - chart.default_view() - + last_bars_range: tuple[float, float] ( last_bars_range, last_mx, @@ -181,6 +185,7 @@ async def graphics_update_loop( chart.show() view = chart.view last_quote = time.time() + i_last = ohlcv.index # async def iter_drain_quotes(): # # NOTE: all code below this loop is expected to be synchronous @@ -215,7 +220,8 @@ async def graphics_update_loop( # in the absolute worst case we shouldn't see more then # twice the expected throttle rate right!? - and quote_rate >= _quote_throttle_rate * 1.5 + # and quote_rate >= _quote_throttle_rate * 2 + and quote_rate >= display_rate ): log.warning(f'High quote rate {symbol.key}: {quote_rate}') @@ -244,6 +250,22 @@ async def graphics_update_loop( # https://github.com/pikers/piker/issues/116 array = ohlcv.array + # NOTE: this used to be implemented in a dedicated + # "increment tas": ``check_for_new_bars()`` but it doesn't + # make sense to do a whole task switch when we can just do + # this simple index-diff and all the fsp sub-curve graphics + # are diffed on each draw cycle anyway; so updates to the + # "curve" length is already automatic. + + # increment the view position by the sample offset. + i_step = ohlcv.index + i_diff = i_step - i_last + if i_diff > 0: + chart.increment_view( + steps=i_diff, + ) + i_last = i_step + if vlm_chart: vlm_chart.update_curve_from_array('volume', array) vlm_sticky.update_from_data(*array[-1][['index', 'volume']]) @@ -425,79 +447,7 @@ async def graphics_update_loop( ) # chart.view._set_yrange() - -async def check_for_new_bars( - feed: Feed, - ohlcv: np.ndarray, - linkedsplits: LinkedSplits, - -) -> None: - ''' - Task which updates from new bars in the shared ohlcv buffer every - ``delay_s`` seconds. - - ''' - # TODO: right now we'll spin printing bars if the last time - # stamp is before a large period of no market activity. - # Likely the best way to solve this is to make this task - # aware of the instrument's tradable hours? - - price_chart = linkedsplits.chart - price_chart.default_view() - - async with feed.index_stream() as stream: - async for index in stream: - # update chart historical bars graphics by incrementing - # a time step and drawing the history and new bar - - # When appending a new bar, in the time between the insert - # from the writing process and the Qt render call, here, - # the index of the shm buffer may be incremented and the - # (render) call here might read the new flat bar appended - # to the buffer (since -1 index read). In that case H==L and the - # body will be set as None (not drawn) on what this render call - # *thinks* is the curent bar (even though it's reading data from - # the newly inserted flat bar. - # - # HACK: We need to therefore write only the history (not the - # current bar) and then either write the current bar manually - # or place a cursor for visual cue of the current time step. - - array = ohlcv.array - # avoid unreadable race case on backfills - while not try_read(array): - await trio.sleep(0.01) - - # XXX: this puts a flat bar on the current time step - # TODO: if we eventually have an x-axis time-step "cursor" - # we can get rid of this since it is extra overhead. 
- price_chart.update_ohlc_from_array( - price_chart.name, - array, - just_history=False, - ) - - # main chart overlays - # for name in price_chart._flows: - for curve_name in price_chart._flows: - price_chart.update_curve_from_array( - curve_name, - price_chart._arrays[curve_name] - ) - - # each subplot - for name, chart in linkedsplits.subplots.items(): - - # TODO: do we need the same unreadable guard as for the - # price chart (above) here? - chart.update_curve_from_array( - chart.name, - chart._shm.array, - array_key=chart.data_key - ) - - # shift the view if in follow mode - price_chart.increment_view() + # loop end async def display_symbol_data( @@ -548,7 +498,8 @@ async def display_symbol_data( # load in symbol's ohlc data godwidget.window.setWindowTitle( f'{symbol.key}@{symbol.brokers} ' - f'tick:{symbol.tick_size}' + f'tick:{symbol.tick_size} ' + f'step:1s ' ) linkedsplits = godwidget.linkedsplits @@ -627,14 +578,6 @@ async def display_symbol_data( vlm_chart, ) - # start sample step incrementer - ln.start_soon( - check_for_new_bars, - feed, - ohlcv, - linkedsplits - ) - async with ( open_order_mode( feed, diff --git a/piker/ui/_fsp.py b/piker/ui/_fsp.py index eac8f27d..ac35067c 100644 --- a/piker/ui/_fsp.py +++ b/piker/ui/_fsp.py @@ -813,7 +813,7 @@ async def open_vlm_displays( flow_rates, { # fsp engine conf 'func_name': 'flow_rates', - 'zero_on_step': True, + 'zero_on_step': False, }, # loglevel, ) diff --git a/setup.py b/setup.py index 6f8fd898..faaa8dac 100755 --- a/setup.py +++ b/setup.py @@ -66,7 +66,6 @@ setup( 'numpy', 'numba', 'pandas', - 'msgpack-numpy', # UI 'PyQt5',