From b6d70d5012effa1d2be4f52e190272c238638b93 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 15 Sep 2025 18:29:19 -0400 Subject: [PATCH 01/11] ib-related: cope with invalid txn timestamps That is inside embedded `.accounting.calc.dyn_parse_to_dt()` closure add an optional `_invalid: list` param to where we can report bad-timestamped records which we instead override and return as `from_timestamp(0.)` (when the parser loop falls through) and report later (in summary ) from the `.accounting.calc.iter_by_dt()` caller. Add some logging and an optional debug block for future tracing. NOTE, this commit was re-edited during a conflict between the orig branches: `dev/binance_api_3.1` & `dev/alt_tpts_for_perf`. --- piker/accounting/calc.py | 82 +++++++++++++++++++++++++++++----------- 1 file changed, 60 insertions(+), 22 deletions(-) diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py index ff12c055..64765e76 100644 --- a/piker/accounting/calc.py +++ b/piker/accounting/calc.py @@ -22,7 +22,9 @@ you know when you're losing money (if possible) XD from __future__ import annotations from collections.abc import ValuesView from contextlib import contextmanager as cm +from functools import partial from math import copysign +from pprint import pformat from typing import ( Any, Callable, @@ -37,12 +39,16 @@ from pendulum import ( parse, ) +from ..log import get_logger + if TYPE_CHECKING: from ._ledger import ( Transaction, TransactionLedger, ) +log = get_logger(__name__) + def ppu( clears: Iterator[Transaction], @@ -238,6 +244,9 @@ def iter_by_dt( def dyn_parse_to_dt( tx: tuple[str, dict[str, Any]] | Transaction, + + debug: bool = False, + _invalid: list|None = None, ) -> DateTime: # handle `.items()` inputs @@ -250,52 +259,81 @@ def iter_by_dt( # get best parser for this record.. for k in parsers: if ( - isdict and k in tx + (v := getattr(tx, k, None)) or - getattr(tx, k, None) + ( + isdict + and + (v := tx.get(k)) + ) ): - v = ( - tx[k] if isdict - else tx.dt - ) - assert v is not None, ( - f'No valid value for `{k}`!?' - ) - # only call parser on the value if not None from # the `parsers` table above (when NOT using # `.get()`), otherwise pass through the value and # sort on it directly if ( not isinstance(v, DateTime) - and (parser := parsers.get(k)) + and + (parser := parsers.get(k)) ): - return parser(v) + ret = parser(v) else: - return v + ret = v + return ret + + else: + log.debug( + f'Parser-field not found in txn\n' + f'\n' + f'parser-field: {k!r}\n' + f'txn: {tx!r}\n' + f'\n' + f'Trying next..\n' + ) + continue + + # XXX: should never get here.. else: - # TODO: move to top? - from piker.log import get_logger - log = get_logger(__name__) - # XXX: we should really never get here.. # only if a ledger record has no expected sort(able) # field will we likely hit this.. like with ze IB. # if no sortable field just deliver epoch? log.warning( 'No (time) sortable field for TXN:\n' - f'{tx}\n' + f'{tx!r}\n' ) - return from_timestamp(0) - # breakpoint() + if debug: + import tractor + with tractor.devx.maybe_open_crash_handler(): + raise ValueError( + f'No supported time-field found in txn !?\n' + f'\n' + f'supported-time-fields: {parsers!r}\n' + f'\n' + f'txn: {tx!r}\n' + ) + if _invalid is not None: + _invalid.append(tx) + return from_timestamp(0.) - entry: tuple[str, dict] | Transaction + entry: tuple[str, dict]|Transaction + invalid: list = [] for entry in sorted( records, - key=key or dyn_parse_to_dt, + key=key or partial( + dyn_parse_to_dt, + _invalid=invalid, + ), ): + if entry in invalid: + log.warning( + f'Ignoring txn w invalid timestamp ??\n' + f'{pformat(entry)}\n' + ) + continue + # NOTE the type sig above; either pairs or txns B) yield entry From d67ace75a4a402ceb1e5bfd4c07fb722d323081b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 26 Sep 2025 15:17:41 -0400 Subject: [PATCH 02/11] Don't override `Account.pps: dict` entries.. Despite a `.bs_mktid` ideally being a bijection with `MktPair.fqme` values, apparently some backends (cough IB) will switch the .` part in txn records resulting in multiple account-conf-file sections for the same dst asset. Obviously that means we can't allocate new `Position` entries keyed by that `bs_mktid`, instead be sure to **update them instead**! Deats, - add case logic to avoid pp overwrites using a `pp_objs.get()` check. - warn on duplicated pos entries whenever the current account-file entry's `mkt` doesn't match the pre-existing position's. - mk `Position.add_clear()` return a `bool` indicating if the record was newly added, warn when it was already existing/added prior. Also, - drop the already deprecated `open_pps()`, also from sub-pkg exports. - draft TODO for `Position.summary()` idea as a replacement for `BrokerdPosition`-msgs. --- piker/accounting/__init__.py | 2 -- piker/accounting/_pos.py | 64 ++++++++++++++++++++---------------- 2 files changed, 35 insertions(+), 31 deletions(-) diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py index 1b776714..72b883df 100644 --- a/piker/accounting/__init__.py +++ b/piker/accounting/__init__.py @@ -33,7 +33,6 @@ from ._pos import ( Account, load_account, load_account_from_ledger, - open_pps, open_account, Position, ) @@ -68,7 +67,6 @@ __all__ = [ 'load_account_from_ledger', 'mk_allocator', 'open_account', - 'open_pps', 'open_trade_ledger', 'unpack_fqme', 'DerivTypes', diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index a172f74c..64c56ba1 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -356,13 +356,12 @@ class Position(Struct): ) -> bool: ''' Update clearing table by calculating the rolling ppu and - (accumulative) size in both the clears entry and local - attrs state. + (accumulative) size in both the clears entry and local attrs + state. Inserts are always done in datetime sorted order. ''' - # added: bool = False tid: str = t.tid if tid in self._events: log.debug( @@ -370,7 +369,7 @@ class Position(Struct): f'\n' f'{t}\n' ) - # return added + return False # TODO: apparently this IS possible with a dict but not # common and probably not that beneficial unless we're also @@ -451,6 +450,12 @@ class Position(Struct): # def suggest_split(self) -> float: # ... + # ?TODO, for sending rendered state over the wire? + # def summary(self) -> PositionSummary: + # do minimal conversion to a subset of fields + # currently defined in `.clearing._messages.BrokerdPosition` + + class Account(Struct): ''' @@ -494,9 +499,9 @@ class Account(Struct): def update_from_ledger( self, - ledger: TransactionLedger | dict[str, Transaction], + ledger: TransactionLedger|dict[str, Transaction], cost_scalar: float = 2, - symcache: SymbologyCache | None = None, + symcache: SymbologyCache|None = None, _mktmap_table: dict[str, MktPair] | None = None, @@ -749,7 +754,7 @@ class Account(Struct): # XXX WTF: if we use a tomlkit.Integer here we get this # super weird --1 thing going on for cumsize!?1! # NOTE: the fix was to always float() the size value loaded - # in open_pps() below! + # in open_account() below! config.write( config=self.conf, path=self.conf_path, @@ -933,7 +938,6 @@ def open_account( clears_table['dt'] = dt trans.append(Transaction( fqme=bs_mktid, - # sym=mkt, bs_mktid=bs_mktid, tid=tid, # XXX: not sure why sometimes these are loaded as @@ -956,11 +960,22 @@ def open_account( ): expiry: pendulum.DateTime = pendulum.parse(expiry) - pp = pp_objs[bs_mktid] = Position( - mkt, - split_ratio=split_ratio, - bs_mktid=bs_mktid, - ) + # !XXX, should never be duplicates over + # a backend-(broker)-system's unique market-IDs! + if pos := pp_objs.get(bs_mktid): + if mkt != pos.mkt: + log.warning( + f'Duplicated position but diff `MktPair.fqme` ??\n' + f'bs_mktid: {bs_mktid!r}\n' + f'pos.mkt: {pos.mkt}\n' + f'mkt: {mkt}\n' + ) + else: + pos = pp_objs[bs_mktid] = Position( + mkt, + split_ratio=split_ratio, + bs_mktid=bs_mktid, + ) # XXX: super critical, we need to be sure to include # all pps.toml clears to avoid reusing clears that were @@ -968,8 +983,13 @@ def open_account( # state, since today's records may have already been # processed! for t in trans: - pp.add_clear(t) - + added: bool = pos.add_clear(t) + if not added: + log.warning( + f'Txn already recorded in pp ??\n' + f'\n' + f'{t}\n' + ) try: yield acnt finally: @@ -977,20 +997,6 @@ def open_account( acnt.write_config() -# TODO: drop the old name and THIS! -@cm -def open_pps( - *args, - **kwargs, -) -> Generator[Account, None, None]: - log.warning( - '`open_pps()` is now deprecated!\n' - 'Please use `with open_account() as cnt:`' - ) - with open_account(*args, **kwargs) as acnt: - yield acnt - - def load_account_from_ledger( brokername: str, From 5b91b089639e1859016a749e48b035f08cab633e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 26 Sep 2025 17:38:22 -0400 Subject: [PATCH 03/11] Add an option `BrokerdPosition.bs_mktid` field Such that backends can deliver their own internal unique `MktPair.bs_mktid` when they can't seem to get it right via the `.fqme: str` export.. (COUGH ib, you piece of sh#$). Also add todo for possibly replacing the msg with a `Position.summary()` "snapshot" as a better and more rigorously generated wire-ready msg. --- piker/clearing/_messages.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 788fe669..779be0ce 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -301,6 +301,9 @@ class BrokerdError(Struct): # TODO: yeah, so we REALLY need to completely deprecate # this and use the `.accounting.Position` msg-type instead.. +# -[ ] an alternative might be to add a `Position.summary() -> +# `PositionSummary`-msg that we generate since `Position` has a lot +# of fields by default we likely don't want to send over the wire? class BrokerdPosition(Struct): ''' Position update event from brokerd. @@ -313,3 +316,4 @@ class BrokerdPosition(Struct): avg_price: float currency: str = '' name: str = 'position' + bs_mktid: str|int|None = None From 388a9a4da7c38a02be93db4054f0a85494eca457 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 27 Sep 2025 11:55:35 -0400 Subject: [PATCH 04/11] ui.order_mode: prioritize mkt-match on `.bs_mktid` For backends which opt to set the new `BrokerdPosition.bs_mktid` field, give (matching logic) priority to it such that even if the `.symbol` field doesn't match the mkt currently focussed on chart, it will always match on a provider's own internal asset-mapping-id. The original fallback logic for `.fqme` matching is left as is. As an example with IB, a qqq.nasdaq.ib txn may have been filled on a non-primary venue as qqq.directedea.ib, in this case if the mkt is displayed and focused on chart we want the **entire position info** to be overlayed by the `OrderMode` UX without discrepancy. Other refinements, - improve logging and add a detailed edge-case-comment around the `.on_fill()` handler to clarify where if a benign 'error' msg is relayed from a backend it will cause the UI to operate as though the order **was not-cleared/cancelled** since the `.on_cancel()` handler will have likely been called just before, popping the `.dialogs` entry. Return `bool` to indicate whether the UI removed-lines / added-fill-arrows. - inverse the `return` branching logic in `.on_cancel()` to reduce indent. - add a very loud `log.error()` in `Status(resp='error')` case-block ensuring the console yells about the order being cancelled, also a todo for the weird msg-field recursion nonsense.. --- piker/ui/order_mode.py | 161 ++++++++++++++++++++++++++--------------- 1 file changed, 103 insertions(+), 58 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index ea6d498a..f1f0e62f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -555,14 +555,13 @@ class OrderMode: def on_fill( self, - uuid: str, price: float, time_s: float, pointing: str | None = None, - ) -> None: + ) -> bool: ''' Fill msg handler. @@ -575,60 +574,83 @@ class OrderMode: - update fill bar size ''' - dialog = self.dialogs[uuid] + # XXX WARNING XXX + # if a `Status(resp='error')` arrives *before* this + # fill-status, the `.dialogs` entry may have already been + # popped and thus the below will skipped. + # + # NOTE, to avoid this confusing scenario ensure that any + # errors delivered thru from the broker-backend are not just + # "noisy reporting" (like is very common from IB..) and are + # instead ONLY errors-causing-order-dialog-cancellation! + if not (dialog := self.dialogs.get(uuid)): + log.warning( + f'Order was already cleared from `.dialogs` ??\n' + f'uuid: {uuid!r}\n' + ) + return False + lines = dialog.lines chart = self.chart - # XXX: seems to fail on certain types of races? - # assert len(lines) == 2 - if lines: - flume: Flume = self.feed.flumes[chart.linked.mkt.fqme] - _, _, ratio = flume.get_ds_info() - - for chart, shm in [ - (self.chart, flume.rt_shm), - (self.hist_chart, flume.hist_shm), - ]: - viz = chart.get_viz(chart.name) - index_field = viz.index_field - arr = shm.array - - # TODO: borked for int index based.. - index = flume.get_index(time_s, arr) - - # get absolute index for arrow placement - arrow_index = arr[index_field][index] - - self.arrows.add( - chart.plotItem, - uuid, - arrow_index, - price, - pointing=pointing, - color=lines[0].color - ) - else: + if not lines: log.warn("No line(s) for order {uuid}!?") + return False + + # update line state(s) + # + # ?XXX this fails on certain types of races? + # assert len(lines) == 2 + flume: Flume = self.feed.flumes[chart.linked.mkt.fqme] + _, _, ratio = flume.get_ds_info() + + for chart, shm in [ + (self.chart, flume.rt_shm), + (self.hist_chart, flume.hist_shm), + ]: + viz = chart.get_viz(chart.name) + index_field = viz.index_field + arr = shm.array + + # TODO: borked for int index based.. + index = flume.get_index(time_s, arr) + + # get absolute index for arrow placement + arrow_index = arr[index_field][index] + + self.arrows.add( + chart.plotItem, + uuid, + arrow_index, + price, + pointing=pointing, + color=lines[0].color + ) def on_cancel( self, - uuid: str + uuid: str, - ) -> None: + ) -> bool: - msg: Order = self.client._sent_orders.pop(uuid, None) - - if msg is not None: - self.lines.remove_line(uuid=uuid) - self.chart.linked.cursor.show_xhair() - - dialog = self.dialogs.pop(uuid, None) - if dialog: - dialog.last_status_close() - else: + msg: Order|None = self.client._sent_orders.pop(uuid, None) + if msg is None: log.warning( f'Received cancel for unsubmitted order {pformat(msg)}' ) + return False + + # remove GUI line, show cursor. + self.lines.remove_line(uuid=uuid) + self.chart.linked.cursor.show_xhair() + + # remove msg dialog (history) + dialog: Dialog|None = self.dialogs.pop(uuid, None) + if dialog: + dialog.last_status_close() + + return True + def cancel_orders_under_cursor(self) -> list[str]: return self.cancel_orders( @@ -1057,13 +1079,23 @@ async def process_trade_msg( if name in ( 'position', ): - sym: MktPair = mode.chart.linked.mkt + mkt: MktPair = mode.chart.linked.mkt pp_msg_symbol = msg['symbol'].lower() - fqme = sym.fqme - broker = sym.broker + pp_msg_bsmktid = msg['bs_mktid'] + fqme = mkt.fqme + broker = mkt.broker if ( + # match on any backed-specific(-unique)-ID first! + ( + pp_msg_bsmktid + and + mkt.bs_mktid == pp_msg_bsmktid + ) + or + # OW try against what's provided as an FQME.. pp_msg_symbol == fqme - or pp_msg_symbol == fqme.removesuffix(f'.{broker}') + or + pp_msg_symbol == fqme.removesuffix(f'.{broker}') ): log.info( f'Loading position for `{fqme}`:\n' @@ -1086,7 +1118,7 @@ async def process_trade_msg( return msg = Status(**msg) - resp = msg.resp + # resp: str = msg.resp oid = msg.oid dialog: Dialog = mode.dialogs.get(oid) @@ -1150,19 +1182,32 @@ async def process_trade_msg( mode.on_submit(oid) case Status(resp='error'): - - # do all the things for a cancel: - # - drop order-msg dialog from client table - # - delete level line from view - mode.on_cancel(oid) - # TODO: parse into broker-side msg, or should we # expect it to just be **that** msg verbatim (since # we'd presumably have only 1 `Error` msg-struct) broker_msg: dict = msg.brokerd_msg + + # XXX NOTE, this presumes the rxed "error" is + # order-dialog-cancel-causing, THUS backends much ONLY + # relay errors of this "severity"!! log.error( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' + f'Order errored ??\n' + f'oid: {oid!r}\n' + f'\n' + f'{pformat(broker_msg)}\n' + f'\n' + f'=> CANCELLING ORDER DIALOG <=\n' + + # from tractor.devx.pformat import ppfmt + # !TODO LOL, wtf the msg is causing + # a recursion bug! + # -[ ] get this shit on msgspec stat! + # f'{ppfmt(broker_msg)}' ) + # do all the things for a cancel: + # - drop order-msg dialog from client table + # - delete level line from view + mode.on_cancel(oid) case Status(resp='canceled'): # delete level line from view @@ -1178,10 +1223,10 @@ async def process_trade_msg( # TODO: UX for a "pending" clear/live order log.info(f'Dark order triggered for {fmtmsg}') + # TODO: do the struct-msg version, blah blah.. + # req=Order(exec_mode='live', action='alert') as req, case Status( resp='triggered', - # TODO: do the struct-msg version, blah blah.. - # req=Order(exec_mode='live', action='alert') as req, req={ 'exec_mode': 'live', 'action': 'alert', From 0e9b50de4b60863f481d0336b848834aa6194f40 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 13:21:11 -0400 Subject: [PATCH 05/11] `_ems`: tolerate and warn on already popped execs In the `translate_and_relay_brokerd_events()` loop task that is, such that we never crash on a `status_msg = book._active.pop(oid)` in the 'closed' status handler whenever a double removal happens. Turns out there were unforeseen races here when a benign backend error would cause an order-mode dialog to be cancelled (incorrectly) and then a UI side `.on_cancel()` would trigger too-early removal from the `book._active` table despite the backend sending an actual 'closed' event (much) later, this would crash on the now missing entry.. So instead we now, - obviously use `book._active.pop(oid, None)` - emit a `log.warning()` (not info lol) on a null-read and with a less "one-line-y" message explaining the double removal and maybe *why*. --- piker/clearing/_ems.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3794313f..96c01df8 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -388,6 +388,7 @@ async def open_brokerd_dialog( for ep_name in [ 'open_trade_dialog', # probably final name? 'trades_dialogue', # legacy + # ^!TODO, rm this since all backends ported no ?!? ]: trades_endpoint = getattr( brokermod, @@ -1027,8 +1028,18 @@ async def translate_and_relay_brokerd_events( ) if status == 'closed': - log.info(f'Execution for {oid} is complete!') - status_msg = book._active.pop(oid) + log.info( + f'Execution is complete!\n' + f'oid: {oid!r}\n' + ) + status_msg = book._active.pop(oid, None) + if status_msg is None: + log.warning( + f'Order was already cleared from book ??\n' + f'oid: {oid!r}\n' + f'\n' + f'Maybe the order cancelled before submitted ??\n' + ) elif status == 'canceled': log.cancel(f'Cancellation for {oid} is complete!') @@ -1552,19 +1563,18 @@ async def maybe_open_trade_relays( @tractor.context async def _emsd_main( - ctx: tractor.Context, + ctx: tractor.Context, # becomes `ems_ctx` below fqme: str, exec_mode: str, # ('paper', 'live') loglevel: str|None = None, -) -> tuple[ - dict[ - # brokername, acctid - tuple[str, str], +) -> tuple[ # `ctx.started()` value! + dict[ # positions + tuple[str, str], # brokername, acctid list[BrokerdPosition], ], - list[str], - dict[str, Status], + list[str], # accounts + dict[str, Status], # dialogs ]: ''' EMS (sub)actor entrypoint providing the execution management From c609858f202ab024a2920d056f9fc81a280091de Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 13:26:11 -0400 Subject: [PATCH 06/11] `ui._remote_ctl`: shield remote rect removals Since under `trio`-cancellation the `.remove()` is a checkpoint and will be masked by a taskc AND we **always want to remove the rect** despite the surrounding teardown conditions. --- piker/ui/_remote_ctl.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/piker/ui/_remote_ctl.py b/piker/ui/_remote_ctl.py index ccc8d6e9..05e145e7 100644 --- a/piker/ui/_remote_ctl.py +++ b/piker/ui/_remote_ctl.py @@ -15,8 +15,8 @@ # along with this program. If not, see . ''' -Remote control tasks for sending annotations (and maybe more cmds) -to a chart from some other actor. +Remote control tasks for sending annotations (and maybe more cmds) to +a chart from some other actor. ''' from __future__ import annotations @@ -32,6 +32,7 @@ from typing import ( ) import tractor +import trio from tractor import trionics from tractor import ( Portal, @@ -316,7 +317,9 @@ class AnnotCtl(Struct): ) yield aid finally: - await self.remove(aid) + # async ipc send op + with trio.CancelScope(shield=True): + await self.remove(aid) async def redraw( self, From 1a4f8fa76f38dfde3eceb47d6366b6ce7fcc61d0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 13:33:03 -0400 Subject: [PATCH 07/11] Drop `open_pps()` from ems tests --- tests/test_ems.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_ems.py b/tests/test_ems.py index 4a9e4a4c..07e28c33 100644 --- a/tests/test_ems.py +++ b/tests/test_ems.py @@ -42,7 +42,7 @@ from piker.accounting import ( unpack_fqme, ) from piker.accounting import ( - open_pps, + open_account, Position, ) @@ -136,7 +136,7 @@ def load_and_check_pos( ) -> None: - with open_pps(ppmsg.broker, ppmsg.account) as table: + with open_account(ppmsg.broker, ppmsg.account) as table: if ppmsg.size == 0: assert ppmsg.symbol not in table.pps From f5850fe5c2043c8ec6ca66899e43c3f4a0d2be96 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 15:02:50 -0400 Subject: [PATCH 08/11] Draft a gt-one-`.fqme`-in-txns/account-file test To start this is just a shell for the test, there's no checking logic yet.. put it as `test_accounting.test_ib_account_with_duplicated_mktids()`. The test is composed for now to be completely runtime-free using only the offline txn-ledger / symcache / account loading APIs, ideally we fill in the activated symbology-data-runtime cases once we figure a sane way to handle incremental symcache updates for backends like IB.. To actually fill the test out with real checks we still need to, - extract the problem account file from my ib.algopape into the test harness data. - pick some contracts with multiple fqmes despite a single bs_mktid and ensure they're aggregated as a single `Position` as well as, * ideally de-duplicating txns from the account file section for the mkt.. * warning appropriately about greater-then-one fqme for the bs_mktid and providing a way for the ledger re-writing to choose the appropriate `` as the "primary" when the data-symbology-runtime is up and possibly use it to incrementally update the IB symcache and store offline for next use? --- tests/test_accounting.py | 100 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 96 insertions(+), 4 deletions(-) diff --git a/tests/test_accounting.py b/tests/test_accounting.py index 952a9229..1e22c91d 100644 --- a/tests/test_accounting.py +++ b/tests/test_accounting.py @@ -12,12 +12,14 @@ from piker import config from piker.accounting import ( Account, calc, - Position, - TransactionLedger, - open_trade_ledger, + open_account, load_account, load_account_from_ledger, + open_trade_ledger, + Position, + TransactionLedger, ) +import tractor def test_root_conf_networking_section( @@ -53,12 +55,17 @@ def test_account_file_default_empty( ) def test_paper_ledger_position_calcs( fq_acnt: tuple[str, str], + debug_mode: bool, ): broker: str acnt_name: str broker, acnt_name = fq_acnt - accounts_path: Path = config.repodir() / 'tests' / '_inputs' + accounts_path: Path = ( + config.repodir() + / 'tests' + / '_inputs' # tests-local-subdir + ) ldr: TransactionLedger with ( @@ -77,6 +84,7 @@ def test_paper_ledger_position_calcs( ledger=ldr, _fp=accounts_path, + debug_mode=debug_mode, ) as (dfs, ledger), @@ -102,3 +110,87 @@ def test_paper_ledger_position_calcs( df = dfs[xrp] assert df['cumsize'][-1] == 0 assert pos.cumsize == 0 + + + +@pytest.mark.parametrize( + 'fq_acnt', + [ + ('ib', 'algopaper'), + ], +) +def test_ib_account_with_duplicated_mktids( + fq_acnt: tuple[str, str], + debug_mode: bool, +): + # ?TODO, once we start symcache-incremental-update-support? + # from piker.data import ( + # open_symcache, + # ) + # + # async def main(): + # async with ( + # # TODO: do this as part of `open_account()`!? + # open_symcache( + # 'ib', + # only_from_memcache=True, + # ) as symcache, + # ): + + + from piker.brokers.ib.ledger import ( + tx_sort, + + # ?TODO, once we want to pull lowlevel txns and process them? + # norm_trade_records, + # update_ledger_from_api_trades, + ) + + broker: str + acnt_id: str = 'algopaper' + broker, acnt_id = fq_acnt + accounts_def = config.load_accounts([broker]) + assert accounts_def[f'{broker}.{acnt_id}'] + + ledger: TransactionLedger + acnt: Account + with ( + tractor.devx.maybe_open_crash_handler(pdb=debug_mode), + + open_trade_ledger( + 'ib', + acnt_id, + tx_sort=tx_sort, + + # TODO, eventually incrementally updated for IB.. + # symcache=symcache, + symcache=None, + allow_from_sync_code=True, + + ) as ledger, + + open_account( + 'ib', + acnt_id, + write_on_exit=True, + ) as acnt, + ): + # per input params + symcache = ledger.symcache + assert not ( + symcache.pairs + or + symcache.pairs + or + symcache.mktmaps + ) + # re-compute all positions that have changed state. + # TODO: likely we should change the API to return the + # position updates from `.update_from_ledger()`? + active, closed = acnt.dump_active() + + # breakpoint() + + # TODO, (see above imports as well) incremental update from + # (updated) ledger? + # -[ ] pull some code from `.ib.broker` content. From 90389d0b94d38800cfb89ef4089c7ac1ed42a6b3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 29 Sep 2025 15:14:35 -0400 Subject: [PATCH 09/11] `accouning.calc`: enable crash handlers on `debug_mode` input (via test harness) --- piker/accounting/calc.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py index 64765e76..e9190228 100644 --- a/piker/accounting/calc.py +++ b/piker/accounting/calc.py @@ -32,6 +32,7 @@ from typing import ( TYPE_CHECKING, ) +from tractor.devx import maybe_open_crash_handler import polars as pl from pendulum import ( DateTime, @@ -396,6 +397,7 @@ def open_ledger_dfs( acctname: str, ledger: TransactionLedger | None = None, + debug_mode: bool = False, **kwargs, @@ -410,8 +412,7 @@ def open_ledger_dfs( can update the ledger on exit. ''' - from piker.toolz import open_crash_handler - with open_crash_handler(): + with maybe_open_crash_handler(pdb=debug_mode): if not ledger: import time from ._ledger import open_trade_ledger From 58654915ac68c3509af1042efc66cfdcc8c0a3a3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 26 Sep 2025 17:44:06 -0400 Subject: [PATCH 10/11] Set `.bs_mktid` on all IB position-msg emissions.. --- piker/brokers/ib/broker.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 5065d678..1e9d9c1b 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -362,6 +362,10 @@ async def update_and_audit_pos_msg( size=ibpos.position, avg_price=pikerpos.ppu, + + # XXX ensures matching even if multiple venue-names + # in `.bs_fqme`, likely from txn records.. + bs_mktid=mkt.bs_mktid, ) ibfmtmsg: str = pformat(ibpos._asdict()) @@ -430,7 +434,8 @@ async def aggr_open_orders( ) -> None: ''' - Collect all open orders from client and fill in `order_msgs: list`. + Collect all open orders from client and fill in `order_msgs: + list`. ''' trades: list[Trade] = client.ib.openTrades() From 7b68444c7ac9a33909ce4317234c9adf8d7c7870 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 2 Oct 2025 12:17:57 -0400 Subject: [PATCH 11/11] accounting.calc: `.error()` on bad txn-time fields.. Since i'm seeing IB records with a `None` value and i don't want to be debugging every time order-mode boots up.. Also use `pdb=debug` in `.open_ledger_dfs()` Note, this had conflicts on `piker/accounting/calc.py` when rebasing onto the refactored `brokers_refinery` history which were resolved manually! --- piker/accounting/calc.py | 34 +++++++++++++++++++++++----------- 1 file changed, 23 insertions(+), 11 deletions(-) diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py index e9190228..12a65c86 100644 --- a/piker/accounting/calc.py +++ b/piker/accounting/calc.py @@ -294,7 +294,11 @@ def iter_by_dt( ) continue - # XXX: should never get here.. + # XXX: we should never really get here bc it means some kinda + # bad txn-record (field) data.. + # + # -> set the `debug_mode = True` if you want to trace such + # cases from REPL ;) else: # XXX: we should really never get here.. # only if a ledger record has no expected sort(able) @@ -304,16 +308,21 @@ def iter_by_dt( 'No (time) sortable field for TXN:\n' f'{tx!r}\n' ) + report: str = ( + f'No supported time-field found in txn !?\n' + f'\n' + f'supported-time-fields: {parsers!r}\n' + f'\n' + f'txn: {tx!r}\n' + ) if debug: - import tractor - with tractor.devx.maybe_open_crash_handler(): - raise ValueError( - f'No supported time-field found in txn !?\n' - f'\n' - f'supported-time-fields: {parsers!r}\n' - f'\n' - f'txn: {tx!r}\n' - ) + with maybe_open_crash_handler( + pdb=debug, + raise_on_exit=False, + ): + raise ValueError(report) + else: + log.error(report) if _invalid is not None: _invalid.append(tx) @@ -412,7 +421,10 @@ def open_ledger_dfs( can update the ledger on exit. ''' - with maybe_open_crash_handler(pdb=debug_mode): + with maybe_open_crash_handler( + pdb=debug_mode, + # raise_on_exit=False, + ): if not ledger: import time from ._ledger import open_trade_ledger