diff --git a/piker/accounting/__init__.py b/piker/accounting/__init__.py index 1b776714..72b883df 100644 --- a/piker/accounting/__init__.py +++ b/piker/accounting/__init__.py @@ -33,7 +33,6 @@ from ._pos import ( Account, load_account, load_account_from_ledger, - open_pps, open_account, Position, ) @@ -68,7 +67,6 @@ __all__ = [ 'load_account_from_ledger', 'mk_allocator', 'open_account', - 'open_pps', 'open_trade_ledger', 'unpack_fqme', 'DerivTypes', diff --git a/piker/accounting/_pos.py b/piker/accounting/_pos.py index a172f74c..64c56ba1 100644 --- a/piker/accounting/_pos.py +++ b/piker/accounting/_pos.py @@ -356,13 +356,12 @@ class Position(Struct): ) -> bool: ''' Update clearing table by calculating the rolling ppu and - (accumulative) size in both the clears entry and local - attrs state. + (accumulative) size in both the clears entry and local attrs + state. Inserts are always done in datetime sorted order. ''' - # added: bool = False tid: str = t.tid if tid in self._events: log.debug( @@ -370,7 +369,7 @@ class Position(Struct): f'\n' f'{t}\n' ) - # return added + return False # TODO: apparently this IS possible with a dict but not # common and probably not that beneficial unless we're also @@ -451,6 +450,12 @@ class Position(Struct): # def suggest_split(self) -> float: # ... + # ?TODO, for sending rendered state over the wire? + # def summary(self) -> PositionSummary: + # do minimal conversion to a subset of fields + # currently defined in `.clearing._messages.BrokerdPosition` + + class Account(Struct): ''' @@ -494,9 +499,9 @@ class Account(Struct): def update_from_ledger( self, - ledger: TransactionLedger | dict[str, Transaction], + ledger: TransactionLedger|dict[str, Transaction], cost_scalar: float = 2, - symcache: SymbologyCache | None = None, + symcache: SymbologyCache|None = None, _mktmap_table: dict[str, MktPair] | None = None, @@ -749,7 +754,7 @@ class Account(Struct): # XXX WTF: if we use a tomlkit.Integer here we get this # super weird --1 thing going on for cumsize!?1! # NOTE: the fix was to always float() the size value loaded - # in open_pps() below! + # in open_account() below! config.write( config=self.conf, path=self.conf_path, @@ -933,7 +938,6 @@ def open_account( clears_table['dt'] = dt trans.append(Transaction( fqme=bs_mktid, - # sym=mkt, bs_mktid=bs_mktid, tid=tid, # XXX: not sure why sometimes these are loaded as @@ -956,11 +960,22 @@ def open_account( ): expiry: pendulum.DateTime = pendulum.parse(expiry) - pp = pp_objs[bs_mktid] = Position( - mkt, - split_ratio=split_ratio, - bs_mktid=bs_mktid, - ) + # !XXX, should never be duplicates over + # a backend-(broker)-system's unique market-IDs! + if pos := pp_objs.get(bs_mktid): + if mkt != pos.mkt: + log.warning( + f'Duplicated position but diff `MktPair.fqme` ??\n' + f'bs_mktid: {bs_mktid!r}\n' + f'pos.mkt: {pos.mkt}\n' + f'mkt: {mkt}\n' + ) + else: + pos = pp_objs[bs_mktid] = Position( + mkt, + split_ratio=split_ratio, + bs_mktid=bs_mktid, + ) # XXX: super critical, we need to be sure to include # all pps.toml clears to avoid reusing clears that were @@ -968,8 +983,13 @@ def open_account( # state, since today's records may have already been # processed! for t in trans: - pp.add_clear(t) - + added: bool = pos.add_clear(t) + if not added: + log.warning( + f'Txn already recorded in pp ??\n' + f'\n' + f'{t}\n' + ) try: yield acnt finally: @@ -977,20 +997,6 @@ def open_account( acnt.write_config() -# TODO: drop the old name and THIS! 
-@cm -def open_pps( - *args, - **kwargs, -) -> Generator[Account, None, None]: - log.warning( - '`open_pps()` is now deprecated!\n' - 'Please use `with open_account() as cnt:`' - ) - with open_account(*args, **kwargs) as acnt: - yield acnt - - def load_account_from_ledger( brokername: str, diff --git a/piker/accounting/calc.py b/piker/accounting/calc.py index ff12c055..12a65c86 100644 --- a/piker/accounting/calc.py +++ b/piker/accounting/calc.py @@ -22,7 +22,9 @@ you know when you're losing money (if possible) XD from __future__ import annotations from collections.abc import ValuesView from contextlib import contextmanager as cm +from functools import partial from math import copysign +from pprint import pformat from typing import ( Any, Callable, @@ -30,6 +32,7 @@ from typing import ( TYPE_CHECKING, ) +from tractor.devx import maybe_open_crash_handler import polars as pl from pendulum import ( DateTime, @@ -37,12 +40,16 @@ from pendulum import ( parse, ) +from ..log import get_logger + if TYPE_CHECKING: from ._ledger import ( Transaction, TransactionLedger, ) +log = get_logger(__name__) + def ppu( clears: Iterator[Transaction], @@ -238,6 +245,9 @@ def iter_by_dt( def dyn_parse_to_dt( tx: tuple[str, dict[str, Any]] | Transaction, + + debug: bool = False, + _invalid: list|None = None, ) -> DateTime: # handle `.items()` inputs @@ -250,52 +260,90 @@ def iter_by_dt( # get best parser for this record.. for k in parsers: if ( - isdict and k in tx + (v := getattr(tx, k, None)) or - getattr(tx, k, None) + ( + isdict + and + (v := tx.get(k)) + ) ): - v = ( - tx[k] if isdict - else tx.dt - ) - assert v is not None, ( - f'No valid value for `{k}`!?' - ) - # only call parser on the value if not None from # the `parsers` table above (when NOT using # `.get()`), otherwise pass through the value and # sort on it directly if ( not isinstance(v, DateTime) - and (parser := parsers.get(k)) + and + (parser := parsers.get(k)) ): - return parser(v) + ret = parser(v) else: - return v + ret = v + return ret + + else: + log.debug( + f'Parser-field not found in txn\n' + f'\n' + f'parser-field: {k!r}\n' + f'txn: {tx!r}\n' + f'\n' + f'Trying next..\n' + ) + continue + + # XXX: we should never really get here bc it means some kinda + # bad txn-record (field) data.. + # + # -> set the `debug_mode = True` if you want to trace such + # cases from REPL ;) else: - # TODO: move to top? - from piker.log import get_logger - log = get_logger(__name__) - # XXX: we should really never get here.. # only if a ledger record has no expected sort(able) # field will we likely hit this.. like with ze IB. # if no sortable field just deliver epoch? log.warning( 'No (time) sortable field for TXN:\n' - f'{tx}\n' + f'{tx!r}\n' ) - return from_timestamp(0) - # breakpoint() + report: str = ( + f'No supported time-field found in txn !?\n' + f'\n' + f'supported-time-fields: {parsers!r}\n' + f'\n' + f'txn: {tx!r}\n' + ) + if debug: + with maybe_open_crash_handler( + pdb=debug, + raise_on_exit=False, + ): + raise ValueError(report) + else: + log.error(report) + if _invalid is not None: + _invalid.append(tx) + return from_timestamp(0.) 
- entry: tuple[str, dict] | Transaction + entry: tuple[str, dict]|Transaction + invalid: list = [] for entry in sorted( records, - key=key or dyn_parse_to_dt, + key=key or partial( + dyn_parse_to_dt, + _invalid=invalid, + ), ): + if entry in invalid: + log.warning( + f'Ignoring txn w invalid timestamp ??\n' + f'{pformat(entry)}\n' + ) + continue + # NOTE the type sig above; either pairs or txns B) yield entry @@ -358,6 +406,7 @@ def open_ledger_dfs( acctname: str, ledger: TransactionLedger | None = None, + debug_mode: bool = False, **kwargs, @@ -372,8 +421,10 @@ def open_ledger_dfs( can update the ledger on exit. ''' - from piker.toolz import open_crash_handler - with open_crash_handler(): + with maybe_open_crash_handler( + pdb=debug_mode, + # raise_on_exit=False, + ): if not ledger: import time from ._ledger import open_trade_ledger diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index 5065d678..1e9d9c1b 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -362,6 +362,10 @@ async def update_and_audit_pos_msg( size=ibpos.position, avg_price=pikerpos.ppu, + + # XXX ensures matching even if multiple venue-names + # in `.bs_fqme`, likely from txn records.. + bs_mktid=mkt.bs_mktid, ) ibfmtmsg: str = pformat(ibpos._asdict()) @@ -430,7 +434,8 @@ async def aggr_open_orders( ) -> None: ''' - Collect all open orders from client and fill in `order_msgs: list`. + Collect all open orders from client and fill in `order_msgs: + list`. ''' trades: list[Trade] = client.ib.openTrades() diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 3794313f..96c01df8 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -388,6 +388,7 @@ async def open_brokerd_dialog( for ep_name in [ 'open_trade_dialog', # probably final name? 'trades_dialogue', # legacy + # ^!TODO, rm this since all backends ported no ?!? ]: trades_endpoint = getattr( brokermod, @@ -1027,8 +1028,18 @@ async def translate_and_relay_brokerd_events( ) if status == 'closed': - log.info(f'Execution for {oid} is complete!') - status_msg = book._active.pop(oid) + log.info( + f'Execution is complete!\n' + f'oid: {oid!r}\n' + ) + status_msg = book._active.pop(oid, None) + if status_msg is None: + log.warning( + f'Order was already cleared from book ??\n' + f'oid: {oid!r}\n' + f'\n' + f'Maybe the order cancelled before submitted ??\n' + ) elif status == 'canceled': log.cancel(f'Cancellation for {oid} is complete!') @@ -1552,19 +1563,18 @@ async def maybe_open_trade_relays( @tractor.context async def _emsd_main( - ctx: tractor.Context, + ctx: tractor.Context, # becomes `ems_ctx` below fqme: str, exec_mode: str, # ('paper', 'live') loglevel: str|None = None, -) -> tuple[ - dict[ - # brokername, acctid - tuple[str, str], +) -> tuple[ # `ctx.started()` value! + dict[ # positions + tuple[str, str], # brokername, acctid list[BrokerdPosition], ], - list[str], - dict[str, Status], + list[str], # accounts + dict[str, Status], # dialogs ]: ''' EMS (sub)actor entrypoint providing the execution management diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 788fe669..779be0ce 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -301,6 +301,9 @@ class BrokerdError(Struct): # TODO: yeah, so we REALLY need to completely deprecate # this and use the `.accounting.Position` msg-type instead.. 
+# -[ ] an alternative might be to add a `Position.summary() -> +# `PositionSummary`-msg that we generate since `Position` has a lot +# of fields by default we likely don't want to send over the wire? class BrokerdPosition(Struct): ''' Position update event from brokerd. @@ -313,3 +316,4 @@ class BrokerdPosition(Struct): avg_price: float currency: str = '' name: str = 'position' + bs_mktid: str|int|None = None diff --git a/piker/ui/_remote_ctl.py b/piker/ui/_remote_ctl.py index ccc8d6e9..05e145e7 100644 --- a/piker/ui/_remote_ctl.py +++ b/piker/ui/_remote_ctl.py @@ -15,8 +15,8 @@ # along with this program. If not, see . ''' -Remote control tasks for sending annotations (and maybe more cmds) -to a chart from some other actor. +Remote control tasks for sending annotations (and maybe more cmds) to +a chart from some other actor. ''' from __future__ import annotations @@ -32,6 +32,7 @@ from typing import ( ) import tractor +import trio from tractor import trionics from tractor import ( Portal, @@ -316,7 +317,9 @@ class AnnotCtl(Struct): ) yield aid finally: - await self.remove(aid) + # async ipc send op + with trio.CancelScope(shield=True): + await self.remove(aid) async def redraw( self, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index ea6d498a..f1f0e62f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -555,14 +555,13 @@ class OrderMode: def on_fill( self, - uuid: str, price: float, time_s: float, pointing: str | None = None, - ) -> None: + ) -> bool: ''' Fill msg handler. @@ -575,60 +574,83 @@ class OrderMode: - update fill bar size ''' - dialog = self.dialogs[uuid] + # XXX WARNING XXX + # if a `Status(resp='error')` arrives *before* this + # fill-status, the `.dialogs` entry may have already been + # popped and thus the below will skipped. + # + # NOTE, to avoid this confusing scenario ensure that any + # errors delivered thru from the broker-backend are not just + # "noisy reporting" (like is very common from IB..) and are + # instead ONLY errors-causing-order-dialog-cancellation! + if not (dialog := self.dialogs.get(uuid)): + log.warning( + f'Order was already cleared from `.dialogs` ??\n' + f'uuid: {uuid!r}\n' + ) + return False + lines = dialog.lines chart = self.chart - # XXX: seems to fail on certain types of races? - # assert len(lines) == 2 - if lines: - flume: Flume = self.feed.flumes[chart.linked.mkt.fqme] - _, _, ratio = flume.get_ds_info() - - for chart, shm in [ - (self.chart, flume.rt_shm), - (self.hist_chart, flume.hist_shm), - ]: - viz = chart.get_viz(chart.name) - index_field = viz.index_field - arr = shm.array - - # TODO: borked for int index based.. - index = flume.get_index(time_s, arr) - - # get absolute index for arrow placement - arrow_index = arr[index_field][index] - - self.arrows.add( - chart.plotItem, - uuid, - arrow_index, - price, - pointing=pointing, - color=lines[0].color - ) - else: + if not lines: log.warn("No line(s) for order {uuid}!?") + return False + + # update line state(s) + # + # ?XXX this fails on certain types of races? + # assert len(lines) == 2 + flume: Flume = self.feed.flumes[chart.linked.mkt.fqme] + _, _, ratio = flume.get_ds_info() + + for chart, shm in [ + (self.chart, flume.rt_shm), + (self.hist_chart, flume.hist_shm), + ]: + viz = chart.get_viz(chart.name) + index_field = viz.index_field + arr = shm.array + + # TODO: borked for int index based.. 
+ index = flume.get_index(time_s, arr) + + # get absolute index for arrow placement + arrow_index = arr[index_field][index] + + self.arrows.add( + chart.plotItem, + uuid, + arrow_index, + price, + pointing=pointing, + color=lines[0].color + ) def on_cancel( self, - uuid: str + uuid: str, - ) -> None: + ) -> bool: - msg: Order = self.client._sent_orders.pop(uuid, None) - - if msg is not None: - self.lines.remove_line(uuid=uuid) - self.chart.linked.cursor.show_xhair() - - dialog = self.dialogs.pop(uuid, None) - if dialog: - dialog.last_status_close() - else: + msg: Order|None = self.client._sent_orders.pop(uuid, None) + if msg is None: log.warning( f'Received cancel for unsubmitted order {pformat(msg)}' ) + return False + + # remove GUI line, show cursor. + self.lines.remove_line(uuid=uuid) + self.chart.linked.cursor.show_xhair() + + # remove msg dialog (history) + dialog: Dialog|None = self.dialogs.pop(uuid, None) + if dialog: + dialog.last_status_close() + + return True + def cancel_orders_under_cursor(self) -> list[str]: return self.cancel_orders( @@ -1057,13 +1079,23 @@ async def process_trade_msg( if name in ( 'position', ): - sym: MktPair = mode.chart.linked.mkt + mkt: MktPair = mode.chart.linked.mkt pp_msg_symbol = msg['symbol'].lower() - fqme = sym.fqme - broker = sym.broker + pp_msg_bsmktid = msg['bs_mktid'] + fqme = mkt.fqme + broker = mkt.broker if ( + # match on any backed-specific(-unique)-ID first! + ( + pp_msg_bsmktid + and + mkt.bs_mktid == pp_msg_bsmktid + ) + or + # OW try against what's provided as an FQME.. pp_msg_symbol == fqme - or pp_msg_symbol == fqme.removesuffix(f'.{broker}') + or + pp_msg_symbol == fqme.removesuffix(f'.{broker}') ): log.info( f'Loading position for `{fqme}`:\n' @@ -1086,7 +1118,7 @@ async def process_trade_msg( return msg = Status(**msg) - resp = msg.resp + # resp: str = msg.resp oid = msg.oid dialog: Dialog = mode.dialogs.get(oid) @@ -1150,19 +1182,32 @@ async def process_trade_msg( mode.on_submit(oid) case Status(resp='error'): - - # do all the things for a cancel: - # - drop order-msg dialog from client table - # - delete level line from view - mode.on_cancel(oid) - # TODO: parse into broker-side msg, or should we # expect it to just be **that** msg verbatim (since # we'd presumably have only 1 `Error` msg-struct) broker_msg: dict = msg.brokerd_msg + + # XXX NOTE, this presumes the rxed "error" is + # order-dialog-cancel-causing, THUS backends much ONLY + # relay errors of this "severity"!! log.error( - f'Order {oid}->{resp} with:\n{pformat(broker_msg)}' + f'Order errored ??\n' + f'oid: {oid!r}\n' + f'\n' + f'{pformat(broker_msg)}\n' + f'\n' + f'=> CANCELLING ORDER DIALOG <=\n' + + # from tractor.devx.pformat import ppfmt + # !TODO LOL, wtf the msg is causing + # a recursion bug! + # -[ ] get this shit on msgspec stat! + # f'{ppfmt(broker_msg)}' ) + # do all the things for a cancel: + # - drop order-msg dialog from client table + # - delete level line from view + mode.on_cancel(oid) case Status(resp='canceled'): # delete level line from view @@ -1178,10 +1223,10 @@ async def process_trade_msg( # TODO: UX for a "pending" clear/live order log.info(f'Dark order triggered for {fmtmsg}') + # TODO: do the struct-msg version, blah blah.. + # req=Order(exec_mode='live', action='alert') as req, case Status( resp='triggered', - # TODO: do the struct-msg version, blah blah.. 
- # req=Order(exec_mode='live', action='alert') as req, req={ 'exec_mode': 'live', 'action': 'alert', diff --git a/tests/test_accounting.py b/tests/test_accounting.py index 952a9229..1e22c91d 100644 --- a/tests/test_accounting.py +++ b/tests/test_accounting.py @@ -12,12 +12,14 @@ from piker import config from piker.accounting import ( Account, calc, - Position, - TransactionLedger, - open_trade_ledger, + open_account, load_account, load_account_from_ledger, + open_trade_ledger, + Position, + TransactionLedger, ) +import tractor def test_root_conf_networking_section( @@ -53,12 +55,17 @@ def test_account_file_default_empty( ) def test_paper_ledger_position_calcs( fq_acnt: tuple[str, str], + debug_mode: bool, ): broker: str acnt_name: str broker, acnt_name = fq_acnt - accounts_path: Path = config.repodir() / 'tests' / '_inputs' + accounts_path: Path = ( + config.repodir() + / 'tests' + / '_inputs' # tests-local-subdir + ) ldr: TransactionLedger with ( @@ -77,6 +84,7 @@ def test_paper_ledger_position_calcs( ledger=ldr, _fp=accounts_path, + debug_mode=debug_mode, ) as (dfs, ledger), @@ -102,3 +110,87 @@ def test_paper_ledger_position_calcs( df = dfs[xrp] assert df['cumsize'][-1] == 0 assert pos.cumsize == 0 + + + +@pytest.mark.parametrize( + 'fq_acnt', + [ + ('ib', 'algopaper'), + ], +) +def test_ib_account_with_duplicated_mktids( + fq_acnt: tuple[str, str], + debug_mode: bool, +): + # ?TODO, once we start symcache-incremental-update-support? + # from piker.data import ( + # open_symcache, + # ) + # + # async def main(): + # async with ( + # # TODO: do this as part of `open_account()`!? + # open_symcache( + # 'ib', + # only_from_memcache=True, + # ) as symcache, + # ): + + + from piker.brokers.ib.ledger import ( + tx_sort, + + # ?TODO, once we want to pull lowlevel txns and process them? + # norm_trade_records, + # update_ledger_from_api_trades, + ) + + broker: str + acnt_id: str = 'algopaper' + broker, acnt_id = fq_acnt + accounts_def = config.load_accounts([broker]) + assert accounts_def[f'{broker}.{acnt_id}'] + + ledger: TransactionLedger + acnt: Account + with ( + tractor.devx.maybe_open_crash_handler(pdb=debug_mode), + + open_trade_ledger( + 'ib', + acnt_id, + tx_sort=tx_sort, + + # TODO, eventually incrementally updated for IB.. + # symcache=symcache, + symcache=None, + allow_from_sync_code=True, + + ) as ledger, + + open_account( + 'ib', + acnt_id, + write_on_exit=True, + ) as acnt, + ): + # per input params + symcache = ledger.symcache + assert not ( + symcache.pairs + or + symcache.pairs + or + symcache.mktmaps + ) + # re-compute all positions that have changed state. + # TODO: likely we should change the API to return the + # position updates from `.update_from_ledger()`? + active, closed = acnt.dump_active() + + # breakpoint() + + # TODO, (see above imports as well) incremental update from + # (updated) ledger? + # -[ ] pull some code from `.ib.broker` content. diff --git a/tests/test_ems.py b/tests/test_ems.py index 4a9e4a4c..07e28c33 100644 --- a/tests/test_ems.py +++ b/tests/test_ems.py @@ -42,7 +42,7 @@ from piker.accounting import ( unpack_fqme, ) from piker.accounting import ( - open_pps, + open_account, Position, ) @@ -136,7 +136,7 @@ def load_and_check_pos( ) -> None: - with open_pps(ppmsg.broker, ppmsg.account) as table: + with open_account(ppmsg.broker, ppmsg.account) as table: if ppmsg.size == 0: assert ppmsg.symbol not in table.pps
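The `open_pps()` shim is dropped from `piker.accounting` in this diff, so downstream code (as the `tests/test_ems.py` hunk shows) opens the account table via `open_account()` directly. A minimal migration sketch; the `write_on_exit` kwarg and the `.pps`/`.cumsize` attributes are taken from the test changes above, everything else is illustrative and not verified against the full `Account` API.

```python
# Minimal sketch: replacing the removed `open_pps()` shim with
# `open_account()` (kwargs/attrs taken from the tests in this diff).
from piker.accounting import open_account

# before (now removed):
#   with open_pps('ib', 'algopaper') as acnt:
#       ...

with open_account(
    'ib',                  # broker/backend name
    'algopaper',           # account id
    write_on_exit=False,   # don't re-write `pps.toml` on close
) as acnt:
    # `Account.pps` maps backend market-ids -> `Position`s
    for bs_mktid, pos in acnt.pps.items():
        print(bs_mktid, pos.cumsize)
```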
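The new duplicate-`bs_mktid` guard in `open_account()` boils down to a keyed upsert that warns when two ledger entries disagree on the market for the same backend market-id. A self-contained toy version of that pattern; the real code uses `Position`/`MktPair`, not these stand-ins.

```python
# Toy sketch of "never duplicate a position per backend market-id":
# upsert keyed on `bs_mktid`, warn on a mismatched market.
from dataclasses import dataclass, field


@dataclass
class ToyPos:
    mkt: str          # stands in for `MktPair.fqme`
    bs_mktid: str
    clears: list[str] = field(default_factory=list)


pp_objs: dict[str, ToyPos] = {}


def upsert(bs_mktid: str, mkt: str) -> ToyPos:
    if pos := pp_objs.get(bs_mktid):
        if mkt != pos.mkt:
            print(
                f'Duplicated position but diff fqme ??\n'
                f'bs_mktid: {bs_mktid!r}, have {pos.mkt!r}, got {mkt!r}'
            )
        return pos

    pos = pp_objs[bs_mktid] = ToyPos(mkt=mkt, bs_mktid=bs_mktid)
    return pos


upsert('CONID123', 'mnq.cme.ib')
upsert('CONID123', 'mnq.20251219.cme.ib')  # warns, reuses existing pos
```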
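The `iter_by_dt()` changes sort records that have no parseable time field to the epoch and collect them in an `_invalid` list, so the outer loop can warn and skip them instead of crashing. A standalone sketch of that sort-and-skip pattern with toy dict records; the real code handles `Transaction` objects and a larger `parsers` table.

```python
# Sketch of the new "collect invalid, sort to epoch, skip on iterate"
# behaviour in `piker.accounting.calc.iter_by_dt()` (toy records).
from functools import partial
from pendulum import DateTime, from_timestamp, parse


def dyn_parse_to_dt(
    record: dict,
    _invalid: list | None = None,
) -> DateTime:
    # "best field wins" lookup, like the real parsers table
    for k in ('dt', 'time', 'date'):
        if (v := record.get(k)):
            return v if isinstance(v, DateTime) else parse(v)

    # no sortable time field: remember the record, sort it to epoch
    if _invalid is not None:
        _invalid.append(record)
    return from_timestamp(0.)


records = [
    {'tid': 'a', 'dt': '2024-01-02T00:00:00Z'},
    {'tid': 'b'},  # missing any time field
    {'tid': 'c', 'time': '2024-01-01T00:00:00Z'},
]
invalid: list = []
for rec in sorted(
    records,
    key=partial(dyn_parse_to_dt, _invalid=invalid),
):
    if rec in invalid:
        # the real loop logs a warning here
        continue
    print(rec['tid'])  # -> 'c' then 'a'
```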
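With `bs_mktid` now carried on `BrokerdPosition`, the order-mode position handler prefers the backend-unique market id and only falls back to fqme string comparison. A hedged sketch of that matching rule using plain dict/str inputs rather than the real msg structs.

```python
# Sketch of the position-msg matching order added to
# `order_mode.process_trade_msg()` (toy inputs, illustrative names).
def matches_current_chart(
    msg: dict,               # a `BrokerdPosition`-like payload
    fqme: str,               # e.g. 'mnq.cme.ib'
    broker: str,             # e.g. 'ib'
    bs_mktid: str | None,    # the chart's `MktPair.bs_mktid`
) -> bool:
    pp_msg_symbol: str = msg['symbol'].lower()
    pp_msg_bsmktid = msg.get('bs_mktid')

    return (
        # match on any backend-specific(-unique) id first!
        (pp_msg_bsmktid and bs_mktid == pp_msg_bsmktid)
        or
        # otherwise try whatever was provided as an fqme
        pp_msg_symbol == fqme
        or
        pp_msg_symbol == fqme.removesuffix(f'.{broker}')
    )


assert matches_current_chart(
    {'symbol': 'mnq.20251219.cme.ib', 'bs_mktid': 'CONID123'},
    fqme='mnq.cme.ib',
    broker='ib',
    bs_mktid='CONID123',
)
```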
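The `AnnotCtl` teardown change shields the async IPC send in its `finally:` block; without the shield, a cancelled enclosing scope would abort the `await` before the remote annotation is removed. A standalone `trio` demo of that pattern; the names are illustrative, not the real `AnnotCtl` API.

```python
# Demo: shielding an async cleanup call so it survives cancellation.
import trio


async def remove(aid: int) -> None:
    # stands in for the async IPC send op in `AnnotCtl.remove()`
    await trio.sleep(0.1)
    print(f'removed annotation {aid}')


async def annotation_lifetime() -> None:
    aid = 1
    try:
        await trio.sleep_forever()
    finally:
        # async ipc send op: must be shielded or the pending
        # cancellation re-raises before the send completes
        with trio.CancelScope(shield=True):
            await remove(aid)


async def main() -> None:
    with trio.move_on_after(0.2):
        await annotation_lifetime()


trio.run(main)
```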
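The new `debug_mode` plumbing in the tests and `open_ledger_dfs()` leans on `tractor.devx.maybe_open_crash_handler()` so failures only drop into a REPL when debugging is enabled. A minimal usage sketch mirroring the calls in this diff; the `debug_mode` flag is normally injected by a pytest fixture.

```python
# Sketch: opt-in crash handling around ledger/account processing.
from tractor.devx import maybe_open_crash_handler

debug_mode: bool = False  # stand-in for the pytest fixture value

with maybe_open_crash_handler(pdb=debug_mode):
    # any error raised here drops into a debug REPL when
    # `debug_mode=True`, otherwise it propagates as usual
    pass
```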