From 15025d6047ede75e640d490998b168578eb333ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 21:26:28 -0400 Subject: [PATCH 01/39] Move config module to top level --- piker/brokers/ib.py | 2 +- piker/brokers/questrade.py | 2 +- piker/cli/__init__.py | 3 ++- piker/{brokers => }/config.py | 2 +- piker/ui/_forms.py | 14 +++++++------- 5 files changed, 12 insertions(+), 11 deletions(-) rename piker/{brokers => }/config.py (99%) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 121ad428..cf646f07 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -53,7 +53,7 @@ from ib_insync.client import Client as ib_Client from fuzzywuzzy import process as fuzzy import numpy as np -from . import config +from .. import config from ..log import get_logger, get_console_log from .._daemon import maybe_spawn_brokerd from ..data._source import from_df diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 7a06ce76..30ba049e 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -43,7 +43,7 @@ import asks from ..calc import humanize, percent_change from .._cacheables import open_cached_client, async_lifo_cache -from . import config +from .. import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log from . import get_brokermod diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index e5e9a2d1..22022e84 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -8,8 +8,9 @@ import trio import tractor from ..log import get_console_log, get_logger, colorize_json -from ..brokers import get_brokermod, config +from ..brokers import get_brokermod from .._daemon import _tractor_kwargs +from .. import config log = get_logger('cli') diff --git a/piker/brokers/config.py b/piker/config.py similarity index 99% rename from piker/brokers/config.py rename to piker/config.py index 1fbd8ce1..e979a354 100644 --- a/piker/brokers/config.py +++ b/piker/config.py @@ -25,7 +25,7 @@ from typing import Optional import toml import click -from ..log import get_logger +from .log import get_logger log = get_logger('broker-config') diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index b504a408..966192e6 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -47,7 +47,7 @@ from PyQt5.QtWidgets import ( from ._event import open_handlers from ._style import hcolor, _font, _font_small, DpiAwareFont from ._label import FormatLabel -from .. import brokers +from .. import config class FontAndChartAwareLineEdit(QLineEdit): @@ -382,21 +382,21 @@ def mk_form( form._font_size = font_size or _font_small.px_size # generate sub-components from schema dict - for key, config in fields_schema.items(): - wtype = config['type'] - label = str(config.get('label', key)) + for key, conf in fields_schema.items(): + wtype = conf['type'] + label = str(conf.get('label', key)) # plain (line) edit field if wtype == 'edit': w = form.add_edit_field( key, label, - config['default_value'] + conf['default_value'] ) # drop-down selection elif wtype == 'select': - values = list(config['default_value']) + values = list(conf['default_value']) w = form.add_select_field( key, label, @@ -635,7 +635,7 @@ def mk_order_pane_layout( # font_size: int = _font_small.px_size - 2 font_size: int = _font.px_size - 2 - accounts = brokers.config.load_accounts() + accounts = config.load_accounts() # TODO: maybe just allocate the whole fields form here # and expect an async ctx entry? From 5333d25bf62098d971d6f99fa05f95240eefd746 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 21:27:53 -0400 Subject: [PATCH 02/39] Better separation of UI vs. allocator settings Get rid of `PositionTracker.init_status_ui()` and instead make a helper func `mk_allocator()` which takes in the alloc and adjusts default settings on the allocator alone (which is expected to be passed in). Expect a `Position` instance to be passed into the tracker which will be looked up for UI updates. Move *update-from-position-msg* ops into a `Position.update_from_msg()` method. --- piker/ui/_position.py | 160 ++++++++++++++++++++++-------------------- 1 file changed, 83 insertions(+), 77 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 3d5364ad..8cff1b95 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -63,6 +63,24 @@ class Position(BaseModel): # ordered record of known constituent trade messages fills: list[Status] = [] + def update_from_msg( + self, + msg: BrokerdPosition, + + ) -> None: + + # XXX: better place to do this? + symbol = self.symbol + + lot_size_digits = symbol.lot_size_digits + avg_price, size = ( + round(msg['avg_price'], ndigits=symbol.tick_size_digits), + round(msg['size'], ndigits=lot_size_digits), + ) + + self.avg_price = avg_price + self.size = size + _size_units = bidict({ 'currency': '$ size', @@ -250,6 +268,60 @@ class Allocator(BaseModel): return round(prop * self.slots) +def mk_allocator( + + alloc: Allocator, + startup_pp: Position, + config_section: dict = {}, + +) -> (float, Allocator): + + asset_type = alloc.symbol.type_key + + # load and retreive user settings for default allocations + # ``config.toml`` + slots = 4 + currency_limit = 5e3 + + alloc.slots = slots + alloc.currency_limit = currency_limit + + # default entry sizing + if asset_type in ('stock', 'crypto', 'forex'): + + alloc.size_unit = '$ size' + + elif asset_type in ('future', 'option', 'futures_option'): + + # since it's harder to know how currency "applies" in this case + # given leverage properties + alloc.size_unit = '# units' + + # set units limit to slots size thus making make the next + # entry step 1.0 + alloc.units_limit = slots + + # if the current position is already greater then the limit + # settings, increase the limit to the current position + if alloc.size_unit == 'currency': + startup_size = startup_pp.size * startup_pp.avg_price + + if startup_size > alloc.currency_limit: + alloc.currency_limit = round(startup_size, ndigits=2) + + limit_text = alloc.currency_limit + + else: + startup_size = startup_pp.size + + if startup_size > alloc.units_limit: + alloc.units_limit = startup_size + + limit_text = alloc.units_limit + + return limit_text, alloc + + @dataclass class SettingsPane: '''Composite set of widgets plus an allocator model for configuring @@ -356,59 +428,6 @@ class SettingsPane: # UI in some way? return True - def init_status_ui( - self, - ): - alloc = self.alloc - asset_type = alloc.symbol.type_key - # form = self.form - - # TODO: pull from piker.toml - # default config - slots = 4 - currency_limit = 5e3 - - startup_pp = self.tracker.startup_pp - - alloc.slots = slots - alloc.currency_limit = currency_limit - - # default entry sizing - if asset_type in ('stock', 'crypto', 'forex'): - - alloc.size_unit = '$ size' - - elif asset_type in ('future', 'option', 'futures_option'): - - # since it's harder to know how currency "applies" in this case - # given leverage properties - alloc.size_unit = '# units' - - # set units limit to slots size thus making make the next - # entry step 1.0 - alloc.units_limit = slots - - # if the current position is already greater then the limit - # settings, increase the limit to the current position - if alloc.size_unit == 'currency': - startup_size = startup_pp.size * startup_pp.avg_price - - if startup_size > alloc.currency_limit: - alloc.currency_limit = round(startup_size, ndigits=2) - - limit_text = alloc.currency_limit - - else: - startup_size = startup_pp.size - - if startup_size > alloc.units_limit: - alloc.units_limit = startup_size - - limit_text = alloc.units_limit - - self.on_ui_settings_change('limit', limit_text) - self.update_status_ui(size=startup_size) - def update_status_ui( self, size: float = None, @@ -533,9 +552,9 @@ class PositionTracker: # inputs chart: 'ChartPlotWidget' # noqa alloc: Allocator + startup_pp: Position # allocated - startup_pp: Position live_pp: Position pp_label: Label size_label: Label @@ -547,17 +566,14 @@ class PositionTracker: self, chart: 'ChartPlotWidget', # noqa alloc: Allocator, + startup_pp: Position, ) -> None: self.chart = chart self.alloc = alloc - self.live_pp = Position( - symbol=chart.linked.symbol, - size=0, - avg_price=0, - ) - self.startup_pp = self.live_pp.copy() + self.startup_pp = startup_pp + self.live_pp = startup_pp.copy() view = chart.getViewBox() @@ -622,9 +638,8 @@ class PositionTracker: self.pp_label.update() self.size_label.update() - def update_from_pp_msg( + def update_from_pp( self, - msg: BrokerdPosition, position: Optional[Position] = None, ) -> None: @@ -632,23 +647,14 @@ class PositionTracker: EMS ``BrokerdPosition`` msg. ''' - # XXX: better place to do this? - symbol = self.chart.linked.symbol - lot_size_digits = symbol.lot_size_digits - avg_price, size = ( - round(msg['avg_price'], ndigits=symbol.tick_size_digits), - round(msg['size'], ndigits=lot_size_digits), - ) - # live pp updates pp = position or self.live_pp - pp.avg_price = avg_price - pp.size = size + # pp.update_from_msg(msg) self.update_line( - avg_price, - size, - lot_size_digits, + pp.avg_price, + pp.size, + self.chart.linked.symbol.lot_size_digits, ) # label updates @@ -656,11 +662,11 @@ class PositionTracker: self.alloc.slots_used(pp), ndigits=1) self.size_label.render() - if size == 0: + if pp.size == 0: self.hide() else: - self._level_marker.level = avg_price + self._level_marker.level = pp.avg_price # these updates are critical to avoid lag on view/scene changes self._level_marker.update() # trigger paint From 343cb4b0ae57a92319dbf9937987868d6abc0447 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 21:36:30 -0400 Subject: [PATCH 03/39] Port order mode setup to new pp apis; reduces implicit update logic --- piker/ui/order_mode.py | 139 ++++++++++++++++++++++++----------------- 1 file changed, 81 insertions(+), 58 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 85555bad..c3e86d7b 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -32,7 +32,7 @@ from pydantic import BaseModel import tractor import trio -from .. import brokers +from .. import config from ..calc import pnl from ..clearing._client import open_ems, OrderBook from ..data._source import Symbol @@ -41,7 +41,14 @@ from ..data.feed import Feed from ..log import get_logger from ._editors import LineEditor, ArrowEditor from ._lines import order_line, LevelLine -from ._position import PositionTracker, SettingsPane, Allocator, _size_units +from ._position import ( + Position, + Allocator, + mk_allocator, + PositionTracker, + SettingsPane, + _size_units, +) from ._window import MultiStatus from ..clearing._messages import Order from ._forms import open_form_input_handling @@ -511,36 +518,91 @@ async def open_order_mode( lines = LineEditor(chart=chart) arrows = ArrowEditor(chart, {}) - # load account names from ``brokers.toml`` - accounts = bidict(brokers.config.load_accounts()) + form = chart.sidepane - # allocator - alloc = Allocator( - symbol=symbol, - account=None, # select paper by default - _accounts=accounts, - size_unit=_size_units['currency'], - units_limit=400, - currency_limit=5e3, - slots=4, + # update any from exising positions received from ``brokerd`` + symbol = chart.linked.symbol + symkey = chart.linked._symbol.key + + pp_msg = None + for sym, msg in positions.items(): + if sym.lower() in symkey: + pp_msg = msg + break + + # net-zero pp + startup_pp = Position( + symbol=chart.linked.symbol, + size=0, + avg_price=0, ) - form = chart.sidepane + if pp_msg: + startup_pp.update_from_msg(msg) + + # load account names from ``brokers.toml`` + accounts = bidict(config.load_accounts()) + + # allocator + limit_value, alloc = mk_allocator( + alloc=Allocator( + symbol=symbol, + account=None, # select paper by default + _accounts=accounts, + size_unit=_size_units['currency'], + units_limit=400, + currency_limit=5e3, + slots=4, + ), + startup_pp=startup_pp, + ) form.model = alloc - pp_tracker = PositionTracker(chart, alloc) - pp_tracker.hide() + pp_tracker = PositionTracker( + chart, + alloc, + startup_pp + ) + pp_tracker.update_from_pp(startup_pp) + + if startup_pp.size == 0: + # if no position, don't show pp tracking graphics + pp_tracker.hide() # order pane widgets and allocation model order_pane = SettingsPane( tracker=pp_tracker, form=form, alloc=alloc, + + # XXX: ugh, so hideous... fill_bar=form.fill_bar, pnl_label=form.left_label, step_label=form.bottom_label, limit_label=form.top_label, ) + # make fill bar and positioning snapshot + # XXX: this need to be called *before* the first + # pp tracker update(s) below to ensure the limit size unit has + # been correctly set prior to updating the line's pp size label + # (the one on the RHS). + # TODO: should probably split out the alloc config from the UI + # config startup steps.. + order_pane.on_ui_settings_change('limit', limit_value) + order_pane.update_status_ui(size=startup_pp.size) + + # top level abstraction which wraps all this crazyness into + # a namespace.. + mode = OrderMode( + chart, + book, + lines, + arrows, + multistatus, + pp_tracker, + allocator=alloc, + pane=order_pane, + ) # XXX: would love to not have to do this separate from edit # fields (which are done in an async loop - see below) @@ -556,51 +618,12 @@ async def open_order_mode( ) ) - # top level abstraction which wraps all this crazyness into - # a namespace.. - mode = OrderMode( - chart, - book, - lines, - arrows, - multistatus, - pp_tracker, - allocator=alloc, - pane=order_pane, - ) - # TODO: create a mode "manager" of sorts? # -> probably just call it "UxModes" err sumthin? # so that view handlers can access it view.order_mode = mode - our_sym = mode.chart.linked._symbol.key - - # update any exising position - pp_msg = None - for sym, msg in positions.items(): - if sym.lower() in our_sym: - pp_msg = msg - break - - # make fill bar and positioning snapshot - # XXX: this need to be called *before* the first - # pp tracker update(s) below to ensure the limit size unit has - # been correctly set prior to updating the line's pp size label - # (the one on the RHS). - # TODO: should probably split out the alloc config from the UI - # config startup steps.. - order_pane.init_status_ui() - - # we should probably make the allocator config - # and explitict helper func call that takes in the aloc and - # the postion / symbol info then take that alloc ref and - # update the pp_tracker and pp_pane? - if pp_msg: - pp_tracker.update_from_pp_msg(msg) - - order_pane.update_status_ui() - + # real-time pnl display task allocation live_pp = mode.pp.live_pp size = live_pp.size if size: @@ -627,7 +650,6 @@ async def open_order_mode( # set 0% pnl mode.pane.pnl_label.format(pnl=0) - # Begin order-response streaming done() @@ -751,7 +773,8 @@ async def process_trades_and_update_ui( sym = mode.chart.linked.symbol if msg['symbol'].lower() in sym.key: - tracker.update_from_pp_msg(msg) + tracker.live_pp.update_from_msg(msg) + tracker.update_from_pp() # update order pane widgets mode.pane.update_status_ui() From 214c6223286f83e2eef5eae66f6de44fd88fcf1b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Mon, 6 Sep 2021 22:05:42 -0400 Subject: [PATCH 04/39] Move allocator components to clearing sub-pkg --- piker/clearing/_allocate.py | 301 ++++++++++++++++++++++++++++++++++++ piker/ui/_position.py | 280 +-------------------------------- piker/ui/order_mode.py | 22 +-- 3 files changed, 313 insertions(+), 290 deletions(-) create mode 100644 piker/clearing/_allocate.py diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py new file mode 100644 index 00000000..d283c58f --- /dev/null +++ b/piker/clearing/_allocate.py @@ -0,0 +1,301 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Position allocation logic and protocols. + +''' +from enum import Enum +from typing import Optional + +from bidict import bidict +from pydantic import BaseModel, validator + +from ..data._source import Symbol +from ._messages import BrokerdPosition, Status + + +class Position(BaseModel): + '''Basic pp (personal position) model with attached fills history. + + This type should be IPC wire ready? + + ''' + symbol: Symbol + + # last size and avg entry price + size: float + avg_price: float # TODO: contextual pricing + + # ordered record of known constituent trade messages + fills: list[Status] = [] + + def update_from_msg( + self, + msg: BrokerdPosition, + + ) -> None: + + # XXX: better place to do this? + symbol = self.symbol + + lot_size_digits = symbol.lot_size_digits + avg_price, size = ( + round(msg['avg_price'], ndigits=symbol.tick_size_digits), + round(msg['size'], ndigits=lot_size_digits), + ) + + self.avg_price = avg_price + self.size = size + + +_size_units = bidict({ + 'currency': '$ size', + 'units': '# units', + # TODO: but we'll need a `.get_accounts()` or something + # 'percent_of_port': '% of port', +}) +SizeUnit = Enum( + 'SizeUnit', + _size_units, +) + + +class Allocator(BaseModel): + + class Config: + validate_assignment = True + copy_on_model_validation = False + arbitrary_types_allowed = True + + # required to get the account validator lookup working? + extra = 'allow' + # underscore_attrs_are_private = False + + symbol: Symbol + + account: Optional[str] = 'paper' + _accounts: bidict[str, Optional[str]] + + @validator('account', pre=True) + def set_account(cls, v, values): + if v: + return values['_accounts'][v] + + size_unit: SizeUnit = 'currency' + _size_units: dict[str, Optional[str]] = _size_units + + @validator('size_unit') + def lookup_key(cls, v): + # apply the corresponding enum key for the text "description" value + return v.name + + # TODO: if we ever want ot support non-uniform entry-slot-proportion + # "sizes" + # disti_weight: str = 'uniform' + + units_limit: float + currency_limit: float + slots: int + + def step_sizes( + self, + ) -> (float, float): + '''Return the units size for each unit type as a tuple. + + ''' + slots = self.slots + return ( + self.units_limit / slots, + self.currency_limit / slots, + ) + + def limit(self) -> float: + if self.size_unit == 'currency': + return self.currency_limit + else: + return self.units_limit + + def next_order_info( + self, + + startup_pp: Position, + live_pp: Position, + price: float, + action: str, + + ) -> dict: + '''Generate order request info for the "next" submittable order + depending on position / order entry config. + + ''' + sym = self.symbol + ld = sym.lot_size_digits + + size_unit = self.size_unit + live_size = live_pp.size + abs_live_size = abs(live_size) + abs_startup_size = abs(startup_pp.size) + + u_per_slot, currency_per_slot = self.step_sizes() + + if size_unit == 'units': + slot_size = u_per_slot + l_sub_pp = self.units_limit - abs_live_size + + elif size_unit == 'currency': + live_cost_basis = abs_live_size * live_pp.avg_price + slot_size = currency_per_slot / price + l_sub_pp = (self.currency_limit - live_cost_basis) / price + + # an entry (adding-to or starting a pp) + if ( + action == 'buy' and live_size > 0 or + action == 'sell' and live_size < 0 or + live_size == 0 + ): + + order_size = min(slot_size, l_sub_pp) + + # an exit (removing-from or going to net-zero pp) + else: + # when exiting a pp we always try to slot the position + # in the instrument's units, since doing so in a derived + # size measure (eg. currency value, percent of port) would + # result in a mis-mapping of slots sizes in unit terms + # (i.e. it would take *more* slots to exit at a profit and + # *less* slots to exit at a loss). + pp_size = max(abs_startup_size, abs_live_size) + slotted_pp = pp_size / self.slots + + if size_unit == 'currency': + # compute the "projected" limit's worth of units at the + # current pp (weighted) price: + slot_size = currency_per_slot / live_pp.avg_price + + else: + slot_size = u_per_slot + + # if our position is greater then our limit setting + # we'll want to use slot sizes which are larger then what + # the limit would normally determine + order_size = max(slotted_pp, slot_size) + + if ( + abs_live_size < slot_size or + + # NOTE: front/back "loading" heurstic: + # if the remaining pp is in between 0-1.5x a slot's + # worth, dump the whole position in this last exit + # therefore conducting so called "back loading" but + # **without** going past a net-zero pp. if the pp is + # > 1.5x a slot size, then front load: exit a slot's and + # expect net-zero to be acquired on the final exit. + slot_size < pp_size < round((1.5*slot_size), ndigits=ld) + ): + order_size = abs_live_size + + slots_used = 1.0 # the default uniform policy + if order_size < slot_size: + # compute a fractional slots size to display + slots_used = self.slots_used( + Position(symbol=sym, size=order_size, avg_price=price) + ) + + return { + 'size': abs(round(order_size, ndigits=ld)), + 'size_digits': ld, + + # TODO: incorporate multipliers for relevant derivatives + 'fiat_size': round(order_size * price, ndigits=2), + 'slots_used': slots_used, + } + + def slots_used( + self, + pp: Position, + + ) -> float: + '''Calc and return the number of slots used by this ``Position``. + + ''' + abs_pp_size = abs(pp.size) + + if self.size_unit == 'currency': + # live_currency_size = size or (abs_pp_size * pp.avg_price) + live_currency_size = abs_pp_size * pp.avg_price + prop = live_currency_size / self.currency_limit + + else: + # return (size or abs_pp_size) / alloc.units_limit + prop = abs_pp_size / self.units_limit + + # TODO: REALLY need a way to show partial slots.. + # for now we round at the midway point between slots + return round(prop * self.slots) + + +def mk_allocator( + + alloc: Allocator, + startup_pp: Position, + +) -> (float, Allocator): + + asset_type = alloc.symbol.type_key + + # load and retreive user settings for default allocations + # ``config.toml`` + slots = 4 + currency_limit = 5e3 + + alloc.slots = slots + alloc.currency_limit = currency_limit + + # default entry sizing + if asset_type in ('stock', 'crypto', 'forex'): + + alloc.size_unit = '$ size' + + elif asset_type in ('future', 'option', 'futures_option'): + + # since it's harder to know how currency "applies" in this case + # given leverage properties + alloc.size_unit = '# units' + + # set units limit to slots size thus making make the next + # entry step 1.0 + alloc.units_limit = slots + + # if the current position is already greater then the limit + # settings, increase the limit to the current position + if alloc.size_unit == 'currency': + startup_size = startup_pp.size * startup_pp.avg_price + + if startup_size > alloc.currency_limit: + alloc.currency_limit = round(startup_size, ndigits=2) + + limit_text = alloc.currency_limit + + else: + startup_size = startup_pp.size + + if startup_size > alloc.units_limit: + alloc.units_limit = startup_size + + limit_text = alloc.units_limit + + return limit_text, alloc diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 8cff1b95..7c48e940 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -20,15 +20,12 @@ Position info and display """ from __future__ import annotations from dataclasses import dataclass -from enum import Enum from functools import partial from math import floor from typing import Optional -from bidict import bidict from pyqtgraph import functions as fn -from pydantic import BaseModel, validator from ._annotate import LevelMarker from ._anchors import ( @@ -36,8 +33,7 @@ from ._anchors import ( gpath_pin, ) from ..calc import humanize -from ..clearing._messages import BrokerdPosition, Status -from ..data._source import Symbol +from ..clearing._allocate import Allocator, Position from ._label import Label from ._lines import LevelLine, order_line from ._style import _font @@ -48,280 +44,6 @@ from ..clearing._messages import Order log = get_logger(__name__) -class Position(BaseModel): - '''Basic pp (personal position) model with attached fills history. - - This type should be IPC wire ready? - - ''' - symbol: Symbol - - # last size and avg entry price - size: float - avg_price: float # TODO: contextual pricing - - # ordered record of known constituent trade messages - fills: list[Status] = [] - - def update_from_msg( - self, - msg: BrokerdPosition, - - ) -> None: - - # XXX: better place to do this? - symbol = self.symbol - - lot_size_digits = symbol.lot_size_digits - avg_price, size = ( - round(msg['avg_price'], ndigits=symbol.tick_size_digits), - round(msg['size'], ndigits=lot_size_digits), - ) - - self.avg_price = avg_price - self.size = size - - -_size_units = bidict({ - 'currency': '$ size', - 'units': '# units', - # TODO: but we'll need a `.get_accounts()` or something - # 'percent_of_port': '% of port', -}) -SizeUnit = Enum( - 'SizeUnit', - _size_units, -) - - -class Allocator(BaseModel): - - class Config: - validate_assignment = True - copy_on_model_validation = False - arbitrary_types_allowed = True - - # required to get the account validator lookup working? - extra = 'allow' - # underscore_attrs_are_private = False - - symbol: Symbol - - account: Optional[str] = 'paper' - _accounts: bidict[str, Optional[str]] - - @validator('account', pre=True) - def set_account(cls, v, values): - if v: - return values['_accounts'][v] - - size_unit: SizeUnit = 'currency' - _size_units: dict[str, Optional[str]] = _size_units - - @validator('size_unit') - def lookup_key(cls, v): - # apply the corresponding enum key for the text "description" value - return v.name - - # TODO: if we ever want ot support non-uniform entry-slot-proportion - # "sizes" - # disti_weight: str = 'uniform' - - units_limit: float - currency_limit: float - slots: int - - def step_sizes( - self, - ) -> (float, float): - '''Return the units size for each unit type as a tuple. - - ''' - slots = self.slots - return ( - self.units_limit / slots, - self.currency_limit / slots, - ) - - def limit(self) -> float: - if self.size_unit == 'currency': - return self.currency_limit - else: - return self.units_limit - - def next_order_info( - self, - - startup_pp: Position, - live_pp: Position, - price: float, - action: str, - - ) -> dict: - '''Generate order request info for the "next" submittable order - depending on position / order entry config. - - ''' - sym = self.symbol - ld = sym.lot_size_digits - - size_unit = self.size_unit - live_size = live_pp.size - abs_live_size = abs(live_size) - abs_startup_size = abs(startup_pp.size) - - u_per_slot, currency_per_slot = self.step_sizes() - - if size_unit == 'units': - slot_size = u_per_slot - l_sub_pp = self.units_limit - abs_live_size - - elif size_unit == 'currency': - live_cost_basis = abs_live_size * live_pp.avg_price - slot_size = currency_per_slot / price - l_sub_pp = (self.currency_limit - live_cost_basis) / price - - # an entry (adding-to or starting a pp) - if ( - action == 'buy' and live_size > 0 or - action == 'sell' and live_size < 0 or - live_size == 0 - ): - - order_size = min(slot_size, l_sub_pp) - - # an exit (removing-from or going to net-zero pp) - else: - # when exiting a pp we always try to slot the position - # in the instrument's units, since doing so in a derived - # size measure (eg. currency value, percent of port) would - # result in a mis-mapping of slots sizes in unit terms - # (i.e. it would take *more* slots to exit at a profit and - # *less* slots to exit at a loss). - pp_size = max(abs_startup_size, abs_live_size) - slotted_pp = pp_size / self.slots - - if size_unit == 'currency': - # compute the "projected" limit's worth of units at the - # current pp (weighted) price: - slot_size = currency_per_slot / live_pp.avg_price - - else: - slot_size = u_per_slot - - # if our position is greater then our limit setting - # we'll want to use slot sizes which are larger then what - # the limit would normally determine - order_size = max(slotted_pp, slot_size) - - if ( - abs_live_size < slot_size or - - # NOTE: front/back "loading" heurstic: - # if the remaining pp is in between 0-1.5x a slot's - # worth, dump the whole position in this last exit - # therefore conducting so called "back loading" but - # **without** going past a net-zero pp. if the pp is - # > 1.5x a slot size, then front load: exit a slot's and - # expect net-zero to be acquired on the final exit. - slot_size < pp_size < round((1.5*slot_size), ndigits=ld) - ): - order_size = abs_live_size - - slots_used = 1.0 # the default uniform policy - if order_size < slot_size: - # compute a fractional slots size to display - slots_used = self.slots_used( - Position(symbol=sym, size=order_size, avg_price=price) - ) - - return { - 'size': abs(round(order_size, ndigits=ld)), - 'size_digits': ld, - - # TODO: incorporate multipliers for relevant derivatives - 'fiat_size': round(order_size * price, ndigits=2), - 'slots_used': slots_used, - } - - def slots_used( - self, - pp: Position, - - ) -> float: - '''Calc and return the number of slots used by this ``Position``. - - ''' - abs_pp_size = abs(pp.size) - - if self.size_unit == 'currency': - # live_currency_size = size or (abs_pp_size * pp.avg_price) - live_currency_size = abs_pp_size * pp.avg_price - prop = live_currency_size / self.currency_limit - - else: - # return (size or abs_pp_size) / alloc.units_limit - prop = abs_pp_size / self.units_limit - - # TODO: REALLY need a way to show partial slots.. - # for now we round at the midway point between slots - return round(prop * self.slots) - - -def mk_allocator( - - alloc: Allocator, - startup_pp: Position, - config_section: dict = {}, - -) -> (float, Allocator): - - asset_type = alloc.symbol.type_key - - # load and retreive user settings for default allocations - # ``config.toml`` - slots = 4 - currency_limit = 5e3 - - alloc.slots = slots - alloc.currency_limit = currency_limit - - # default entry sizing - if asset_type in ('stock', 'crypto', 'forex'): - - alloc.size_unit = '$ size' - - elif asset_type in ('future', 'option', 'futures_option'): - - # since it's harder to know how currency "applies" in this case - # given leverage properties - alloc.size_unit = '# units' - - # set units limit to slots size thus making make the next - # entry step 1.0 - alloc.units_limit = slots - - # if the current position is already greater then the limit - # settings, increase the limit to the current position - if alloc.size_unit == 'currency': - startup_size = startup_pp.size * startup_pp.avg_price - - if startup_size > alloc.currency_limit: - alloc.currency_limit = round(startup_size, ndigits=2) - - limit_text = alloc.currency_limit - - else: - startup_size = startup_pp.size - - if startup_size > alloc.units_limit: - alloc.units_limit = startup_size - - limit_text = alloc.units_limit - - return limit_text, alloc - - @dataclass class SettingsPane: '''Composite set of widgets plus an allocator model for configuring diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index c3e86d7b..6ef9228f 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -35,6 +35,12 @@ import trio from .. import config from ..calc import pnl from ..clearing._client import open_ems, OrderBook +from ..clearing._allocate import ( + Allocator, + mk_allocator, + Position, + _size_units, +) from ..data._source import Symbol from ..data._normalize import iterticks from ..data.feed import Feed @@ -42,12 +48,8 @@ from ..log import get_logger from ._editors import LineEditor, ArrowEditor from ._lines import order_line, LevelLine from ._position import ( - Position, - Allocator, - mk_allocator, PositionTracker, SettingsPane, - _size_units, ) from ._window import MultiStatus from ..clearing._messages import Order @@ -545,6 +547,7 @@ async def open_order_mode( # allocator limit_value, alloc = mk_allocator( + alloc=Allocator( symbol=symbol, account=None, # select paper by default @@ -571,6 +574,7 @@ async def open_order_mode( # order pane widgets and allocation model order_pane = SettingsPane( + tracker=pp_tracker, form=form, alloc=alloc, @@ -581,14 +585,10 @@ async def open_order_mode( step_label=form.bottom_label, limit_label=form.top_label, ) - # make fill bar and positioning snapshot - # XXX: this need to be called *before* the first - # pp tracker update(s) below to ensure the limit size unit has - # been correctly set prior to updating the line's pp size label - # (the one on the RHS). - # TODO: should probably split out the alloc config from the UI - # config startup steps.. + + # set startup limit value read during alloc init order_pane.on_ui_settings_change('limit', limit_value) + # make fill bar and positioning snapshot order_pane.update_status_ui(size=startup_pp.size) # top level abstraction which wraps all this crazyness into From d38a6bf0322a9786dd63fe62440a5d4ff2d8cff2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 08:38:24 -0400 Subject: [PATCH 05/39] Create alloc instance in factory body, template out defaults loading --- piker/clearing/_allocate.py | 48 ++++++++++++++++++++++++------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index d283c58f..b052e38d 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -250,27 +250,45 @@ class Allocator(BaseModel): def mk_allocator( - alloc: Allocator, + symbol: Symbol, + accounts: dict[str, str], startup_pp: Position, -) -> (float, Allocator): + # default allocation settings + defaults: dict[str, float] = { + 'account': None, # select paper by default + 'size_unit': _size_units['currency'], + 'units_limit': 400, + 'currency_limit': 5e3, + 'slots': 4, + }, + **kwargs, - asset_type = alloc.symbol.type_key +) -> Allocator: + + if kwargs: + defaults.update(kwargs) # load and retreive user settings for default allocations # ``config.toml`` - slots = 4 - currency_limit = 5e3 + user_def = { + 'currency_limit': 5e3, + 'slots': 4, + } - alloc.slots = slots - alloc.currency_limit = currency_limit + defaults.update(user_def) - # default entry sizing - if asset_type in ('stock', 'crypto', 'forex'): + alloc = Allocator( + symbol=symbol, + _accounts=accounts, + **defaults, + ) - alloc.size_unit = '$ size' + asset_type = symbol.type_key - elif asset_type in ('future', 'option', 'futures_option'): + # specific configs by asset class / type + + if asset_type in ('future', 'option', 'futures_option'): # since it's harder to know how currency "applies" in this case # given leverage properties @@ -278,7 +296,7 @@ def mk_allocator( # set units limit to slots size thus making make the next # entry step 1.0 - alloc.units_limit = slots + alloc.units_limit = alloc.slots # if the current position is already greater then the limit # settings, increase the limit to the current position @@ -288,14 +306,10 @@ def mk_allocator( if startup_size > alloc.currency_limit: alloc.currency_limit = round(startup_size, ndigits=2) - limit_text = alloc.currency_limit - else: startup_size = startup_pp.size if startup_size > alloc.units_limit: alloc.units_limit = startup_size - limit_text = alloc.units_limit - - return limit_text, alloc + return alloc From 09d34f735521c0292e2c08f7bb39409dccebfd34 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 09:21:55 -0400 Subject: [PATCH 06/39] Make `accounts` field public, add an account name method --- piker/clearing/_allocate.py | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index b052e38d..33e2c635 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -83,17 +83,16 @@ class Allocator(BaseModel): # required to get the account validator lookup working? extra = 'allow' - # underscore_attrs_are_private = False + underscore_attrs_are_private = False symbol: Symbol - + accounts: bidict[str, Optional[str]] account: Optional[str] = 'paper' - _accounts: bidict[str, Optional[str]] - @validator('account', pre=True) + @validator('account', pre=False) def set_account(cls, v, values): if v: - return values['_accounts'][v] + return values['accounts'][v] size_unit: SizeUnit = 'currency' _size_units: dict[str, Optional[str]] = _size_units @@ -129,6 +128,9 @@ class Allocator(BaseModel): else: return self.units_limit + def account_name(self) -> str: + return self.accounts.inverse[self.account] + def next_order_info( self, @@ -280,7 +282,7 @@ def mk_allocator( alloc = Allocator( symbol=symbol, - _accounts=accounts, + accounts=accounts, **defaults, ) From 7b86b6ae204242d9ac5e7eb7b2d65efa66308752 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 09:22:24 -0400 Subject: [PATCH 07/39] Add account settings change support --- piker/ui/_position.py | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 7c48e940..8328b480 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -92,7 +92,6 @@ class SettingsPane: '''Called on any order pane edit field value change. ''' - print(f'settings change: {key}: {value}') alloc = self.alloc size_unit = alloc.size_unit @@ -112,12 +111,14 @@ class SettingsPane: pass elif key == 'account': - print(f'TODO: change account -> {value}') + account_name = value or 'paper' + assert alloc.account_name() == account_name else: raise ValueError(f'Unknown setting {key}') # read out settings and update UI + log.info(f'settings change: {key}: {value}') suffix = {'currency': ' $', 'units': ' u'}[size_unit] limit = alloc.limit() @@ -142,6 +143,7 @@ class SettingsPane: self.form.fields['size_unit'].setCurrentText( alloc._size_units[alloc.size_unit] ) + self.form.fields['account'].setCurrentText(alloc.account_name()) self.form.fields['slots'].setText(str(alloc.slots)) self.form.fields['limit'].setText(str(limit)) From 2bc07ae05b0acc31b61ca0afee9d9699e8362c94 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 09:22:56 -0400 Subject: [PATCH 08/39] Try explicit matches of symbol to our adhoc set for pp msgs --- piker/brokers/ib.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index cf646f07..2e80c0a0 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1277,8 +1277,6 @@ async def stream_quotes( calc_price=calc_price ) - # con = quote['contract'] - # topic = '.'.join((con['symbol'], suffix)).lower() quote['symbol'] = topic await send_chan.send({topic: quote}) @@ -1295,12 +1293,21 @@ def pack_position(pos: Position) -> dict[str, Any]: symbol = con.localSymbol.replace(' ', '') else: - symbol = con.symbol + symbol = con.symbol.lower() + + exch = (con.primaryExchange or con.exchange).lower() + symkey = '.'.join((symbol, exch)) + + if not exch: + # attempt to lookup the symbol from our + # hacked set.. + for sym in _adhoc_futes_set: + if symbol in sym: + symkey = sym + break + + # TODO: options contracts into a sane format.. - symkey = '.'.join([ - symbol.lower(), - (con.primaryExchange or con.exchange).lower(), - ]) return BrokerdPosition( broker='ib', account=pos.account, From 5d25a0d370a6fe15d578c278c644c49df28ad0da Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 09:23:18 -0400 Subject: [PATCH 09/39] Better pp loading at startup - directly lookup the position data for the current symbol - let `mk_alloc()` create the allocator - load and set account name for pp in sidepane --- piker/ui/order_mode.py | 50 ++++++++++++++++++++---------------------- 1 file changed, 24 insertions(+), 26 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 6ef9228f..15b37281 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -39,7 +39,6 @@ from ..clearing._allocate import ( Allocator, mk_allocator, Position, - _size_units, ) from ..data._source import Symbol from ..data._normalize import iterticks @@ -509,7 +508,7 @@ async def open_order_mode( trades_stream, positions ), - trio.open_nursery() as n, + trio.open_nursery() as tn, ): log.info(f'Opening order mode for {brokername}.{symbol.key}') @@ -526,37 +525,34 @@ async def open_order_mode( symbol = chart.linked.symbol symkey = chart.linked._symbol.key - pp_msg = None - for sym, msg in positions.items(): - if sym.lower() in symkey: - pp_msg = msg - break + # NOTE: requires that the backend exactly specifies + # the expected symbol key in it's positions msg. + pp_msg = positions.get(symkey) # net-zero pp startup_pp = Position( - symbol=chart.linked.symbol, + symbol=symbol, size=0, avg_price=0, ) - if pp_msg: - startup_pp.update_from_msg(msg) - # load account names from ``brokers.toml`` accounts = bidict(config.load_accounts()) + pp_account = None + + if pp_msg: + log.info(f'Loading pp for {symkey}:\n{pformat(pp_msg)}') + startup_pp.update_from_msg(pp_msg) + pp_account = accounts.inverse.get(pp_msg.get('account')) + + # lookup account for this pp or load the user default + # for this backend # allocator - limit_value, alloc = mk_allocator( - - alloc=Allocator( - symbol=symbol, - account=None, # select paper by default - _accounts=accounts, - size_unit=_size_units['currency'], - units_limit=400, - currency_limit=5e3, - slots=4, - ), + alloc = mk_allocator( + symbol=symbol, + accounts=accounts, + account=pp_account, startup_pp=startup_pp, ) form.model = alloc @@ -587,7 +583,9 @@ async def open_order_mode( ) # set startup limit value read during alloc init - order_pane.on_ui_settings_change('limit', limit_value) + order_pane.on_ui_settings_change('limit', alloc.limit()) + order_pane.on_ui_settings_change('account', pp_account) + # make fill bar and positioning snapshot order_pane.update_status_ui(size=startup_pp.size) @@ -640,7 +638,7 @@ async def open_order_mode( ) # spawn updater task - n.start_soon( + tn.start_soon( display_pnl, feed, mode, @@ -672,9 +670,9 @@ async def open_order_mode( # to handle input since the ems connection is ready started.set() - n.start_soon( + tn.start_soon( process_trades_and_update_ui, - n, + tn, feed, mode, trades_stream, From b5c1120ad0ce1bc374c5af9fd209ac565e3f3993 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 12:54:10 -0400 Subject: [PATCH 10/39] Set account in ui handler --- piker/ui/_position.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 8328b480..9bdc8ffd 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -78,7 +78,7 @@ class SettingsPane: '''Called on any order pane drop down selection change. ''' - print(f'selection input: {text}') + log.info(f'selection input: {text}') setattr(self.alloc, key, text) self.on_ui_settings_change(key, text) @@ -112,7 +112,7 @@ class SettingsPane: elif key == 'account': account_name = value or 'paper' - assert alloc.account_name() == account_name + alloc.account = account_name else: raise ValueError(f'Unknown setting {key}') From 063788499ab419cef509c89dac763c12d359e93b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 7 Sep 2021 12:54:32 -0400 Subject: [PATCH 11/39] Use a pnl task per symbol --- piker/ui/order_mode.py | 73 +++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 33 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 15b37281..87332ac6 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -625,8 +625,7 @@ async def open_order_mode( live_pp = mode.pp.live_pp size = live_pp.size if size: - global _zero_pp - _zero_pp = False + global _pnl_tasks # compute and display pnl status immediately mode.pane.pnl_label.format( @@ -681,7 +680,7 @@ async def open_order_mode( yield mode -_zero_pp: bool = True +_pnl_tasks: dict[str, bool] = {} async def display_pnl( @@ -693,12 +692,15 @@ async def display_pnl( Error if this task is spawned where there is a net-zero pp. ''' - global _zero_pp - assert not _zero_pp + global _pnl_tasks pp = order_mode.pp live = pp.live_pp + sym = live.symbol.key + assert not _pnl_tasks.get(sym) + _pnl_tasks[sym] = True + if live.size < 0: types = ('ask', 'last', 'last', 'utrade') @@ -709,37 +711,39 @@ async def display_pnl( raise RuntimeError('No pp?!?!') # real-time update pnl on the status pane - async with feed.stream.subscribe() as bstream: - # last_tick = time.time() - async for quotes in bstream: + try: + async with feed.stream.subscribe() as bstream: + # last_tick = time.time() + async for quotes in bstream: - # now = time.time() - # period = now - last_tick + # now = time.time() + # period = now - last_tick - for sym, quote in quotes.items(): + for sym, quote in quotes.items(): - for tick in iterticks(quote, types): - # print(f'{1/period} Hz') + for tick in iterticks(quote, types): + # print(f'{1/period} Hz') - size = live.size + size = live.size + if size == 0: + # terminate this update task since we're + # no longer in a pp + order_mode.pane.pnl_label.format(pnl=0) + return - if size == 0: - # terminate this update task since we're - # no longer in a pp - _zero_pp = True - order_mode.pane.pnl_label.format(pnl=0) - return + else: + # compute and display pnl status + order_mode.pane.pnl_label.format( + pnl=copysign(1, size) * pnl( + live.avg_price, + tick['price'], + ), + ) - else: - # compute and display pnl status - order_mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - live.avg_price, - tick['price'], - ), - ) - - # last_tick = time.time() + # last_tick = time.time() + finally: + assert _pnl_tasks[sym] + assert _pnl_tasks.pop(sym) async def process_trades_and_update_ui( @@ -754,7 +758,7 @@ async def process_trades_and_update_ui( get_index = mode.chart.get_index tracker = mode.pp - global _zero_pp + global _pnl_tasks # this is where we receive **back** messages # about executions **from** the EMS actor @@ -771,14 +775,17 @@ async def process_trades_and_update_ui( sym = mode.chart.linked.symbol if msg['symbol'].lower() in sym.key: + tracker.live_pp.update_from_msg(msg) tracker.update_from_pp() # update order pane widgets mode.pane.update_status_ui() - if mode.pp.live_pp.size and _zero_pp: - _zero_pp = False + if ( + tracker.live_pp.size and + sym.key not in _pnl_tasks + ): n.start_soon( display_pnl, feed, From 0d2cddec9a26949462e4100afab73b960aa399af Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Sep 2021 14:01:54 -0400 Subject: [PATCH 12/39] Return accounts in `bidict` --- piker/clearing/_allocate.py | 2 +- piker/config.py | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 33e2c635..7549b3c2 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -129,7 +129,7 @@ class Allocator(BaseModel): return self.units_limit def account_name(self) -> str: - return self.accounts.inverse[self.account] + return self.accounts.inverse[self.account] def next_order_info( self, diff --git a/piker/config.py b/piker/config.py index e979a354..60575e2e 100644 --- a/piker/config.py +++ b/piker/config.py @@ -22,6 +22,7 @@ from os.path import dirname import shutil from typing import Optional +from bidict import bidict import toml import click @@ -104,10 +105,13 @@ def write( return toml.dump(config, cf) -def load_accounts() -> dict[str, Optional[str]]: +def load_accounts( + provider: Optional[str] = None + +) -> bidict[str, Optional[str]]: # our default paper engine entry - accounts: dict[str, Optional[str]] = {'paper': None} + accounts = bidict({'paper': None}) conf, path = load() section = conf.get('accounts') @@ -116,7 +120,11 @@ def load_accounts() -> dict[str, Optional[str]]: else: for brokername, account_labels in section.items(): - for name, value in account_labels.items(): - accounts[f'{brokername}.{name}'] = value + if ( + provider is None or + provider and brokername == provider + ): + for name, value in account_labels.items(): + accounts[f'{brokername}.{name}'] = value return accounts From 504040eb59338edde5d678aa3b2580de3b0e5312 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Sep 2021 14:03:18 -0400 Subject: [PATCH 13/39] Add an `account` field to EMS msging schemas --- piker/clearing/_messages.py | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 126326ab..e7fadccd 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -45,6 +45,7 @@ class Order(BaseModel): # internal ``emdsd`` unique "order id" oid: str # uuid4 symbol: Union[str, Symbol] + account: str # should we set a default as '' ? price: float size: float @@ -86,6 +87,7 @@ class Status(BaseModel): # 'broker_cancelled', # 'broker_executed', # 'broker_filled', + # 'broker_errored', # 'alert_submitted', # 'alert_triggered', @@ -118,6 +120,7 @@ class BrokerdCancel(BaseModel): oid: str # piker emsd order id time_ns: int + account: str # "broker request id": broker specific/internal order id if this is # None, creates a new order otherwise if the id is valid the backend # api must modify the existing matching order. If the broker allows @@ -131,6 +134,7 @@ class BrokerdOrder(BaseModel): action: str # {buy, sell} oid: str + account: str time_ns: int # "broker request id": broker specific/internal order id if this is @@ -162,6 +166,7 @@ class BrokerdOrderAck(BaseModel): # emsd id originally sent in matching request msg oid: str + account: str = '' class BrokerdStatus(BaseModel): @@ -170,6 +175,9 @@ class BrokerdStatus(BaseModel): reqid: Union[int, str] time_ns: int + # XXX: should be best effort set for every update + account: str = '' + # { # 'submitted', # 'cancelled', @@ -224,7 +232,11 @@ class BrokerdError(BaseModel): This is still a TODO thing since we're not sure how to employ it yet. ''' name: str = 'error' - reqid: Union[int, str] + oid: str + + # if no brokerd order request was actually submitted (eg. we errored + # at the ``pikerd`` layer) then there will be ``reqid`` allocated. + reqid: Union[int, str] = '' symbol: str reason: str From b01538f183c7de8d410a5dad3d0af4f9ab4311c5 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Sep 2021 15:46:33 -0400 Subject: [PATCH 14/39] Support an account field in clearing system Each backend broker may support multiple (types) of accounts; this patch lets clients send order requests that pass through an `account` field in certain `emsd` <-> `brokerd` transactions. This allows each provider to read in and conduct logic based on what account value is passed via requests to the `trades_dialogue()` endpoint as well as tie together positioning updates with relevant account keys for display in UIs. This also adds relay support for a `Status` msg with a `'broker_errored'` status which for now will trigger the same logic as cancelled orders on the client side and thus will remove order lines submitted on a chart. --- piker/clearing/_ems.py | 11 +++++++++-- piker/ui/order_mode.py | 10 ++++++++-- 2 files changed, 17 insertions(+), 4 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index f5eeff87..48c8cce3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -201,6 +201,7 @@ async def clear_dark_triggers( msg = BrokerdOrder( action=cmd['action'], oid=oid, + account=cmd['account'], time_ns=time.time_ns(), # this **creates** new order request for the @@ -621,8 +622,11 @@ async def translate_and_relay_brokerd_events( # another stupid ib error to handle # if 10147 in message: cancel + resp = 'broker_errored' + broker_details = msg.dict() + # don't relay message to order requester client - continue + # continue elif name in ( 'status', @@ -741,6 +745,7 @@ async def process_client_order_cmds( oid=oid, reqid=reqid, time_ns=time.time_ns(), + account=live_entry.account, ) # NOTE: cancel response will be relayed back in messages @@ -814,6 +819,7 @@ async def process_client_order_cmds( action=action, price=trigger_price, size=size, + account=msg.account, ) # send request to backend @@ -1016,6 +1022,7 @@ async def _emsd_main( try: _router.clients.add(ems_client_order_stream) + # main entrypoint, run here until cancelled. await process_client_order_cmds( ems_client_order_stream, @@ -1035,7 +1042,7 @@ async def _emsd_main( dialogues = _router.dialogues - for oid, client_stream in dialogues.items(): + for oid, client_stream in dialogues.copy().items(): if client_stream == ems_client_order_stream: diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 87332ac6..49a6cb99 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -103,7 +103,7 @@ class OrderMode: arrows: ArrowEditor multistatus: MultiStatus pp: PositionTracker - allocator: 'Allocator' # noqa + alloc: 'Allocator' # noqa pane: SettingsPane active: bool = False @@ -193,6 +193,7 @@ class OrderMode: order = self._staged_order = Order( action=action, price=price, + account=self.alloc.account_name(), size=0, symbol=symbol, brokers=symbol.brokers, @@ -538,6 +539,8 @@ async def open_order_mode( # load account names from ``brokers.toml`` accounts = bidict(config.load_accounts()) + # process pps back from broker, only present + # account names reported back from ``brokerd``. pp_account = None if pp_msg: @@ -598,7 +601,7 @@ async def open_order_mode( arrows, multistatus, pp_tracker, - allocator=alloc, + alloc=alloc, pane=order_pane, ) @@ -823,10 +826,13 @@ async def process_trades_and_update_ui( elif resp in ( 'broker_cancelled', 'broker_inactive', + 'broker_errored', 'dark_cancelled' ): # delete level line from view mode.on_cancel(oid) + broker_msg = msg['brokerd_msg'] + log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}') elif resp in ( 'dark_triggered' From dedfb27a3a4eaa922dcb6b58f822df9e711d0ee4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 8 Sep 2021 15:55:45 -0400 Subject: [PATCH 15/39] Add per-account order entry for ib Make the `handle_order_requests()` tasks now lookup the appropriate API client for a given account (or error if it can't be found) and use it for submission. Account names are loaded from the `brokers.toml::accounts.ib` section both UI side and in the `brokerd`. Change `_aio_get_client()` to a `load_aio_client()` which now tries to scan and load api clients for all connections defined in the config as well as deliver the client cache and account lookup tables. --- piker/brokers/ib.py | 179 ++++++++++++++++++++++++++++++++------------ 1 file changed, 132 insertions(+), 47 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 2e80c0a0..c4cab60d 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -62,8 +62,7 @@ from ._util import SymbolNotFound, NoData from ..clearing._messages import ( BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdPosition, BrokerdCancel, - BrokerdFill, - # BrokerdError, + BrokerdFill, BrokerdError, ) @@ -196,8 +195,8 @@ _adhoc_futes_set = { 'mgc.nymex', 'xagusd.cmdty', # silver spot - 'ni.nymex', # silver futes - 'qi.comex', # mini-silver futes + 'ni.nymex', # silver futes + 'qi.comex', # mini-silver futes } # exchanges we don't support at the moment due to not knowing @@ -222,7 +221,9 @@ class Client: """ def __init__( self, + ib: ibis.IB, + ) -> None: self.ib = ib self.ib.RaiseRequestErrors = True @@ -513,7 +514,7 @@ class Client: price: float, action: str, size: int, - account: str = '', # if blank the "default" tws account is used + account: str, # if blank the "default" tws account is used # XXX: by default 0 tells ``ib_insync`` methods that there is no # existing order so ask the client to create a new one (which it @@ -536,6 +537,7 @@ class Client: Order( orderId=reqid or 0, # stupid api devs.. action=action.upper(), # BUY/SELL + # lookup the literal account number by name here. account=account, orderType='LMT', lmtPrice=price, @@ -659,9 +661,10 @@ class Client: self.ib.errorEvent.connect(push_err) - async def positions( + def positions( self, account: str = '', + ) -> list[Position]: """ Retrieve position info for ``account``. @@ -695,8 +698,11 @@ def get_config() -> dict[str, Any]: return section +_accounts2clients: dict[str, Client] = {} + + @asynccontextmanager -async def _aio_get_client( +async def load_aio_clients( host: str = '127.0.0.1', port: int = None, @@ -710,23 +716,23 @@ async def _aio_get_client( TODO: consider doing this with a ctx mngr eventually? ''' + global _accounts2clients + global _client_cache + conf = get_config() + client = None # first check cache for existing client + if port: + log.info(f'Loading requested client on port: {port}') + client = _client_cache.get((host, port)) + + if client and client.ib.isConnected(): + yield client, _client_cache, _accounts2clients + return + + # allocate new and/or reload disconnected but cached clients try: - if port: - client = _client_cache[(host, port)] - else: - # grab first cached client - client = list(_client_cache.values())[0] - - if not client.ib.isConnected(): - # we have a stale client to re-allocate - raise KeyError - - yield client - - except (KeyError, IndexError): # TODO: in case the arbiter has no record # of existing brokerd we need to broadcast for one. @@ -753,6 +759,8 @@ async def _aio_get_client( ) order = ports['order'] + accounts_def = config.load_accounts('ib') + try_ports = [ports[key] for key in order] ports = try_ports if port is None else [port] @@ -762,29 +770,64 @@ async def _aio_get_client( _err = None + # (re)load any and all clients that can be found + # from connection details in ``brokers.toml``. for port in ports: - try: - log.info(f"Connecting to the EYEBEE on port {port}!") - await ib.connectAsync(host, port, clientId=client_id) - break - except ConnectionRefusedError as ce: - _err = ce - log.warning(f'Failed to connect on {port}') + client = _client_cache.get((host, port)) + + if not client or not client.ib.isConnected(): + try: + log.info(f"Connecting to the EYEBEE on port {port}!") + await ib.connectAsync(host, port, clientId=client_id) + + # create and cache client + client = Client(ib) + + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + _accounts2clients[ + accounts_def.inverse[pp.account] + ] = client + + # if there are no positions or accounts + # without positions we should still register + # them for this client + for value in ib.accountValues(): + acct = value.account + if acct not in _accounts2clients: + _accounts2clients[ + accounts_def.inverse[acct] + ] = client + + log.info( + f'Loaded accounts: {_accounts2clients} for {client} ' + f'@ {host}:{port}' + ) + + log.info(f"Caching client for {(host, port)}") + _client_cache[(host, port)] = client + + except ConnectionRefusedError as ce: + _err = ce + log.warning(f'Failed to connect on {port}') else: - raise ConnectionRefusedError(_err) + if not _client_cache: + raise ConnectionRefusedError(_err) - # create and cache - try: - client = Client(ib) + # retreive first loaded client + clients = list(_client_cache.values()) + if clients: + client = clients[0] - _client_cache[(host, port)] = client - log.debug(f"Caching client for {(host, port)}") + yield client, _client_cache, _accounts2clients - yield client - - except BaseException: - ib.disconnect() - raise + except BaseException: + ib.disconnect() + raise async def _aio_run_client_method( @@ -793,8 +836,12 @@ async def _aio_run_client_method( from_trio=None, **kwargs, ) -> None: - async with _aio_get_client() as client: + async with load_aio_clients() as ( + client, + clients, + accts2clients, + ): async_meth = getattr(client, meth) # handle streaming methods @@ -1081,8 +1128,11 @@ async def _setup_quote_stream( """ global _quote_streams - async with _aio_get_client() as client: - + async with load_aio_clients() as ( + client, + clients, + accts2clients, + ): contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -1324,11 +1374,41 @@ async def handle_order_requests( ) -> None: + global _accounts2clients + accounts_def = config.load_accounts('ib') + # request_msg: dict async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') action = request_msg['action'] + account = request_msg['account'] + + acct_number = accounts_def.get(account) + if not acct_number: + log.error( + f'An IB account number for name {account} is not found?\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', + ).dict()) + continue + + client = _accounts2clients.get(account) + if not client: + log.error( + f'An IB client for account name {account} is not found.\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No api client loaded for account: `{account}` ?', + ).dict()) + continue if action in {'buy', 'sell'}: # validate @@ -1343,6 +1423,7 @@ async def handle_order_requests( price=order.price, action=order.action, size=order.size, + account=order.account, # XXX: by default 0 tells ``ib_insync`` methods that # there is no existing order so ask the client to create @@ -1359,6 +1440,7 @@ async def handle_order_requests( # broker specific request id reqid=reqid, time_ns=time.time_ns(), + account=account, ).dict() ) @@ -1388,15 +1470,16 @@ async def trades_dialogue( ib_trade_events_stream = await _trio_run_client_method( method='recv_trade_updates', ) + global _accounts2clients + global _client_cache # deliver positions to subscriber before anything else - positions = await _trio_run_client_method(method='positions') - all_positions = {} - for pos in positions: - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() + for client in _client_cache.values(): + for pos in client.positions(): + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() await ctx.started(all_positions) @@ -1413,7 +1496,8 @@ async def trades_dialogue( # ib-gw goes down? Not sure exactly how that's happening looking # at the eventkit code above but we should probably handle it... async for event_name, item in ib_trade_events_stream: - print(f' ib sending {item}') + + log.info(f'ib sending {event_name}:\n{pformat(item)}') # TODO: templating the ib statuses in comparison with other # brokers is likely the way to go: @@ -1453,6 +1537,7 @@ async def trades_dialogue( reqid=trade.order.orderId, time_ns=time.time_ns(), # cuz why not + # account=client. # everyone doin camel case.. status=status.status.lower(), # force lower case From c53b8ec43c526b014844a82b1adb06538ce61ea0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 07:57:11 -0400 Subject: [PATCH 16/39] Make `ib` backend multi-client capable This adds full support for a single `brokerd` managing multiple API endpoint clients in tandem. Get the client scan loop correct and load accounts from all discovered clients as specified in a user's `broker.toml`. We now just always re-scan for all clients and if there's a cache hit just skip a creation/connection logic. Route orders with an account name to the correct client in the `handle_order_requests()` endpoint and spawn an event relay task per client for transmitting trade events back to `emsd`. --- piker/brokers/ib.py | 379 +++++++++++++++++++++++--------------------- 1 file changed, 202 insertions(+), 177 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index c4cab60d..a2166f07 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -219,6 +219,8 @@ class Client: Note: this client requires running inside an ``asyncio`` loop. """ + _contracts: dict[str, Contract] = {} + def __init__( self, @@ -229,7 +231,6 @@ class Client: self.ib.RaiseRequestErrors = True # contract cache - self._contracts: dict[str, Contract] = {} self._feeds: dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -505,7 +506,7 @@ class Client: return contract, ticker, details # async to be consistent for the client proxy, and cuz why not. - async def submit_limit( + def submit_limit( self, # ignored since ib doesn't support defining your # own order id @@ -554,7 +555,7 @@ class Client: # their own weird client int counting ids.. return trade.order.orderId - async def submit_cancel( + def submit_cancel( self, reqid: str, ) -> None: @@ -571,6 +572,7 @@ class Client: async def recv_trade_updates( self, to_trio: trio.abc.SendChannel, + ) -> None: """Stream a ticker using the std L1 api. """ @@ -720,50 +722,32 @@ async def load_aio_clients( global _client_cache conf = get_config() + ib = None client = None - # first check cache for existing client - if port: - log.info(f'Loading requested client on port: {port}') - client = _client_cache.get((host, port)) + # attempt to get connection info from config; if no .toml entry + # exists, we try to load from a default localhost connection. + host = conf.get('host', '127.0.0.1') + ports = conf.get( + 'ports', - if client and client.ib.isConnected(): - yield client, _client_cache, _accounts2clients - return + # default order is to check for gw first + { + 'gw': 4002, + 'tws': 7497, + 'order': ['gw', 'tws'] + } + ) + order = ports['order'] + accounts_def = config.load_accounts('ib') + + try_ports = [ports[key] for key in order] + ports = try_ports if port is None else [port] + + we_connected = [] # allocate new and/or reload disconnected but cached clients try: - - # TODO: in case the arbiter has no record - # of existing brokerd we need to broadcast for one. - - if client_id is None: - # if this is a persistent brokerd, try to allocate a new id for - # each client - client_id = next(_client_ids) - - ib = NonShittyIB() - - # attempt to get connection info from config; if no .toml entry - # exists, we try to load from a default localhost connection. - host = conf.get('host', '127.0.0.1') - ports = conf.get( - 'ports', - - # default order is to check for gw first - { - 'gw': 4002, - 'tws': 7497, - 'order': ['gw', 'tws'] - } - ) - order = ports['order'] - - accounts_def = config.load_accounts('ib') - - try_ports = [ports[key] for key in order] - ports = try_ports if port is None else [port] - # TODO: support multiple clients allowing for execution on # multiple accounts (including a paper instance running on the # same machine) and switching between accounts in the EMs @@ -774,9 +758,15 @@ async def load_aio_clients( # from connection details in ``brokers.toml``. for port in ports: client = _client_cache.get((host, port)) - + accounts_found: dict[str, Client] = {} if not client or not client.ib.isConnected(): try: + ib = NonShittyIB() + + # if this is a persistent brokerd, try to allocate + # a new id for each client + client_id = next(_client_ids) + log.info(f"Connecting to the EYEBEE on port {port}!") await ib.connectAsync(host, port, clientId=client_id) @@ -789,7 +779,7 @@ async def load_aio_clients( pps = ib.positions() if pps: for pp in pps: - _accounts2clients[ + accounts_found[ accounts_def.inverse[pp.account] ] = client @@ -798,18 +788,21 @@ async def load_aio_clients( # them for this client for value in ib.accountValues(): acct = value.account - if acct not in _accounts2clients: - _accounts2clients[ + if acct not in accounts_found: + accounts_found[ accounts_def.inverse[acct] ] = client log.info( - f'Loaded accounts: {_accounts2clients} for {client} ' - f'@ {host}:{port}' + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' ) + # update all actor-global caches log.info(f"Caching client for {(host, port)}") _client_cache[(host, port)] = client + we_connected.append(client) + _accounts2clients.update(accounts_found) except ConnectionRefusedError as ce: _err = ce @@ -826,7 +819,8 @@ async def load_aio_clients( yield client, _client_cache, _accounts2clients except BaseException: - ib.disconnect() + for client in we_connected: + client.ib.disconnect() raise @@ -834,14 +828,16 @@ async def _aio_run_client_method( meth: str, to_trio=None, from_trio=None, + client=None, **kwargs, ) -> None: async with load_aio_clients() as ( - client, + _client, clients, accts2clients, ): + client = client or _client async_meth = getattr(client, meth) # handle streaming methods @@ -855,7 +851,9 @@ async def _aio_run_client_method( async def _trio_run_client_method( method: str, + client: Optional[Client] = None, **kwargs, + ) -> None: """Asyncio entry point to run tasks against the ``ib_insync`` api. @@ -875,12 +873,12 @@ async def _trio_run_client_method( ): kwargs['_treat_as_stream'] = True - result = await tractor.to_asyncio.run_task( + return await tractor.to_asyncio.run_task( _aio_run_client_method, meth=method, + client=client, **kwargs ) - return result class _MethodProxy: @@ -1371,11 +1369,11 @@ def pack_position(pos: Position) -> dict[str, Any]: async def handle_order_requests( ems_order_stream: tractor.MsgStream, + accounts_def: dict[str, str], ) -> None: global _accounts2clients - accounts_def = config.load_accounts('ib') # request_msg: dict async for request_msg in ems_order_stream: @@ -1415,15 +1413,13 @@ async def handle_order_requests( order = BrokerdOrder(**request_msg) # call our client api to submit the order - reqid = await _trio_run_client_method( - - method='submit_limit', + reqid = client.submit_limit( oid=order.oid, symbol=order.symbol, price=order.price, action=order.action, size=order.size, - account=order.account, + account=acct_number, # XXX: by default 0 tells ``ib_insync`` methods that # there is no existing order so ask the client to create @@ -1446,11 +1442,7 @@ async def handle_order_requests( elif action == 'cancel': msg = BrokerdCancel(**request_msg) - - await _trio_run_client_method( - method='submit_cancel', - reqid=msg.reqid - ) + client.submit_cancel(reqid=msg.reqid) else: log.error(f'Unknown order command: {request_msg}') @@ -1467,169 +1459,202 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - ib_trade_events_stream = await _trio_run_client_method( - method='recv_trade_updates', - ) + accounts_def = config.load_accounts('ib') + global _accounts2clients global _client_cache # deliver positions to subscriber before anything else all_positions = {} - for client in _client_cache.values(): - for pos in client.positions(): - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() + clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + for account, client in _accounts2clients.items(): + + # each client to an api endpoint will have it's own event stream + trade_event_stream = await _trio_run_client_method( + method='recv_trade_updates', + client=client, + ) + clients.append((client, trade_event_stream)) + + for client in _client_cache.values(): + for pos in client.positions(): + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() await ctx.started(all_positions) - action_map = {'BOT': 'buy', 'SLD': 'sell'} - async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream) + n.start_soon(handle_order_requests, ems_stream, accounts_def) - # TODO: for some reason we can receive a ``None`` here when the - # ib-gw goes down? Not sure exactly how that's happening looking - # at the eventkit code above but we should probably handle it... - async for event_name, item in ib_trade_events_stream: + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def + ) - log.info(f'ib sending {event_name}:\n{pformat(item)}') + # block until cancelled + await trio.sleep_forever() - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? +async def deliver_trade_events( - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], + trade_event_stream: trio.MemoryReceiveChannel, + ems_stream: tractor.MsgStream, + accounts_def: dict[str, str], - if event_name == 'status': +) -> None: + '''Format and relay all trade events for a given client to the EMS. - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + ''' + action_map = {'BOT': 'buy', 'SLD': 'sell'} - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus + # TODO: for some reason we can receive a ``None`` here when the + # ib-gw goes down? Not sure exactly how that's happening looking + # at the eventkit code above but we should probably handle it... + async for event_name, item in trade_event_stream: - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( + log.info(f'ib sending {event_name}:\n{pformat(item)}') - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not - # account=client. + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) - # everyone doin camel case.. - status=status.status.lower(), # force lower case + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? - filled=status.filled, - reason=status.whyHeld, + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + if event_name == 'status': - broker_details={'name': 'ib'}, - ) + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - elif event_name == 'fill': + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + account=accounts_def.inverse[trade.order.account], - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } + # everyone doin camel case.. + status=status.status.lower(), # force lower case - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not + filled=status.filled, + reason=status.whyHeld, - action=action_map[execu.side], - size=execu.shares, - price=execu.price, + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], + broker_details={'name': 'ib'}, + ) - ) + elif event_name == 'fill': - elif event_name == 'error': + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. - err: dict = item + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill + trade, fill = item + execu: Execution = fill.execution - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) + # TODO: normalize out commissions details? + details = { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissions': asdict(fill.commissionReport), + 'broker_time': execu.time, # supposedly server fill time + 'name': 'ib', + } - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - # don't forward for now, it's unecessary.. but if we wanted to, - # msg = BrokerdError(**err) - continue + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - elif event_name == 'position': - msg = pack_position(item) + broker_details=details, + # XXX: required by order mode currently + broker_time=details['broker_time'], - if getattr(msg, 'reqid', 0) < -1: + ) - # it's a trade event generated by TWS usage. - log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + elif event_name == 'error': - msg.reqid = 'tws-' + str(-1 * msg.reqid) + err: dict = item - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - msg.broker_details['external_src'] = 'tws' - continue + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') + + # TODO: what schema for this msg if we're going to make it + # portable across all backends? + # msg = BrokerdError(**err) + continue + + elif event_name == 'position': + msg = pack_position(item) + + if getattr(msg, 'reqid', 0) < -1: + + # it's a trade event generated by TWS usage. + log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + msg.reqid = 'tws-' + str(-1 * msg.reqid) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + msg.broker_details['external_src'] = 'tws' + continue + + # XXX: we always serialize to a dict for msgpack + # translations, ideally we can move to an msgspec (or other) + # encoder # that can be enabled in ``tractor`` ahead of + # time so we can pass through the message types directly. + await ems_stream.send(msg.dict()) @tractor.context From 15aba154f23769365b80c80dce595ce3781e6389 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 10:33:52 -0400 Subject: [PATCH 17/39] Return account name in next order info --- piker/clearing/_allocate.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 7549b3c2..999f9ff9 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -129,7 +129,7 @@ class Allocator(BaseModel): return self.units_limit def account_name(self) -> str: - return self.accounts.inverse[self.account] + return self.accounts.inverse[self.account] def next_order_info( self, @@ -224,6 +224,9 @@ class Allocator(BaseModel): # TODO: incorporate multipliers for relevant derivatives 'fiat_size': round(order_size * price, ndigits=2), 'slots_used': slots_used, + + # update line LHS label with account name + 'account': self.account_name(), } def slots_used( From 5e947e788734a26bfe97b758dafc55165fdea573 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 10:34:14 -0400 Subject: [PATCH 18/39] Maybe show account names on order lines --- piker/ui/_lines.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/piker/ui/_lines.py b/piker/ui/_lines.py index 5572e3c2..acd1e88a 100644 --- a/piker/ui/_lines.py +++ b/piker/ui/_lines.py @@ -665,7 +665,7 @@ def order_line( # display the order pos size, which is some multiple # of the user defined base unit size fmt_str=( - '{size:.{size_digits}f}u{fiat_text}' + '{account_text}{size:.{size_digits}f}u{fiat_text}' ), color=line.color, ) @@ -679,13 +679,23 @@ def order_line( if not fiat_size: return '' - return f' -> ${humanize(fiat_size)}' + return f' ~ ${humanize(fiat_size)}' + + def maybe_show_account_name(fields: dict) -> str: + account = fields.get('account') + if not account: + return '' + + return f'{account}: ' + label.fields = { 'size': size, 'size_digits': 0, 'fiat_size': None, 'fiat_text': maybe_show_fiat_text, + 'account': None, + 'account_text': maybe_show_account_name, } label.orient_v = orient_v From c9eb0b5afb234583b4968e97001073398c4f811a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 10:34:48 -0400 Subject: [PATCH 19/39] Show account name on pp line --- piker/ui/_position.py | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 9bdc8ffd..29fc34b3 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -198,6 +198,8 @@ class SettingsPane: # update bound-in staged order order.price = level order.size = order_info['size'] + # NOTE: the account is set at order stage time + # inside ``OrderMode.line_from_order()``. def position_line( @@ -470,7 +472,6 @@ class PositionTracker: return arrow - # TODO: per account lines on a single (or very related) symbol def update_line( self, price: float, @@ -506,7 +507,10 @@ class PositionTracker: line.update_labels({ 'size': size, 'size_digits': size_digits, - 'fiat_size': round(price * size, ndigits=2) + 'fiat_size': round(price * size, ndigits=2), + + # TODO: per account lines on a single (or very related) symbol + 'account': self.alloc.account_name(), }) line.show() From 87bca9aae1c3e48777a43971a21b85318a4928cf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 9 Sep 2021 10:37:20 -0400 Subject: [PATCH 20/39] Tweak accounts schema to be per-provider --- config/brokers.toml | 18 ++++++++++-------- piker/config.py | 26 +++++++++++++++----------- 2 files changed, 25 insertions(+), 19 deletions(-) diff --git a/config/brokers.toml b/config/brokers.toml index e14fdf20..7d288648 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -14,12 +14,14 @@ private_key = "" [ib] host = "127.0.0.1" -[ib.accounts] -margin = "" -registered = "" -paper = "" +ports.gw = 4002 +ports.tws = 7497 +ports.order = ["gw", "tws",] -[ib.ports] -gw = 4002 -tws = 7497 -order = [ "gw", "tws",] +accounts.margin = "X0000000" +accounts.ira = "X0000000" +accounts.paper = "XX0000000" + +# the order in which accounts will be selected (if found through +# `brokerd`) when a new symbol is loaded +accounts_order = ['paper', 'margin', 'ira'] diff --git a/piker/config.py b/piker/config.py index 60575e2e..2d1a948b 100644 --- a/piker/config.py +++ b/piker/config.py @@ -106,6 +106,7 @@ def write( def load_accounts( + provider: Optional[str] = None ) -> bidict[str, Optional[str]]: @@ -114,17 +115,20 @@ def load_accounts( accounts = bidict({'paper': None}) conf, path = load() - section = conf.get('accounts') - if section is None: - log.warning('No accounts config found?') - else: - for brokername, account_labels in section.items(): - if ( - provider is None or - provider and brokername == provider - ): - for name, value in account_labels.items(): - accounts[f'{brokername}.{name}'] = value + for provider_name, section in conf.items(): + accounts_section = section.get('accounts') + if ( + provider is None or + provider and provider_name == provider + ): + if accounts_section is None: + log.warning(f'No accounts named for {provider_name}?') + continue + else: + for label, value in accounts_section.items(): + accounts[ + f'{provider_name}.{label}' + ] = value return accounts From e1efb0943b1f778b6e7b8b3f31197273d93d4ed3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:33:08 -0400 Subject: [PATCH 21/39] Track per-account pps in ems memory --- piker/clearing/_ems.py | 18 ++++++++++++++---- 1 file changed, 14 insertions(+), 4 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 48c8cce3..63644a5c 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -260,8 +260,15 @@ async def clear_dark_triggers( @dataclass class TradesRelay: + + # for now we keep only a single connection open with + # each ``brokerd`` for simplicity. brokerd_dialogue: tractor.MsgStream - positions: dict[str, float] + + # map of symbols to dicts of accounts to pp msgs + positions: dict[str, dict[str, BrokerdPosition]] + + # count of connected ems clients for this ``brokerd`` consumers: int = 0 @@ -514,10 +521,13 @@ async def translate_and_relay_brokerd_events( pos_msg = BrokerdPosition(**brokerd_msg).dict() - # keep up to date locally in ``emsd`` - relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg) + # XXX: this will be useful for automatic strats yah? + # keep pps per account up to date locally in ``emsd`` mem + relay.positions.setdefault(pos_msg['symbol'], {}).setdefault( + pos_msg['account'], {} + ).update(pos_msg) - # relay through position msgs immediately by + # fan-out-relay position msgs immediately by # broadcasting updates on all client streams for client_stream in router.clients: await client_stream.send(pos_msg) From 4e1bac0071cf7bbc927ed1f5689362a2946036ea Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:33:58 -0400 Subject: [PATCH 22/39] Update label on `.show()` --- piker/ui/_label.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/ui/_label.py b/piker/ui/_label.py index 42ab0776..c9a0b2a0 100644 --- a/piker/ui/_label.py +++ b/piker/ui/_label.py @@ -224,6 +224,7 @@ class Label: def show(self) -> None: self.txt.show() + self.txt.update() def hide(self) -> None: self.txt.hide() From 46d3bf0484259c89df800d4a5fe0861b050dd2b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:34:29 -0400 Subject: [PATCH 23/39] Drop commented assert about `form.model` --- piker/ui/_forms.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index 966192e6..9e7e40ea 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -417,8 +417,6 @@ async def open_form_input_handling( ) -> FieldsForm: - # assert form.model, f'{form} must define a `.model`' - async with open_handlers( list(form.fields.values()), From f9e5769b01b4de4d513a673c4f18a9799edcd413 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:35:00 -0400 Subject: [PATCH 24/39] Lintn: add missing space --- piker/data/_source.py | 1 + 1 file changed, 1 insertion(+) diff --git a/piker/data/_source.py b/piker/data/_source.py index 1a8c635d..46302508 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -106,6 +106,7 @@ class Symbol(BaseModel): mult = 1 / self.tick_size return round(value * mult) / mult + @validate_arguments def mk_symbol( From 71afce69d03ad5166a5b6516beab9d27bf51538c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:35:30 -0400 Subject: [PATCH 25/39] Append paper account last when loading --- piker/config.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/piker/config.py b/piker/config.py index 2d1a948b..70d9c9c5 100644 --- a/piker/config.py +++ b/piker/config.py @@ -107,20 +107,17 @@ def write( def load_accounts( - provider: Optional[str] = None + providers: Optional[list[str]] = None ) -> bidict[str, Optional[str]]: - # our default paper engine entry - accounts = bidict({'paper': None}) - conf, path = load() - + accounts = bidict() for provider_name, section in conf.items(): accounts_section = section.get('accounts') if ( - provider is None or - provider and provider_name == provider + providers is None or + providers and provider_name in providers ): if accounts_section is None: log.warning(f'No accounts named for {provider_name}?') @@ -131,4 +128,6 @@ def load_accounts( f'{provider_name}.{label}' ] = value + # our default paper engine entry + accounts['paper'] = None return accounts From d25aec53e33790ef8351b9ed43b6a18284766fd4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:36:46 -0400 Subject: [PATCH 26/39] Append pp values per account during startup on ib --- piker/brokers/ib.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index a2166f07..d40857a5 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -740,7 +740,7 @@ async def load_aio_clients( ) order = ports['order'] - accounts_def = config.load_accounts('ib') + accounts_def = config.load_accounts(['ib']) try_ports = [ports[key] for key in order] ports = try_ports if port is None else [port] @@ -1459,7 +1459,7 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - accounts_def = config.load_accounts('ib') + accounts_def = config.load_accounts(['ib']) global _accounts2clients global _client_cache @@ -1480,7 +1480,9 @@ async def trades_dialogue( for client in _client_cache.values(): for pos in client.positions(): msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() + all_positions.setdefault( + msg.symbol, [] + ).append(msg.dict()) await ctx.started(all_positions) From f16591612eb913393f7a6f9703ccc9c418282173 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 11:50:24 -0400 Subject: [PATCH 27/39] Support real-time account switch and status update Make a pp tracker per account and load on order mode boot. Only show details on the pp tracker for the selected account. Make the settings pane assign a `.current_pp` state on the order mode instance (for the charted symbol) on account selection switches and no longer keep a ref to a single pp tracker and allocator in the pane. `SettingsPane.update_status_ui()` now expects an explicit tracker reference as input. Still need to figure out the pnl update task logic despite the intermittent account changes. --- piker/ui/_interaction.py | 13 +-- piker/ui/_position.py | 77 ++++++++++------- piker/ui/order_mode.py | 174 +++++++++++++++++++++++---------------- 3 files changed, 159 insertions(+), 105 deletions(-) diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 022566d4..d33d553e 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -198,7 +198,7 @@ async def handle_viewmode_kb_inputs( Qt.Key_P, } ): - pp_pane = order_mode.pp.pane + pp_pane = order_mode.current_pp.pane if pp_pane.isHidden(): pp_pane.show() else: @@ -213,7 +213,7 @@ async def handle_viewmode_kb_inputs( if order_keys_pressed: # show the pp size label - order_mode.pp.show() + order_mode.current_pp.show() # TODO: show pp config mini-params in status bar widget # mode.pp_config.show() @@ -259,20 +259,23 @@ async def handle_viewmode_kb_inputs( ) and key in NUMBER_LINE ): - # hot key to set order slots size + # hot key to set order slots size. + # change edit field to current number line value, + # update the pp allocator bar, unhighlight the + # field when ctrl is released. num = int(text) pp_pane = order_mode.pane pp_pane.on_ui_settings_change('slots', num) edit = pp_pane.form.fields['slots'] edit.selectAll() + # un-highlight on ctrl release on_next_release = edit.deselect - pp_pane.update_status_ui() else: # none active # hide pp label - order_mode.pp.hide_info() + order_mode.current_pp.hide_info() # if none are pressed, remove "staged" level # line under cursor position diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 29fc34b3..c3161751 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -50,10 +50,6 @@ class SettingsPane: order entry sizes and position limits per tradable instrument. ''' - # config for and underlying validation model - tracker: PositionTracker - alloc: Allocator - # input fields form: FieldsForm @@ -64,9 +60,8 @@ class SettingsPane: pnl_label: QLabel limit_label: QLabel - def transform_to(self, size_unit: str) -> None: - if self.alloc.size_unit == size_unit: - return + # encompasing high level namespace + order_mode: Optional['OrderMode'] = None # typing: ignore # noqa def on_selection_change( self, @@ -79,7 +74,6 @@ class SettingsPane: ''' log.info(f'selection input: {text}') - setattr(self.alloc, key, text) self.on_ui_settings_change(key, text) def on_ui_settings_change( @@ -92,10 +86,34 @@ class SettingsPane: '''Called on any order pane edit field value change. ''' - alloc = self.alloc + mode = self.order_mode + + if key == 'account': + # an account switch request + + # hide detail on the old pp + old_tracker = mode.current_pp + old_tracker.hide_info() + + # re-assign the order mode tracker + account_name = value + tracker = mode.trackers[account_name] + self.order_mode.current_pp = tracker + assert tracker.alloc.account_name() == account_name + self.form.fields['account'].setCurrentText(account_name) + tracker.show() + tracker.hide_info() + + # load the new account's allocator + alloc = tracker.alloc + + else: + tracker = mode.current_pp + alloc = tracker.alloc + size_unit = alloc.size_unit - # write any passed settings to allocator + # WRITE any settings to current pp's allocator if key == 'limit': if size_unit == 'currency': alloc.currency_limit = float(value) @@ -110,14 +128,10 @@ class SettingsPane: # the current settings in the new units pass - elif key == 'account': - account_name = value or 'paper' - alloc.account = account_name - - else: + elif key != 'account': raise ValueError(f'Unknown setting {key}') - # read out settings and update UI + # READ out settings and update UI log.info(f'settings change: {key}: {value}') suffix = {'currency': ' $', 'units': ' u'}[size_unit] @@ -125,7 +139,7 @@ class SettingsPane: # TODO: a reverse look up from the position to the equivalent # account(s), if none then look to user config for default? - self.update_status_ui() + self.update_status_ui(pp=tracker) step_size, currency_per_slot = alloc.step_sizes() @@ -143,7 +157,6 @@ class SettingsPane: self.form.fields['size_unit'].setCurrentText( alloc._size_units[alloc.size_unit] ) - self.form.fields['account'].setCurrentText(alloc.account_name()) self.form.fields['slots'].setText(str(alloc.slots)) self.form.fields['limit'].setText(str(limit)) @@ -154,13 +167,14 @@ class SettingsPane: def update_status_ui( self, - size: float = None, + + pp: PositionTracker, ) -> None: - alloc = self.alloc + alloc = pp.alloc slots = alloc.slots - used = alloc.slots_used(self.tracker.live_pp) + used = alloc.slots_used(pp.live_pp) # calculate proportion of position size limit # that exists and display in fill bar @@ -173,12 +187,17 @@ class SettingsPane: min(used, slots) ) + # TODO: move to order mode module! doesn't need to be a method since + # we partial in all the state def on_level_change_update_next_order_info( self, level: float, + + # these are all ``partial``-ed in at callback assignment time. line: LevelLine, order: Order, + tracker: PositionTracker, ) -> None: '''A callback applied for each level change to the line @@ -187,9 +206,9 @@ class SettingsPane: ``OrderMode.line_from_order()`` ''' - order_info = self.alloc.next_order_info( - startup_pp=self.tracker.startup_pp, - live_pp=self.tracker.live_pp, + order_info = tracker.alloc.next_order_info( + startup_pp=tracker.startup_pp, + live_pp=tracker.live_pp, price=level, action=order.action, ) @@ -267,8 +286,8 @@ def position_line( class PositionTracker: - '''Track and display a real-time position for a single symbol - on a chart. + '''Track and display real-time positions for a single symbol + over multiple accounts on a single chart. Graphically composed of a level line and marker as well as labels for indcating current position information. Updates are made to the @@ -277,11 +296,12 @@ class PositionTracker: ''' # inputs chart: 'ChartPlotWidget' # noqa + alloc: Allocator startup_pp: Position + live_pp: Position # allocated - live_pp: Position pp_label: Label size_label: Label line: Optional[LevelLine] = None @@ -297,6 +317,7 @@ class PositionTracker: ) -> None: self.chart = chart + self.alloc = alloc self.startup_pp = startup_pp self.live_pp = startup_pp.copy() @@ -375,7 +396,6 @@ class PositionTracker: ''' # live pp updates pp = position or self.live_pp - # pp.update_from_msg(msg) self.update_line( pp.avg_price, @@ -413,7 +433,6 @@ class PositionTracker: def show(self) -> None: if self.live_pp.size: - self.line.show() self.line.show_labels() diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 49a6cb99..f61fa179 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -27,7 +27,6 @@ import time from typing import Optional, Dict, Callable, Any import uuid -from bidict import bidict from pydantic import BaseModel import tractor import trio @@ -36,7 +35,6 @@ from .. import config from ..calc import pnl from ..clearing._client import open_ems, OrderBook from ..clearing._allocate import ( - Allocator, mk_allocator, Position, ) @@ -102,12 +100,12 @@ class OrderMode: lines: LineEditor arrows: ArrowEditor multistatus: MultiStatus - pp: PositionTracker - alloc: 'Allocator' # noqa pane: SettingsPane + trackers: dict[str, PositionTracker] + # switched state, the current position + current_pp: Optional[PositionTracker] = None active: bool = False - name: str = 'order' dialogs: dict[str, OrderDialog] = field(default_factory=dict) @@ -155,6 +153,7 @@ class OrderMode: self.pane.on_level_change_update_next_order_info, line=line, order=order, + tracker=self.current_pp, ) else: @@ -193,7 +192,7 @@ class OrderMode: order = self._staged_order = Order( action=action, price=price, - account=self.alloc.account_name(), + account=self.current_pp.alloc.account_name(), size=0, symbol=symbol, brokers=symbol.brokers, @@ -499,7 +498,7 @@ async def open_order_mode( book: OrderBook trades_stream: tractor.MsgStream - positions: dict + position_msgs: dict # spawn EMS actor-service async with ( @@ -507,7 +506,7 @@ async def open_order_mode( open_ems(brokername, symbol) as ( book, trades_stream, - positions + position_msgs ), trio.open_nursery() as tn, @@ -520,64 +519,91 @@ async def open_order_mode( lines = LineEditor(chart=chart) arrows = ArrowEditor(chart, {}) + # allocation and account settings side pane form = chart.sidepane - # update any from exising positions received from ``brokerd`` + # symbol id symbol = chart.linked.symbol - symkey = chart.linked._symbol.key + symkey = symbol.key - # NOTE: requires that the backend exactly specifies - # the expected symbol key in it's positions msg. - pp_msg = positions.get(symkey) - - # net-zero pp - startup_pp = Position( - symbol=symbol, - size=0, - avg_price=0, - ) + # map of per-provider account keys to position tracker instances + trackers: dict[str, PositionTracker] = {} # load account names from ``brokers.toml`` - accounts = bidict(config.load_accounts()) - # process pps back from broker, only present - # account names reported back from ``brokerd``. - pp_account = None + accounts = config.load_accounts(providers=symbol.brokers) + if accounts: + # first account listed is the one we select at startup + # (aka order based selection). + pp_account = next(iter(accounts.keys())) + else: + pp_account = 'paper' - if pp_msg: - log.info(f'Loading pp for {symkey}:\n{pformat(pp_msg)}') - startup_pp.update_from_msg(pp_msg) - pp_account = accounts.inverse.get(pp_msg.get('account')) + # for each account with this broker, allocate pp tracker components + for config_name, account in accounts.items(): + # net-zero pp + startup_pp = Position( + symbol=symbol, + size=0, + avg_price=0, + ) - # lookup account for this pp or load the user default - # for this backend - - # allocator - alloc = mk_allocator( - symbol=symbol, - accounts=accounts, - account=pp_account, - startup_pp=startup_pp, - ) - form.model = alloc - - pp_tracker = PositionTracker( - chart, - alloc, - startup_pp - ) - pp_tracker.update_from_pp(startup_pp) - - if startup_pp.size == 0: - # if no position, don't show pp tracking graphics + # allocator + alloc = mk_allocator( + symbol=symbol, + accounts=accounts, + account=config_name, + startup_pp=startup_pp, + ) + pp_tracker = PositionTracker( + chart, + alloc, + startup_pp + ) pp_tracker.hide() + trackers[config_name] = pp_tracker + + # TODO: preparse and account-map these msgs and do it all in ONE LOOP! + + # NOTE: requires the backend exactly specifies + # the expected symbol key in its positions msg. + pp_msgs = position_msgs.get(symkey, ()) + + # update all pp trackers with existing data relayed + # from ``brokerd``. + for msg in pp_msgs: + log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') + account_name = accounts.inverse.get(msg.get('account')) + tracker = trackers[account_name] + + # TODO: do we even really need the "startup pp" or can we + # just take the max and pass that into the some state / the + # alloc? + tracker.startup_pp.update_from_msg(msg) + tracker.live_pp.update_from_msg(msg) + + # TODO: + # if this startup size is greater the allocator limit, + # increase the limit to the current pp size which is done + # in this alloc factory.. + # tracker.alloc = mk_allocator( + # symbol=symbol, + # accounts=accounts, + # account=account_name, + # startup_pp=tracker.live_pp, + # ) + + # tracker.update_from_pp(tracker.startup_pp) + tracker.update_from_pp(tracker.live_pp) + + if tracker.startup_pp.size != 0: + # if no position, don't show pp tracking graphics + tracker.show() + + tracker.hide_info() # order pane widgets and allocation model order_pane = SettingsPane( - - tracker=pp_tracker, form=form, - alloc=alloc, - # XXX: ugh, so hideous... fill_bar=form.fill_bar, pnl_label=form.left_label, @@ -585,13 +611,6 @@ async def open_order_mode( limit_label=form.top_label, ) - # set startup limit value read during alloc init - order_pane.on_ui_settings_change('limit', alloc.limit()) - order_pane.on_ui_settings_change('account', pp_account) - - # make fill bar and positioning snapshot - order_pane.update_status_ui(size=startup_pp.size) - # top level abstraction which wraps all this crazyness into # a namespace.. mode = OrderMode( @@ -600,10 +619,18 @@ async def open_order_mode( lines, arrows, multistatus, - pp_tracker, - alloc=alloc, pane=order_pane, + trackers=trackers, + ) + # XXX: MUST be set + order_pane.order_mode = mode + + # select a pp to track + tracker = trackers[pp_account] + mode.current_pp = tracker + tracker.show() + tracker.hide_info() # XXX: would love to not have to do this separate from edit # fields (which are done in an async loop - see below) @@ -619,13 +646,19 @@ async def open_order_mode( ) ) + # make fill bar and positioning snapshot + order_pane.on_ui_settings_change('limit', tracker.alloc.limit()) + order_pane.on_ui_settings_change('account', pp_account) + # order_pane.update_status_ui(pp=tracker.startup_pp) + order_pane.update_status_ui(pp=tracker) + # TODO: create a mode "manager" of sorts? # -> probably just call it "UxModes" err sumthin? # so that view handlers can access it view.order_mode = mode # real-time pnl display task allocation - live_pp = mode.pp.live_pp + live_pp = mode.current_pp.live_pp size = live_pp.size if size: global _pnl_tasks @@ -697,7 +730,7 @@ async def display_pnl( ''' global _pnl_tasks - pp = order_mode.pp + pp = order_mode.current_pp live = pp.live_pp sym = live.symbol.key @@ -727,7 +760,7 @@ async def display_pnl( for tick in iterticks(quote, types): # print(f'{1/period} Hz') - size = live.size + size = order_mode.current_pp.live_pp.size if size == 0: # terminate this update task since we're # no longer in a pp @@ -738,7 +771,8 @@ async def display_pnl( # compute and display pnl status order_mode.pane.pnl_label.format( pnl=copysign(1, size) * pnl( - live.avg_price, + # live.avg_price, + order_mode.current_pp.live_pp.avg_price, tick['price'], ), ) @@ -760,7 +794,6 @@ async def process_trades_and_update_ui( ) -> None: get_index = mode.chart.get_index - tracker = mode.pp global _pnl_tasks # this is where we receive **back** messages @@ -774,16 +807,15 @@ async def process_trades_and_update_ui( if name in ( 'position', ): - # show line label once order is live - sym = mode.chart.linked.symbol if msg['symbol'].lower() in sym.key: + tracker = mode.trackers[msg['account']] tracker.live_pp.update_from_msg(msg) tracker.update_from_pp() # update order pane widgets - mode.pane.update_status_ui() + mode.pane.update_status_ui(tracker.live_pp) if ( tracker.live_pp.size and @@ -795,7 +827,7 @@ async def process_trades_and_update_ui( mode, ) # short circuit to next msg to avoid - # uncessary msg content lookups + # unnecessary msg content lookups continue resp = msg['resp'] From 149bee1058c26960ccbcf6c70568b6c75175c990 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 14:01:29 -0400 Subject: [PATCH 28/39] Create net-zero pps from startup vs. accounts diff --- piker/ui/order_mode.py | 100 +++++++++++++++++++++++------------------ 1 file changed, 56 insertions(+), 44 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index f61fa179..fc2251aa 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -530,7 +530,7 @@ async def open_order_mode( trackers: dict[str, PositionTracker] = {} # load account names from ``brokers.toml`` - accounts = config.load_accounts(providers=symbol.brokers) + accounts = config.load_accounts(providers=symbol.brokers).copy() if accounts: # first account listed is the one we select at startup # (aka order based selection). @@ -538,8 +538,59 @@ async def open_order_mode( else: pp_account = 'paper' - # for each account with this broker, allocate pp tracker components - for config_name, account in accounts.items(): + # NOTE: requires the backend exactly specifies + # the expected symbol key in its positions msg. + pp_msgs = position_msgs.get(symkey, ()) + + # update all pp trackers with existing data relayed + # from ``brokerd``. + for msg in pp_msgs: + + log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') + account_name = accounts.inverse.get(msg.get('account')) + + # net-zero pp + startup_pp = Position( + symbol=symbol, + size=0, + avg_price=0, + ) + startup_pp.update_from_msg(msg) + + # allocator + alloc = mk_allocator( + symbol=symbol, + accounts=accounts, + account=account_name, + + # if this startup size is greater the allocator limit, + # the limit is increased internally in this factory. + startup_pp=startup_pp, + ) + + pp_tracker = PositionTracker( + chart, + alloc, + startup_pp + ) + pp_tracker.hide() + trackers[account_name] = pp_tracker + + assert pp_tracker.startup_pp.size == pp_tracker.live_pp.size + + # TODO: do we even really need the "startup pp" or can we + # just take the max and pass that into the some state / the + # alloc? + pp_tracker.update_from_pp() + + if pp_tracker.startup_pp.size != 0: + # if no position, don't show pp tracking graphics + pp_tracker.show() + pp_tracker.hide_info() + + # fill out trackers for accounts with net-zero pps + for account_name in set(accounts) - set(trackers): + # net-zero pp startup_pp = Position( symbol=symbol, @@ -551,7 +602,7 @@ async def open_order_mode( alloc = mk_allocator( symbol=symbol, accounts=accounts, - account=config_name, + account=account_name, startup_pp=startup_pp, ) pp_tracker = PositionTracker( @@ -560,46 +611,7 @@ async def open_order_mode( startup_pp ) pp_tracker.hide() - trackers[config_name] = pp_tracker - - # TODO: preparse and account-map these msgs and do it all in ONE LOOP! - - # NOTE: requires the backend exactly specifies - # the expected symbol key in its positions msg. - pp_msgs = position_msgs.get(symkey, ()) - - # update all pp trackers with existing data relayed - # from ``brokerd``. - for msg in pp_msgs: - log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') - account_name = accounts.inverse.get(msg.get('account')) - tracker = trackers[account_name] - - # TODO: do we even really need the "startup pp" or can we - # just take the max and pass that into the some state / the - # alloc? - tracker.startup_pp.update_from_msg(msg) - tracker.live_pp.update_from_msg(msg) - - # TODO: - # if this startup size is greater the allocator limit, - # increase the limit to the current pp size which is done - # in this alloc factory.. - # tracker.alloc = mk_allocator( - # symbol=symbol, - # accounts=accounts, - # account=account_name, - # startup_pp=tracker.live_pp, - # ) - - # tracker.update_from_pp(tracker.startup_pp) - tracker.update_from_pp(tracker.live_pp) - - if tracker.startup_pp.size != 0: - # if no position, don't show pp tracking graphics - tracker.show() - - tracker.hide_info() + trackers[account_name] = pp_tracker # order pane widgets and allocation model order_pane = SettingsPane( From b6b3ca15c5bb94b570f59a1f1f07e6be2b8e52bf Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 14:59:42 -0400 Subject: [PATCH 29/39] Activate pnl updates from order mode method on account switches --- piker/ui/_position.py | 2 + piker/ui/order_mode.py | 93 ++++++++++++++++++++++++------------------ 2 files changed, 56 insertions(+), 39 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index c3161751..6d1c9743 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -104,6 +104,8 @@ class SettingsPane: tracker.show() tracker.hide_info() + mode.display_pnl(tracker) + # load the new account's allocator alloc = tracker.alloc diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index fc2251aa..9ab3c79c 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -96,6 +96,8 @@ class OrderMode: ''' chart: 'ChartPlotWidget' # type: ignore # noqa + nursery: trio.Nursery + quote_feed: Feed book: OrderBook lines: LineEditor arrows: ArrowEditor @@ -474,6 +476,49 @@ class OrderMode: return ids + def display_pnl( + self, + tracker: PositionTracker, + + ) -> bool: + '''Display the PnL for the current symbol and personal positioning (pp). + + If a position is open start a background task which will + real-time update the pnl label in the settings pane. + + ''' + sym = self.chart.linked.symbol + size = tracker.live_pp.size + global _pnl_tasks + if ( + size and + sym.key not in _pnl_tasks + ): + _pnl_tasks[sym.key] = True + + # immediately compute and display pnl status from last quote + self.pane.pnl_label.format( + pnl=copysign(1, size) * pnl( + tracker.live_pp.avg_price, + # last historical close price + self.quote_feed.shm.array[-1][['close']][0], + ), + ) + + log.info( + f'Starting pnl display for {tracker.alloc.account_name()}') + self.nursery.start_soon( + display_pnl, + self.quote_feed, + self, + ) + return True + + else: + # set 0% pnl + self.pane.pnl_label.format(pnl=0) + return False + @asynccontextmanager async def open_order_mode( @@ -627,6 +672,8 @@ async def open_order_mode( # a namespace.. mode = OrderMode( chart, + tn, + feed, book, lines, arrows, @@ -660,7 +707,8 @@ async def open_order_mode( # make fill bar and positioning snapshot order_pane.on_ui_settings_change('limit', tracker.alloc.limit()) - order_pane.on_ui_settings_change('account', pp_account) + # order_pane.on_ui_settings_change('account', pp_account) + # order_pane.update_status_ui(pp=tracker.startup_pp) order_pane.update_status_ui(pp=tracker) @@ -669,31 +717,8 @@ async def open_order_mode( # so that view handlers can access it view.order_mode = mode - # real-time pnl display task allocation - live_pp = mode.current_pp.live_pp - size = live_pp.size - if size: - global _pnl_tasks - - # compute and display pnl status immediately - mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - live_pp.avg_price, - # last historical close price - feed.shm.array[-1][['close']][0], - ), - ) - - # spawn updater task - tn.start_soon( - display_pnl, - feed, - mode, - ) - - else: - # set 0% pnl - mode.pane.pnl_label.format(pnl=0) + mode.display_pnl(mode.current_pp) + order_pane.on_ui_settings_change('account', pp_account) # Begin order-response streaming done() @@ -712,7 +737,6 @@ async def open_order_mode( ), ): - # signal to top level symbol loading task we're ready # to handle input since the ems connection is ready started.set() @@ -732,8 +756,10 @@ _pnl_tasks: dict[str, bool] = {} async def display_pnl( + feed: Feed, order_mode: OrderMode, + ) -> None: '''Real-time display the current pp's PnL in the appropriate label. @@ -744,10 +770,7 @@ async def display_pnl( pp = order_mode.current_pp live = pp.live_pp - sym = live.symbol.key - assert not _pnl_tasks.get(sym) - _pnl_tasks[sym] = True if live.size < 0: types = ('ask', 'last', 'last', 'utrade') @@ -829,15 +852,7 @@ async def process_trades_and_update_ui( # update order pane widgets mode.pane.update_status_ui(tracker.live_pp) - if ( - tracker.live_pp.size and - sym.key not in _pnl_tasks - ): - n.start_soon( - display_pnl, - feed, - mode, - ) + mode.display_pnl(tracker) # short circuit to next msg to avoid # unnecessary msg content lookups continue From 054ddf673237a924dc2e72fc9e3d9b059b30083f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 18:54:04 -0400 Subject: [PATCH 30/39] Send error on non-paper account requests to paperboi --- piker/clearing/_paper_engine.py | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 84e681dd..628f58b9 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -35,7 +35,7 @@ from ..data._normalize import iterticks from ..log import get_logger from ._messages import ( BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdFill, BrokerdPosition, + BrokerdFill, BrokerdPosition, BrokerdError ) @@ -385,6 +385,19 @@ async def handle_order_requests( action = request_msg['action'] if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'paper': + log.error( + 'This is a paper account, only a `paper` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'Paper only. No account found: `{account}` ?', + ).dict()) + continue + # validate order = BrokerdOrder(**request_msg) From c00cf12f94e006f3ee13d0289dcf0a2b495f96a9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 10 Sep 2021 18:54:34 -0400 Subject: [PATCH 31/39] Deliver ems cached pps are dict of lists --- piker/clearing/_ems.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 63644a5c..3c689ff4 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -1010,7 +1010,10 @@ async def _emsd_main( # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? - await ems_ctx.started(relay.positions) + await ems_ctx.started( + {sym: list(pps.values()) + for sym, pps in relay.positions.items()} + ) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates From 8886f11c62466517b35c87c27392801940da59d2 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 10:41:52 -0400 Subject: [PATCH 32/39] Don't allow selecting accounts that haven't been loaded --- piker/ui/_position.py | 46 +++++++++++-------------------------------- 1 file changed, 11 insertions(+), 35 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 6d1c9743..fb6b143c 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -39,7 +39,6 @@ from ._lines import LevelLine, order_line from ._style import _font from ._forms import FieldsForm, FillStatusBar, QLabel from ..log import get_logger -from ..clearing._messages import Order log = get_logger(__name__) @@ -97,7 +96,17 @@ class SettingsPane: # re-assign the order mode tracker account_name = value - tracker = mode.trackers[account_name] + tracker = mode.trackers.get(account_name) + + if tracker is None: + sym = old_tracker.chart.linked.symbol.key + log.error( + f'Account `{account_name}` can not be set for {sym}' + ) + self.form.fields['account'].setCurrentText( + old_tracker.alloc.account_name()) + return + self.order_mode.current_pp = tracker assert tracker.alloc.account_name() == account_name self.form.fields['account'].setCurrentText(account_name) @@ -189,39 +198,6 @@ class SettingsPane: min(used, slots) ) - # TODO: move to order mode module! doesn't need to be a method since - # we partial in all the state - def on_level_change_update_next_order_info( - self, - - level: float, - - # these are all ``partial``-ed in at callback assignment time. - line: LevelLine, - order: Order, - tracker: PositionTracker, - - ) -> None: - '''A callback applied for each level change to the line - which will recompute the order size based on allocator - settings. this is assigned inside - ``OrderMode.line_from_order()`` - - ''' - order_info = tracker.alloc.next_order_info( - startup_pp=tracker.startup_pp, - live_pp=tracker.live_pp, - price=level, - action=order.action, - ) - line.update_labels(order_info) - - # update bound-in staged order - order.price = level - order.size = order_info['size'] - # NOTE: the account is set at order stage time - # inside ``OrderMode.line_from_order()``. - def position_line( From 7e366d18cb788cb6e91426aa7cbecee1def7a332 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 10:42:32 -0400 Subject: [PATCH 33/39] Handle paper account loading The paper engine returns `"paper"` instead of `None` in the pp msgs so expect that. Don't bother with fills tracking for now (since we'll need either the account in the msg or a lookup table locally for oids to accounts). Change the order line update handler to a local module function, there was no reason for it to be a pane method. --- piker/ui/order_mode.py | 56 +++++++++++++++++++++++++++++++++--------- 1 file changed, 45 insertions(+), 11 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 9ab3c79c..83b193ba 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -75,6 +75,37 @@ class OrderDialog(BaseModel): underscore_attrs_are_private = False +def on_level_change_update_next_order_info( + + level: float, + + # these are all ``partial``-ed in at callback assignment time. + line: LevelLine, + order: Order, + tracker: PositionTracker, + +) -> None: + '''A callback applied for each level change to the line + which will recompute the order size based on allocator + settings. this is assigned inside + ``OrderMode.line_from_order()`` + + ''' + # NOTE: the ``Order.account`` is set at order stage time + # inside ``OrderMode.line_from_order()``. + order_info = tracker.alloc.next_order_info( + startup_pp=tracker.startup_pp, + live_pp=tracker.live_pp, + price=level, + action=order.action, + ) + line.update_labels(order_info) + + # update bound-in staged order + order.price = level + order.size = order_info['size'] + + @dataclass class OrderMode: '''Major UX mode for placing orders on a chart view providing so @@ -152,7 +183,7 @@ class OrderMode: # immediately if order.action != 'alert': line._on_level_change = partial( - self.pane.on_level_change_update_next_order_info, + on_level_change_update_next_order_info, line=line, order=order, tracker=self.current_pp, @@ -592,7 +623,10 @@ async def open_order_mode( for msg in pp_msgs: log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') - account_name = accounts.inverse.get(msg.get('account')) + account_value = msg.get('account') + account_name = accounts.inverse.get(account_value) + if not account_name and account_value == 'paper': + account_name = 'paper' # net-zero pp startup_pp = Position( @@ -634,9 +668,8 @@ async def open_order_mode( pp_tracker.hide_info() # fill out trackers for accounts with net-zero pps - for account_name in set(accounts) - set(trackers): - - # net-zero pp + zero_pp_accounts = set(accounts) - set(trackers) + for account_name in zero_pp_accounts: startup_pp = Position( symbol=symbol, size=0, @@ -709,7 +742,6 @@ async def open_order_mode( order_pane.on_ui_settings_change('limit', tracker.alloc.limit()) # order_pane.on_ui_settings_change('account', pp_account) - # order_pane.update_status_ui(pp=tracker.startup_pp) order_pane.update_status_ui(pp=tracker) # TODO: create a mode "manager" of sorts? @@ -770,7 +802,7 @@ async def display_pnl( pp = order_mode.current_pp live = pp.live_pp - sym = live.symbol.key + key = live.symbol.key if live.size < 0: types = ('ask', 'last', 'last', 'utrade') @@ -814,8 +846,8 @@ async def display_pnl( # last_tick = time.time() finally: - assert _pnl_tasks[sym] - assert _pnl_tasks.pop(sym) + assert _pnl_tasks[key] + assert _pnl_tasks.pop(key) async def process_trades_and_update_ui( @@ -850,7 +882,7 @@ async def process_trades_and_update_ui( tracker.update_from_pp() # update order pane widgets - mode.pane.update_status_ui(tracker.live_pp) + mode.pane.update_status_ui(tracker) mode.display_pnl(tracker) # short circuit to next msg to avoid @@ -942,4 +974,6 @@ async def process_trades_and_update_ui( arrow_index=get_index(details['broker_time']), ) - tracker.live_pp.fills.append(msg) + # TODO: how should we look this up? + # tracker = mode.trackers[msg['account']] + # tracker.live_pp.fills.append(msg) From 6fa8958acf95bc2faba0ecd343ebdca4ace0f733 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 10:56:03 -0400 Subject: [PATCH 34/39] Drop extra method --- piker/ui/_position.py | 14 +------------- piker/ui/order_mode.py | 2 +- 2 files changed, 2 insertions(+), 14 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index fb6b143c..db250625 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -62,19 +62,6 @@ class SettingsPane: # encompasing high level namespace order_mode: Optional['OrderMode'] = None # typing: ignore # noqa - def on_selection_change( - self, - - text: str, - key: str, - - ) -> None: - '''Called on any order pane drop down selection change. - - ''' - log.info(f'selection input: {text}') - self.on_ui_settings_change(key, text) - def on_ui_settings_change( self, @@ -85,6 +72,7 @@ class SettingsPane: '''Called on any order pane edit field value change. ''' + log.info(f'selection input: {value}') mode = self.order_mode if key == 'account': diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 83b193ba..0460638a 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -733,7 +733,7 @@ async def open_order_mode( w.currentTextChanged.connect( partial( - order_pane.on_selection_change, + order_pane.on_ui_settings_change, key=key, ) ) From f81d47efc42882df300b9ea09b1be7b8c2e269ae Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 13:10:20 -0400 Subject: [PATCH 35/39] Detail some comments --- piker/clearing/_allocate.py | 10 +++++++++- piker/ui/_position.py | 7 +++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py index 999f9ff9..94c7af12 100644 --- a/piker/clearing/_allocate.py +++ b/piker/clearing/_allocate.py @@ -134,6 +134,10 @@ class Allocator(BaseModel): def next_order_info( self, + # we only need a startup size for exit calcs, we can the + # determine how large slots should be if the initial pp size was + # larger then the current live one, and the live one is smaller + # then the initial config settings. startup_pp: Position, live_pp: Position, price: float, @@ -191,9 +195,13 @@ class Allocator(BaseModel): else: slot_size = u_per_slot + # TODO: ensure that the limit can never be set **lower** + # then the current pp size? It should be configured + # correctly at startup right? + # if our position is greater then our limit setting # we'll want to use slot sizes which are larger then what - # the limit would normally determine + # the limit would normally determine. order_size = max(slotted_pp, slot_size) if ( diff --git a/piker/ui/_position.py b/piker/ui/_position.py index db250625..225663bc 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -75,10 +75,10 @@ class SettingsPane: log.info(f'selection input: {value}') mode = self.order_mode + # an account switch request if key == 'account': - # an account switch request - # hide detail on the old pp + # hide details on the old selection old_tracker = mode.current_pp old_tracker.hide_info() @@ -86,6 +86,9 @@ class SettingsPane: account_name = value tracker = mode.trackers.get(account_name) + # if selection can't be found (likely never discovered with + # a ``brokerd`) then error and switch back to the last + # selection. if tracker is None: sym = old_tracker.chart.linked.symbol.key log.error( From 1fe29dc86bbd5be44cd5d752a9aa76143b82142e Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 13:20:44 -0400 Subject: [PATCH 36/39] Revert "Drop extra method" This reverts commit 6fa8958acf95bc2faba0ecd343ebdca4ace0f733. We actually do need it since the selection widget of course won't tell you its "key" that we assign and further we'd have to use a (value, key) style invocation which isn't super pythonic. --- piker/ui/_position.py | 14 +++++++++++++- piker/ui/order_mode.py | 2 +- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 225663bc..77098f6b 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -62,6 +62,19 @@ class SettingsPane: # encompasing high level namespace order_mode: Optional['OrderMode'] = None # typing: ignore # noqa + def on_selection_change( + self, + + text: str, + key: str, + + ) -> None: + '''Called on any order pane drop down selection change. + + ''' + log.info(f'selection input: {text}') + self.on_ui_settings_change(key, text) + def on_ui_settings_change( self, @@ -72,7 +85,6 @@ class SettingsPane: '''Called on any order pane edit field value change. ''' - log.info(f'selection input: {value}') mode = self.order_mode # an account switch request diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 0460638a..83b193ba 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -733,7 +733,7 @@ async def open_order_mode( w.currentTextChanged.connect( partial( - order_pane.on_ui_settings_change, + order_pane.on_selection_change, key=key, ) ) From 2312b6aeb2afb7d3d83fa0efa4091d9019b69471 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 18:15:42 -0400 Subject: [PATCH 37/39] Fix conftest config mod import --- tests/conftest.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index d6ebbbf2..aaa125ce 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,8 @@ import os import pytest import tractor import trio -from piker import log -from piker.brokers import questrade, config +from piker import log, config +from piker.brokers import questrade def pytest_addoption(parser): From 21e6bee39b1cadebd7b40525a591f1e1ca750b71 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 18:19:58 -0400 Subject: [PATCH 38/39] Fix legacy import from `QtGui` --- piker/ui/_chart.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 2ae846c8..6ac9f752 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -23,7 +23,7 @@ from typing import Tuple, Dict, Any, Optional from types import ModuleType from functools import partial -from PyQt5 import QtCore, QtGui, QtWidgets +from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import Qt from PyQt5.QtCore import QEvent from PyQt5.QtWidgets import ( @@ -277,7 +277,7 @@ class ChartnPane(QFrame): ''' sidepane: FieldsForm - hbox: QtGui.QHBoxLayout + hbox: QtWidgets.QHBoxLayout chart: Optional['ChartPlotWidget'] = None def __init__( @@ -293,7 +293,7 @@ class ChartnPane(QFrame): self.sidepane = sidepane self.chart = None - hbox = self.hbox = QtGui.QHBoxLayout(self) + hbox = self.hbox = QtWidgets.QHBoxLayout(self) hbox.setAlignment(Qt.AlignTop | Qt.AlignLeft) hbox.setContentsMargins(0, 0, 0, 0) hbox.setSpacing(3) From ef6594cfc4cc2fc372b8312b5f0fb13dfdb58edc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Sat, 11 Sep 2021 18:41:49 -0400 Subject: [PATCH 39/39] Re-factor pnl display logic into settings pane --- piker/ui/_position.py | 118 +++++++++++++++++++++++++++++++++++++++-- piker/ui/order_mode.py | 118 +---------------------------------------- 2 files changed, 117 insertions(+), 119 deletions(-) diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 77098f6b..07f8c76e 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -21,7 +21,7 @@ Position info and display from __future__ import annotations from dataclasses import dataclass from functools import partial -from math import floor +from math import floor, copysign from typing import Optional @@ -32,8 +32,10 @@ from ._anchors import ( pp_tight_and_right, # wanna keep it straight in the long run gpath_pin, ) -from ..calc import humanize +from ..calc import humanize, pnl from ..clearing._allocate import Allocator, Position +from ..data._normalize import iterticks +from ..data.feed import Feed from ._label import Label from ._lines import LevelLine, order_line from ._style import _font @@ -41,6 +43,70 @@ from ._forms import FieldsForm, FillStatusBar, QLabel from ..log import get_logger log = get_logger(__name__) +_pnl_tasks: dict[str, bool] = {} + + +async def display_pnl( + + feed: Feed, + order_mode: OrderMode, # noqa + +) -> None: + '''Real-time display the current pp's PnL in the appropriate label. + + ``ValueError`` if this task is spawned where there is a net-zero pp. + + ''' + global _pnl_tasks + + pp = order_mode.current_pp + live = pp.live_pp + key = live.symbol.key + + if live.size < 0: + types = ('ask', 'last', 'last', 'utrade') + + elif live.size > 0: + types = ('bid', 'last', 'last', 'utrade') + + else: + raise RuntimeError('No pp?!?!') + + # real-time update pnl on the status pane + try: + async with feed.stream.subscribe() as bstream: + # last_tick = time.time() + async for quotes in bstream: + + # now = time.time() + # period = now - last_tick + + for sym, quote in quotes.items(): + + for tick in iterticks(quote, types): + # print(f'{1/period} Hz') + + size = order_mode.current_pp.live_pp.size + if size == 0: + # terminate this update task since we're + # no longer in a pp + order_mode.pane.pnl_label.format(pnl=0) + return + + else: + # compute and display pnl status + order_mode.pane.pnl_label.format( + pnl=copysign(1, size) * pnl( + # live.avg_price, + order_mode.current_pp.live_pp.avg_price, + tick['price'], + ), + ) + + # last_tick = time.time() + finally: + assert _pnl_tasks[key] + assert _pnl_tasks.pop(key) @dataclass @@ -116,7 +182,7 @@ class SettingsPane: tracker.show() tracker.hide_info() - mode.display_pnl(tracker) + self.display_pnl(tracker) # load the new account's allocator alloc = tracker.alloc @@ -201,6 +267,52 @@ class SettingsPane: min(used, slots) ) + def display_pnl( + self, + tracker: PositionTracker, + + ) -> bool: + '''Display the PnL for the current symbol and personal positioning (pp). + + If a position is open start a background task which will + real-time update the pnl label in the settings pane. + + ''' + mode = self.order_mode + sym = mode.chart.linked.symbol + size = tracker.live_pp.size + feed = mode.quote_feed + global _pnl_tasks + + if ( + size and + sym.key not in _pnl_tasks + ): + _pnl_tasks[sym.key] = True + + # immediately compute and display pnl status from last quote + self.pnl_label.format( + pnl=copysign(1, size) * pnl( + tracker.live_pp.avg_price, + # last historical close price + feed.shm.array[-1][['close']][0], + ), + ) + + log.info( + f'Starting pnl display for {tracker.alloc.account_name()}') + self.order_mode.nursery.start_soon( + display_pnl, + feed, + mode, + ) + return True + + else: + # set 0% pnl + self.pnl_label.format(pnl=0) + return False + def position_line( diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 83b193ba..1a588360 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -21,7 +21,6 @@ Chart trading, the only way to scalp. from contextlib import asynccontextmanager from dataclasses import dataclass, field from functools import partial -from math import copysign from pprint import pformat import time from typing import Optional, Dict, Callable, Any @@ -32,14 +31,12 @@ import tractor import trio from .. import config -from ..calc import pnl from ..clearing._client import open_ems, OrderBook from ..clearing._allocate import ( mk_allocator, Position, ) from ..data._source import Symbol -from ..data._normalize import iterticks from ..data.feed import Feed from ..log import get_logger from ._editors import LineEditor, ArrowEditor @@ -507,49 +504,6 @@ class OrderMode: return ids - def display_pnl( - self, - tracker: PositionTracker, - - ) -> bool: - '''Display the PnL for the current symbol and personal positioning (pp). - - If a position is open start a background task which will - real-time update the pnl label in the settings pane. - - ''' - sym = self.chart.linked.symbol - size = tracker.live_pp.size - global _pnl_tasks - if ( - size and - sym.key not in _pnl_tasks - ): - _pnl_tasks[sym.key] = True - - # immediately compute and display pnl status from last quote - self.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - tracker.live_pp.avg_price, - # last historical close price - self.quote_feed.shm.array[-1][['close']][0], - ), - ) - - log.info( - f'Starting pnl display for {tracker.alloc.account_name()}') - self.nursery.start_soon( - display_pnl, - self.quote_feed, - self, - ) - return True - - else: - # set 0% pnl - self.pane.pnl_label.format(pnl=0) - return False - @asynccontextmanager async def open_order_mode( @@ -740,8 +694,6 @@ async def open_order_mode( # make fill bar and positioning snapshot order_pane.on_ui_settings_change('limit', tracker.alloc.limit()) - # order_pane.on_ui_settings_change('account', pp_account) - order_pane.update_status_ui(pp=tracker) # TODO: create a mode "manager" of sorts? @@ -749,8 +701,8 @@ async def open_order_mode( # so that view handlers can access it view.order_mode = mode - mode.display_pnl(mode.current_pp) order_pane.on_ui_settings_change('account', pp_account) + mode.pane.display_pnl(mode.current_pp) # Begin order-response streaming done() @@ -784,72 +736,6 @@ async def open_order_mode( yield mode -_pnl_tasks: dict[str, bool] = {} - - -async def display_pnl( - - feed: Feed, - order_mode: OrderMode, - -) -> None: - '''Real-time display the current pp's PnL in the appropriate label. - - Error if this task is spawned where there is a net-zero pp. - - ''' - global _pnl_tasks - - pp = order_mode.current_pp - live = pp.live_pp - key = live.symbol.key - - if live.size < 0: - types = ('ask', 'last', 'last', 'utrade') - - elif live.size > 0: - types = ('bid', 'last', 'last', 'utrade') - - else: - raise RuntimeError('No pp?!?!') - - # real-time update pnl on the status pane - try: - async with feed.stream.subscribe() as bstream: - # last_tick = time.time() - async for quotes in bstream: - - # now = time.time() - # period = now - last_tick - - for sym, quote in quotes.items(): - - for tick in iterticks(quote, types): - # print(f'{1/period} Hz') - - size = order_mode.current_pp.live_pp.size - if size == 0: - # terminate this update task since we're - # no longer in a pp - order_mode.pane.pnl_label.format(pnl=0) - return - - else: - # compute and display pnl status - order_mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - # live.avg_price, - order_mode.current_pp.live_pp.avg_price, - tick['price'], - ), - ) - - # last_tick = time.time() - finally: - assert _pnl_tasks[key] - assert _pnl_tasks.pop(key) - - async def process_trades_and_update_ui( n: trio.Nursery, @@ -884,7 +770,7 @@ async def process_trades_and_update_ui( # update order pane widgets mode.pane.update_status_ui(tracker) - mode.display_pnl(tracker) + mode.pane.display_pnl(tracker) # short circuit to next msg to avoid # unnecessary msg content lookups continue