From c6da09f3c65627324a08e513b626ddc581f1b68c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 18 May 2023 18:07:12 -0400 Subject: [PATCH] Add fast(er), time-sorted ledger records Turns out that reading **and** writing with `tomlkit` is just wayya slow for large documents like ledger files so move to using the `tomli` sibling pkg `tomli-w` which seems to much improve on the latency, though obviously longer run we're likely going to want: - a better algorithm for only back loading records using as little history as possible - a different serialization format for production maybe something like apache parquet? The only issue with using a non-style-preserving writer is that we don't necessarily get TOML conf ordering for free (without first ordering it ourselves), and thus this patch also adds much more general date-time sorting machinery which is now **required** when using `open_trades_ledger()` via a `tx_sort: Callable`. By default we now provide `.accounting._ledger.iter_by_dt()` (exposed in the subpkg mod) which conducts dynamic "datetime key detection" based parsing of records based on a `parsers: dict[str, Callabe]` input table. The default should handle most use cases including all currently supported live backends (kraken, ib) as well as our paper engine ledger-records format. Granulars: - adjust `Position.iter_clears()` to use new `iter_by_dt(key=lambda ..)` signature. - add `tomli-w` to setup and our `tomlkit` fork to requirements file. - move `.write_config()` to bottom of class defn. - fix closed pos popping to not error if pp was already popped.. --- piker/accounting/__init__.py | 1 + piker/accounting/_ledger.py | 146 +++++++++++++++++++++-------------- piker/accounting/_pos.py | 18 ++++- requirements.txt | 5 ++ setup.py | 6 +- 5 files changed, 113 insertions(+), 63 deletions(-) diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py index 94779319..4c4a0ca1 100644 --- a/piker/accounting/__init__.py +++ b/piker/accounting/__init__.py @@ -22,6 +22,7 @@ for tendiez. from ..log import get_logger from ._ledger import ( + iter_by_dt, Transaction, TransactionLedger, open_trade_ledger, diff --git a/piker/accounting/_ledger.py b/piker/accounting/_ledger.py index 9c3f80a4..4073b3a6 100644 --- a/piker/accounting/_ledger.py +++ b/piker/accounting/_ledger.py @@ -32,10 +32,11 @@ from typing import ( from pendulum import ( datetime, + DateTime, + from_timestamp, parse, ) -import tomlkit -import tomli +import tomli_w # for fast ledger writing from .. import config from ..data.types import Struct @@ -116,37 +117,13 @@ class TransactionLedger(UserDict): self, ledger_dict: dict, file_path: Path, + tx_sort: Callable, ) -> None: self.file_path = file_path + self.tx_sort = tx_sort super().__init__(ledger_dict) - def write_config(self) -> None: - ''' - Render the self.data ledger dict to it's TOML file form. - - ''' - towrite: dict[str, Any] = self.data.copy() - - for tid, txdict in self.data.items(): - - # drop key for non-expiring assets - if ( - 'expiry' in txdict - and txdict['expiry'] is None - ): - txdict.pop('expiry') - - # re-write old acro-key - fqme = txdict.get('fqsn') - if fqme: - txdict['fqme'] = fqme - - print(f'WRITING LEDGER {self.file_path}') - with self.file_path.open(mode='w') as fp: - tomlkit.dump(towrite, fp) - print(f'FINISHED WRITING LEDGER {self.file_path}') - def update_from_t( self, t: Transaction, @@ -182,6 +159,7 @@ class TransactionLedger(UserDict): # and instead call it for each entry incrementally: # normer = mod.norm_trade_record(txdict) + # TODO: use tx_sort here yah? for tid, txdict in self.data.items(): # special field handling for datetimes # to ensure pendulum is used! @@ -195,22 +173,20 @@ class TransactionLedger(UserDict): # the ``.sys: MktPair`` info, so skip. continue - yield ( - tid, - Transaction( - fqsn=fqme, - tid=txdict['tid'], - dt=dt, - price=txdict['price'], - size=txdict['size'], - cost=txdict.get('cost', 0), - bs_mktid=txdict['bs_mktid'], + tx = Transaction( + fqsn=fqme, + tid=txdict['tid'], + dt=dt, + price=txdict['price'], + size=txdict['size'], + cost=txdict.get('cost', 0), + bs_mktid=txdict['bs_mktid'], - # TODO: change to .sys! - sym=mkt, - expiry=parse(expiry) if expiry else None, - ) + # TODO: change to .sys! + sym=mkt, + expiry=parse(expiry) if expiry else None, ) + yield tid, tx def to_trans( self, @@ -223,12 +199,81 @@ class TransactionLedger(UserDict): ''' return dict(self.iter_trans(**kwargs)) + def write_config( + self, + + ) -> None: + ''' + Render the self.data ledger dict to it's TOML file form. + + ''' + cpy = self.data.copy() + towrite: dict[str, Any] = {} + for tid, trans in cpy.items(): + + # drop key for non-expiring assets + txdict = towrite[tid] = self.data[tid] + if ( + 'expiry' in txdict + and txdict['expiry'] is None + ): + txdict.pop('expiry') + + # re-write old acro-key + fqme = txdict.get('fqsn') + if fqme: + txdict['fqme'] = fqme + + with self.file_path.open(mode='wb') as fp: + tomli_w.dump(towrite, fp) + + +def iter_by_dt( + records: dict[str, Any], + + # NOTE: parsers are looked up in the insert order + # so if you know that the record stats show some field + # is more common then others, stick it at the top B) + parsers: dict[tuple[str], Callable] = { + 'dt': None, # parity case + 'datetime': parse, # datetime-str + 'time': from_timestamp, # float epoch + }, + key: Callable | None = None, + +) -> Iterator[tuple[str, dict]]: + ''' + Iterate entries of a ``records: dict`` table sorted by entry recorded + datetime presumably set at the ``'dt'`` field in each entry. + + ''' + txs = records.items() + + def dyn_parse_to_dt( + pair: tuple[str, dict], + ) -> DateTime: + _, txdict = pair + k, v, parser = next( + (k, txdict[k], parsers[k]) for k in parsers if k in txdict + ) + + return parser(v) if parser else v + + for tid, data in sorted( + records.items(), + key=key or dyn_parse_to_dt, + ): + yield tid, data + @cm def open_trade_ledger( broker: str, account: str, + # default is to sort by detected datetime-ish field + tx_sort: Callable = iter_by_dt, + ) -> Generator[dict, None, None]: ''' Indempotently create and read in a trade log file from the @@ -244,6 +289,7 @@ def open_trade_ledger( ledger = TransactionLedger( ledger_dict=cpy, file_path=fpath, + tx_sort=tx_sort, ) try: yield ledger @@ -254,19 +300,3 @@ def open_trade_ledger( # https://stackoverflow.com/questions/12956957/print-diff-of-python-dictionaries log.info(f'Updating ledger for {fpath}:\n') ledger.write_config() - - -def iter_by_dt( - clears: dict[str, Any], - -) -> Iterator[tuple[str, dict]]: - ''' - Iterate entries of a ``clears: dict`` table sorted by entry recorded - datetime presumably set at the ``'dt'`` field in each entry. - - ''' - for tid, data in sorted( - list(clears.items()), - key=lambda item: item[1]['dt'], - ): - yield tid, data diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index 9a07614c..2d3700bc 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -307,10 +307,16 @@ class Position(Struct): datetime-stamped order. ''' - return iter_by_dt(self.clears) + # sort on the already existing datetime that should have + # been generated for the entry's table + return iter_by_dt( + self.clears, + key=lambda entry: entry[1]['dt'] + ) def calc_ppu( self, + # include transaction cost in breakeven price # and presume the worst case of the same cost # to exit this transaction (even though in reality @@ -726,7 +732,15 @@ class PpTable(Struct): if closed: bs_mktid: str for bs_mktid, pos in closed.items(): - self.conf.pop(pos.symbol.fqme) + fqme: str = pos.symbol.fqme + if fqme in self.conf: + self.conf.pop(fqme) + else: + # TODO: we reallly need a diff set of + # loglevels/colors per subsys. + log.warning( + f'Recent position for {fqme} was closed!' + ) # if there are no active position entries according # to the toml dump output above, then clear the config diff --git a/requirements.txt b/requirements.txt index 25951629..ba4dc620 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,3 +13,8 @@ # ``asyncvnc`` for sending interactions to ib-gw inside docker -e git+https://github.com/pikers/asyncvnc.git@main#egg=asyncvnc + + +# ``tomlkit`` for account files and configs; we've +# added some new features that need to get upstreamed: +-e git+https://github.com/pikers/tomlkit.git@writing_docs_tweaks#egg=tomlkit diff --git a/setup.py b/setup.py index a3e60cd6..c63622b2 100755 --- a/setup.py +++ b/setup.py @@ -44,8 +44,9 @@ setup( ] }, install_requires=[ - 'tomlkit', # fork & fix for now: + # 'tomlkit', # fork & fix for now.. 'tomli', # for pre-3.11 + 'tomli-w', # for fast ledger writing 'colorlog', 'attrs', 'pygments', @@ -65,8 +66,7 @@ setup( # normally pinned to particular git hashes.. # 'tractor', # 'asyncvnc', - # 'pyqtgraph', - # anyio-marketstore # mkts tsdb client + # 'anyio-marketstore', # mkts tsdb client # brokers 'asks', # for non-ws rest apis