Not only improves startup latency but also avoids a bug where the rt
buffer was being tsdb-history prepended *before* the backfilling of
recent data from the backend was complete resulting in our of order
frames in shm.
If a history manager raises a `DataUnavailable` just assume the sample
rate isn't supported and that no shm prepends will be done. Further seed
the shm array in such cases as before from the 1m history's last datum.
Also, fix tsdb -> shm back-loading, cancelling tsdb queries when either
no array-data is returned or a frame is delivered which has a start time
no lesser then the least last retrieved. Use strict timeframes for every
`Storage` API call.
Our default sample periods are 60s (1m) for the history chart and 1s for
the fast chart. This patch adds concurrent loading of both (or more)
different sample period data sets using the existing loading code but
with new support for looping through a passed "timeframe" table which
points to each shm instance.
More detailed adjustments include:
- breaking the "basic" and tsdb loading into 2 new funcs:
`basic_backfill()` and `tsdb_backfill()` the latter of which is run
when the tsdb daemon is discovered.
- adjust the fast shm buffer to offset with one day's worth of 1s so
that only up to a day is backfilled as history in the fast chart.
- adjust bus task starting in `manage_history()` to deliver back the
offset indices for both fast and slow shms and set them on the
`Feed` object as `.izero_hist/rt: int` values:
- allows the chart-UI linked view region handlers to use the offsets
in the view-linking-transform math to index-align the history and
fast chart.
It doesn't seem to be any slower on our least throttled backend
(binance) and it removes a bunch of hard to get correct frame
re-ordering logic that i'm not sure really ever fully worked XD
Commented some issues we still need to resolve as well.
Adjust all history query machinery to pass a `timeframe: int` in seconds
and set default of 60 (aka 1m) such that history views from here forward
will be 1m sampled OHLCV. Further when the tsdb is detected as up load
a full 10 years of data if possible on the 1m - backends will eventually
get a config section (`brokers.toml`) that allow user's to tune this.
As part of supporting a "history view" chart which shows downsampled
datums alongside our 1s (or higher) sampled OHLC we need a separate
buffer to store a the slower history from broker backends. This begins
that design by allocating 2 buffers:
- `rt_shm: ShmArray` which maps to a `/dev/shm/` file with `_rt` suffix
- `hist_shm: ShmArray` which maps to a file with `_hist` suffix
Deliver both of these shms back from both `manage_history()` and load
them as `Feed.rt_shm`/`.hist_shm` on the client side.
Impl deats:
- init the rt buffer with the first datum from loaded history and
assign all OHLC values to that row's 'close' and the vlm to 0.
- pass the hist buffer to the backfiller task
- only spawn **one** global sampler array-row increment task per
`brokerd` and pass in the 1s delay which we presume is our lowest
OHLC sample rate for now.
- drop `open_sample_step_stream()` and just move its body contents into
`Feed.index_stream()`
It seems once in a while a frame can get missed or dropped (at least
with binance?) so in those cases, when the request erlangs is already at
max, we just manually request the missing frame and presume things will
work out XD
Further, discard out of order frames that are "from the future" that
somehow end up in the async queue once in a while? Not sure why this
happens but it seems thus far just discarding them is nbd.
Bleh/🤦, the ``end_dt`` in scope is not the "earliest" frame's
`end_dt` in the async response queue.. Parse the queue's latest epoch
and use **that** to compare to the last last pushed datetime index..
Add more detailed logging to help debug any (un)expected datetime index
gaps.
When the tsdb has a last datum that is in the past less then a "frame's
worth" of sample steps we need to slice out only the data from the
latest frame that doesn't overlap; this fixes that slice logic..
Previously i dunno wth it was doing..
Expect each backend to deliver a `config: dict[str, Any]` which provides
concurrency controls to `trimeter`'s batch task scheduler such that
backends can define their own concurrency limits.
The dirty deats in this patch include handling history "gaps" where
a query returns a history-frame-result which spans more then the typical
frame size (in seconds). In such cases we reset the target frame index
(datetime index sequence implemented with a `pendulum.Period`) using
a generator protocol `.send()` such that the sequence can be dynamically
re-indexed starting at the new (possibly) pre-gap datetime. The new gap
logic also allows us to detect out of order frames easier and thus wait
for the next-in-order to arrive before making more requests.
Use the new `open_history_client()` endpoint/API and expect backends to
provide a history "getter" routine that can be called to load historical
data into shm even when **not** using a tsdb. Add logic for filling in
data from the tsdb once the backend has provided data up to the last
recorded in the db. Add logic for avoiding overruns of the shm buffer
with more-then-necessary queries of tsdb data.
If `marketstore` is detected try to only load most recent missing data
from the data provider (broker) and the rest from the tsdb and push it
all to shm for display in the UI. If the provider/broker doesn't have
the history client endpoint, just use the old one for now so we can
start to incrementally add support. Don't start the ohlc step
incrementer task until the backend signals that the feed is live.
Also, Start tinkering with `tractor.trionics.ipython_embed()`
In effort to get back to a usable REPL around the mkts client
this adds usage of the new `tractor` integration api as well as logic
for skipping backfilling if existing tsdb arrays are found.
Starts a wrapper around the `marketstore` client to do basic ohlcv query
and retrieval and prototypes out write methods for ohlc and tick.
Try to connect to `marketstore` automatically (which will fail if not
started currently) but we will eventually first do a service query.
Further:
- get `pikerd` working with and without `--tsdb` flag.
- support spawning `brokerd` with no real-time quotes.
- bring back in "fqsn" support that was originally not
in this history before commits factoring.
In order to support instruments with lifetimes (aka derivatives) we need
generally need special symbol annotations which detail such meta data
(such as `MNQ.GLOBEX.20220717` for daq futes). Further there is really
no reason for the public api for this feed layer to care about getting
a special "brokername" field since generally the data is coming directly
from UIs (eg. search selection) so we might as well accept a fqsn (fully
qualified symbol name) which includes the broker name; for now a suffix
like `'.ib'`. We may change this schema (soon) but this at least gets us
to a point where we expect the full name including broker/provider.
An additional detail: for certain "generic" symbol names (like for
futes) we will pull a so called "front contract" and map this to
a specific fqsn underneath, so there is a double (cached) entry for that
entry such that other consumers can use it the same way if desired.
Some other machinery changes:
- expect the `stream_quotes()` endpoint to deliver it's `.started()` msg
almost immediately since we now need it deliver any fqsn asap (yes
this means the ep should no longer wait on a "live" first quote and
instead deliver what quote data it can right away.
- expect the quotes ohlc sampler task to add in the broker name before
broadcast to remote (actor) consumers since the backend isn't (yet)
expected to do that add in itself.
- obviously we start using all the new fqsn related `Symbol` apis
Break up real-time quote feed and history loading into 2 separate tasks
and deliver a client side `data.Feed` as soon as history is loaded
(instead of waiting for a rt quote - the previous logic). If
a symbol doesn't have history then likely the feed shouldn't be loaded
(since presumably client code will need at least "some" datums history
to do anything) and waiting on a real-time quote is dumb, since it'll
hang if the market isn't open XD. If a symbol doesn't have history we
can always write a zero/null array when we run into that case. This also
greatly speeds up feed loading when both history and quotes are available.
TL;DR summary:
- add a `_Feedsbus.start_task()` one-cancel-scope-per-task method for
assisting with (re-)starting and stopping long running persistent
feeds (basically a "one cancels one" style nursery API).
- add a `manage_history()` task which does all history loading (and
eventually real-time writing) which has an independent signal and
start it in a separate task.
- drop the "sample rate per symbol" stuff since client code doesn't really
care when it can just inspect shm indexing/time-steps itself.
- run throttle tasks in the bus nursery thus avoiding cancelling the
underlying sampler task on feed client disconnects.
- don't store a repeated ref the bus nursery's cancel scope..
This should in theory result in increased burstiness since we remove
the plain `trio.sleep()` and instead always wait on the receive channel
as much as possible until the `trio.move_on_after()` (+ time diffing
calcs) times out and signals the next throttled send cycle. This also is
slightly easier to grok code-wise instead of the `try, except` and
another tight while loop until a `trio.WouldBlock`. The only simpler
way i can think to do it is with 2 tasks: 1 to collect ticks and the
other to read and send at the throttle rate.
Comment out the log msg for now to avoid latency and add much more
detailed comments. Add an overrun log msg to the main sample loop.
Try out he new broadcast channels from `tractor` for data feeds
we already have cached. Any time there's a cache hit we load the
cached feed and just slap a broadcast receiver on it for the local
consumer task.
If a client attaches to a quotes data feed and requests a throttle rate,
be sure to unsub that side-band memchan + task when it detaches and
especially so on any transport connection error.
Also, use an explicit `tractor.Context.cancel()` on the client feed
block exit since we removed the implicit cancel option from the
`tractor` api.
This allows for more deterministically managing long running sub-daemon
services under `pikerd` using the new context api from `tractor`.
The contexts are allocated in an async exit stack and torn down at root
daemon termination. Spawn brokerds using this method by changing the
persistence entry point to be a `@tractor.context`.