diff --git a/config/brokers.toml b/config/brokers.toml index e14fdf20..7d288648 100644 --- a/config/brokers.toml +++ b/config/brokers.toml @@ -14,12 +14,14 @@ private_key = "" [ib] host = "127.0.0.1" -[ib.accounts] -margin = "" -registered = "" -paper = "" +ports.gw = 4002 +ports.tws = 7497 +ports.order = ["gw", "tws",] -[ib.ports] -gw = 4002 -tws = 7497 -order = [ "gw", "tws",] +accounts.margin = "X0000000" +accounts.ira = "X0000000" +accounts.paper = "XX0000000" + +# the order in which accounts will be selected (if found through +# `brokerd`) when a new symbol is loaded +accounts_order = ['paper', 'margin', 'ira'] diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 121ad428..d40857a5 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -53,7 +53,7 @@ from ib_insync.client import Client as ib_Client from fuzzywuzzy import process as fuzzy import numpy as np -from . import config +from .. import config from ..log import get_logger, get_console_log from .._daemon import maybe_spawn_brokerd from ..data._source import from_df @@ -62,8 +62,7 @@ from ._util import SymbolNotFound, NoData from ..clearing._messages import ( BrokerdOrder, BrokerdOrderAck, BrokerdStatus, BrokerdPosition, BrokerdCancel, - BrokerdFill, - # BrokerdError, + BrokerdFill, BrokerdError, ) @@ -196,8 +195,8 @@ _adhoc_futes_set = { 'mgc.nymex', 'xagusd.cmdty', # silver spot - 'ni.nymex', # silver futes - 'qi.comex', # mini-silver futes + 'ni.nymex', # silver futes + 'qi.comex', # mini-silver futes } # exchanges we don't support at the moment due to not knowing @@ -220,15 +219,18 @@ class Client: Note: this client requires running inside an ``asyncio`` loop. """ + _contracts: dict[str, Contract] = {} + def __init__( self, + ib: ibis.IB, + ) -> None: self.ib = ib self.ib.RaiseRequestErrors = True # contract cache - self._contracts: dict[str, Contract] = {} self._feeds: dict[str, trio.abc.SendChannel] = {} # NOTE: the ib.client here is "throttled" to 45 rps by default @@ -504,7 +506,7 @@ class Client: return contract, ticker, details # async to be consistent for the client proxy, and cuz why not. - async def submit_limit( + def submit_limit( self, # ignored since ib doesn't support defining your # own order id @@ -513,7 +515,7 @@ class Client: price: float, action: str, size: int, - account: str = '', # if blank the "default" tws account is used + account: str, # if blank the "default" tws account is used # XXX: by default 0 tells ``ib_insync`` methods that there is no # existing order so ask the client to create a new one (which it @@ -536,6 +538,7 @@ class Client: Order( orderId=reqid or 0, # stupid api devs.. action=action.upper(), # BUY/SELL + # lookup the literal account number by name here. account=account, orderType='LMT', lmtPrice=price, @@ -552,7 +555,7 @@ class Client: # their own weird client int counting ids.. return trade.order.orderId - async def submit_cancel( + def submit_cancel( self, reqid: str, ) -> None: @@ -569,6 +572,7 @@ class Client: async def recv_trade_updates( self, to_trio: trio.abc.SendChannel, + ) -> None: """Stream a ticker using the std L1 api. """ @@ -659,9 +663,10 @@ class Client: self.ib.errorEvent.connect(push_err) - async def positions( + def positions( self, account: str = '', + ) -> list[Position]: """ Retrieve position info for ``account``. @@ -695,8 +700,11 @@ def get_config() -> dict[str, Any]: return section +_accounts2clients: dict[str, Client] = {} + + @asynccontextmanager -async def _aio_get_client( +async def load_aio_clients( host: str = '127.0.0.1', port: int = None, @@ -710,91 +718,126 @@ async def _aio_get_client( TODO: consider doing this with a ctx mngr eventually? ''' + global _accounts2clients + global _client_cache + conf = get_config() + ib = None + client = None - # first check cache for existing client + # attempt to get connection info from config; if no .toml entry + # exists, we try to load from a default localhost connection. + host = conf.get('host', '127.0.0.1') + ports = conf.get( + 'ports', + + # default order is to check for gw first + { + 'gw': 4002, + 'tws': 7497, + 'order': ['gw', 'tws'] + } + ) + order = ports['order'] + + accounts_def = config.load_accounts(['ib']) + + try_ports = [ports[key] for key in order] + ports = try_ports if port is None else [port] + + we_connected = [] + # allocate new and/or reload disconnected but cached clients try: - if port: - client = _client_cache[(host, port)] - else: - # grab first cached client - client = list(_client_cache.values())[0] - - if not client.ib.isConnected(): - # we have a stale client to re-allocate - raise KeyError - - yield client - - except (KeyError, IndexError): - - # TODO: in case the arbiter has no record - # of existing brokerd we need to broadcast for one. - - if client_id is None: - # if this is a persistent brokerd, try to allocate a new id for - # each client - client_id = next(_client_ids) - - ib = NonShittyIB() - - # attempt to get connection info from config; if no .toml entry - # exists, we try to load from a default localhost connection. - host = conf.get('host', '127.0.0.1') - ports = conf.get( - 'ports', - - # default order is to check for gw first - { - 'gw': 4002, - 'tws': 7497, - 'order': ['gw', 'tws'] - } - ) - order = ports['order'] - - try_ports = [ports[key] for key in order] - ports = try_ports if port is None else [port] - # TODO: support multiple clients allowing for execution on # multiple accounts (including a paper instance running on the # same machine) and switching between accounts in the EMs _err = None + # (re)load any and all clients that can be found + # from connection details in ``brokers.toml``. for port in ports: - try: - log.info(f"Connecting to the EYEBEE on port {port}!") - await ib.connectAsync(host, port, clientId=client_id) - break - except ConnectionRefusedError as ce: - _err = ce - log.warning(f'Failed to connect on {port}') + client = _client_cache.get((host, port)) + accounts_found: dict[str, Client] = {} + if not client or not client.ib.isConnected(): + try: + ib = NonShittyIB() + + # if this is a persistent brokerd, try to allocate + # a new id for each client + client_id = next(_client_ids) + + log.info(f"Connecting to the EYEBEE on port {port}!") + await ib.connectAsync(host, port, clientId=client_id) + + # create and cache client + client = Client(ib) + + # Pre-collect all accounts available for this + # connection and map account names to this client + # instance. + pps = ib.positions() + if pps: + for pp in pps: + accounts_found[ + accounts_def.inverse[pp.account] + ] = client + + # if there are no positions or accounts + # without positions we should still register + # them for this client + for value in ib.accountValues(): + acct = value.account + if acct not in accounts_found: + accounts_found[ + accounts_def.inverse[acct] + ] = client + + log.info( + f'Loaded accounts for client @ {host}:{port}\n' + f'{pformat(accounts_found)}' + ) + + # update all actor-global caches + log.info(f"Caching client for {(host, port)}") + _client_cache[(host, port)] = client + we_connected.append(client) + _accounts2clients.update(accounts_found) + + except ConnectionRefusedError as ce: + _err = ce + log.warning(f'Failed to connect on {port}') else: - raise ConnectionRefusedError(_err) + if not _client_cache: + raise ConnectionRefusedError(_err) - # create and cache - try: - client = Client(ib) + # retreive first loaded client + clients = list(_client_cache.values()) + if clients: + client = clients[0] - _client_cache[(host, port)] = client - log.debug(f"Caching client for {(host, port)}") + yield client, _client_cache, _accounts2clients - yield client - - except BaseException: - ib.disconnect() - raise + except BaseException: + for client in we_connected: + client.ib.disconnect() + raise async def _aio_run_client_method( meth: str, to_trio=None, from_trio=None, + client=None, **kwargs, ) -> None: - async with _aio_get_client() as client: + async with load_aio_clients() as ( + _client, + clients, + accts2clients, + ): + client = client or _client async_meth = getattr(client, meth) # handle streaming methods @@ -808,7 +851,9 @@ async def _aio_run_client_method( async def _trio_run_client_method( method: str, + client: Optional[Client] = None, **kwargs, + ) -> None: """Asyncio entry point to run tasks against the ``ib_insync`` api. @@ -828,12 +873,12 @@ async def _trio_run_client_method( ): kwargs['_treat_as_stream'] = True - result = await tractor.to_asyncio.run_task( + return await tractor.to_asyncio.run_task( _aio_run_client_method, meth=method, + client=client, **kwargs ) - return result class _MethodProxy: @@ -1081,8 +1126,11 @@ async def _setup_quote_stream( """ global _quote_streams - async with _aio_get_client() as client: - + async with load_aio_clients() as ( + client, + clients, + accts2clients, + ): contract = contract or (await client.find_contract(symbol)) ticker: Ticker = client.ib.reqMktData(contract, ','.join(opts)) @@ -1277,8 +1325,6 @@ async def stream_quotes( calc_price=calc_price ) - # con = quote['contract'] - # topic = '.'.join((con['symbol'], suffix)).lower() quote['symbol'] = topic await send_chan.send({topic: quote}) @@ -1295,12 +1341,21 @@ def pack_position(pos: Position) -> dict[str, Any]: symbol = con.localSymbol.replace(' ', '') else: - symbol = con.symbol + symbol = con.symbol.lower() + + exch = (con.primaryExchange or con.exchange).lower() + symkey = '.'.join((symbol, exch)) + + if not exch: + # attempt to lookup the symbol from our + # hacked set.. + for sym in _adhoc_futes_set: + if symbol in sym: + symkey = sym + break + + # TODO: options contracts into a sane format.. - symkey = '.'.join([ - symbol.lower(), - (con.primaryExchange or con.exchange).lower(), - ]) return BrokerdPosition( broker='ib', account=pos.account, @@ -1314,28 +1369,57 @@ def pack_position(pos: Position) -> dict[str, Any]: async def handle_order_requests( ems_order_stream: tractor.MsgStream, + accounts_def: dict[str, str], ) -> None: + global _accounts2clients + # request_msg: dict async for request_msg in ems_order_stream: log.info(f'Received order request {request_msg}') action = request_msg['action'] + account = request_msg['account'] + + acct_number = accounts_def.get(account) + if not acct_number: + log.error( + f'An IB account number for name {account} is not found?\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No account found: `{account}` ?', + ).dict()) + continue + + client = _accounts2clients.get(account) + if not client: + log.error( + f'An IB client for account name {account} is not found.\n' + 'Make sure you have all TWS and GW instances running.' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'No api client loaded for account: `{account}` ?', + ).dict()) + continue if action in {'buy', 'sell'}: # validate order = BrokerdOrder(**request_msg) # call our client api to submit the order - reqid = await _trio_run_client_method( - - method='submit_limit', + reqid = client.submit_limit( oid=order.oid, symbol=order.symbol, price=order.price, action=order.action, size=order.size, + account=acct_number, # XXX: by default 0 tells ``ib_insync`` methods that # there is no existing order so ask the client to create @@ -1352,16 +1436,13 @@ async def handle_order_requests( # broker specific request id reqid=reqid, time_ns=time.time_ns(), + account=account, ).dict() ) elif action == 'cancel': msg = BrokerdCancel(**request_msg) - - await _trio_run_client_method( - method='submit_cancel', - reqid=msg.reqid - ) + client.submit_cancel(reqid=msg.reqid) else: log.error(f'Unknown order command: {request_msg}') @@ -1378,166 +1459,204 @@ async def trades_dialogue( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - ib_trade_events_stream = await _trio_run_client_method( - method='recv_trade_updates', - ) + accounts_def = config.load_accounts(['ib']) + + global _accounts2clients + global _client_cache # deliver positions to subscriber before anything else - positions = await _trio_run_client_method(method='positions') - all_positions = {} - for pos in positions: - msg = pack_position(pos) - all_positions[msg.symbol] = msg.dict() + clients: list[tuple[Client, trio.MemoryReceiveChannel]] = [] + for account, client in _accounts2clients.items(): + + # each client to an api endpoint will have it's own event stream + trade_event_stream = await _trio_run_client_method( + method='recv_trade_updates', + client=client, + ) + clients.append((client, trade_event_stream)) + + for client in _client_cache.values(): + for pos in client.positions(): + msg = pack_position(pos) + all_positions.setdefault( + msg.symbol, [] + ).append(msg.dict()) await ctx.started(all_positions) - action_map = {'BOT': 'buy', 'SLD': 'sell'} - async with ( ctx.open_stream() as ems_stream, trio.open_nursery() as n, ): # start order request handler **before** local trades event loop - n.start_soon(handle_order_requests, ems_stream) + n.start_soon(handle_order_requests, ems_stream, accounts_def) - # TODO: for some reason we can receive a ``None`` here when the - # ib-gw goes down? Not sure exactly how that's happening looking - # at the eventkit code above but we should probably handle it... - async for event_name, item in ib_trade_events_stream: - print(f' ib sending {item}') + # allocate event relay tasks for each client connection + for client, stream in clients: + n.start_soon( + deliver_trade_events, + stream, + ems_stream, + accounts_def + ) - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) + # block until cancelled + await trio.sleep_forever() - # XXX: here's some other sucky cases from the api - # - short-sale but securities haven't been located, in this - # case we should probably keep the order in some kind of - # weird state or cancel it outright? - # status='PendingSubmit', message=''), - # status='Cancelled', message='Error 404, - # reqId 1550: Order held while securities are located.'), - # status='PreSubmitted', message='')], +async def deliver_trade_events( - if event_name == 'status': + trade_event_stream: trio.MemoryReceiveChannel, + ems_stream: tractor.MsgStream, + accounts_def: dict[str, str], - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... +) -> None: + '''Format and relay all trade events for a given client to the EMS. - # unwrap needed data from ib_insync internal types - trade: Trade = item - status: OrderStatus = trade.orderStatus + ''' + action_map = {'BOT': 'buy', 'SLD': 'sell'} - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = BrokerdStatus( + # TODO: for some reason we can receive a ``None`` here when the + # ib-gw goes down? Not sure exactly how that's happening looking + # at the eventkit code above but we should probably handle it... + async for event_name, item in trade_event_stream: - reqid=trade.order.orderId, - time_ns=time.time_ns(), # cuz why not + log.info(f'ib sending {event_name}:\n{pformat(item)}') - # everyone doin camel case.. - status=status.status.lower(), # force lower case + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) - filled=status.filled, - reason=status.whyHeld, + # XXX: here's some other sucky cases from the api + # - short-sale but securities haven't been located, in this + # case we should probably keep the order in some kind of + # weird state or cancel it outright? - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - remaining=status.remaining, + # status='PendingSubmit', message=''), + # status='Cancelled', message='Error 404, + # reqId 1550: Order held while securities are located.'), + # status='PreSubmitted', message='')], - broker_details={'name': 'ib'}, - ) + if event_name == 'status': - elif event_name == 'fill': + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - # for wtv reason this is a separate event type - # from IB, not sure why it's needed other then for extra - # complexity and over-engineering :eyeroll:. - # we may just end up dropping these events (or - # translating them to ``Status`` msgs) if we can - # show the equivalent status events are no more latent. + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # unpack ib_insync types - # pep-0526 style: - # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations - trade: Trade - fill: Fill - trade, fill = item - execu: Execution = fill.execution + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - # TODO: normalize out commissions details? - details = { - 'contract': asdict(fill.contract), - 'execution': asdict(fill.execution), - 'commissions': asdict(fill.commissionReport), - 'broker_time': execu.time, # supposedly server fill time - 'name': 'ib', - } + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + account=accounts_def.inverse[trade.order.account], - msg = BrokerdFill( - # should match the value returned from `.submit_limit()` - reqid=execu.orderId, - time_ns=time.time_ns(), # cuz why not + # everyone doin camel case.. + status=status.status.lower(), # force lower case - action=action_map[execu.side], - size=execu.shares, - price=execu.price, + filled=status.filled, + reason=status.whyHeld, - broker_details=details, - # XXX: required by order mode currently - broker_time=details['broker_time'], + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, - ) + broker_details={'name': 'ib'}, + ) - elif event_name == 'error': + elif event_name == 'fill': - err: dict = item + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. - # f$#$% gawd dammit insync.. - con = err['contract'] - if isinstance(con, Contract): - err['contract'] = asdict(con) + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill + trade, fill = item + execu: Execution = fill.execution - if err['reqid'] == -1: - log.error(f'TWS external order error:\n{pformat(err)}') + # TODO: normalize out commissions details? + details = { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissions': asdict(fill.commissionReport), + 'broker_time': execu.time, # supposedly server fill time + 'name': 'ib', + } - # don't forward for now, it's unecessary.. but if we wanted to, - # msg = BrokerdError(**err) - continue + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - elif event_name == 'position': - msg = pack_position(item) + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - if getattr(msg, 'reqid', 0) < -1: + broker_details=details, + # XXX: required by order mode currently + broker_time=details['broker_time'], - # it's a trade event generated by TWS usage. - log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + ) - msg.reqid = 'tws-' + str(-1 * msg.reqid) + elif event_name == 'error': - # mark msg as from "external system" - # TODO: probably something better then this.. and start - # considering multiplayer/group trades tracking - msg.broker_details['external_src'] = 'tws' - continue + err: dict = item - # XXX: we always serialize to a dict for msgpack - # translations, ideally we can move to an msgspec (or other) - # encoder # that can be enabled in ``tractor`` ahead of - # time so we can pass through the message types directly. - await ems_stream.send(msg.dict()) + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) + + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') + + # TODO: what schema for this msg if we're going to make it + # portable across all backends? + # msg = BrokerdError(**err) + continue + + elif event_name == 'position': + msg = pack_position(item) + + if getattr(msg, 'reqid', 0) < -1: + + # it's a trade event generated by TWS usage. + log.info(f"TWS triggered trade\n{pformat(msg.dict())}") + + msg.reqid = 'tws-' + str(-1 * msg.reqid) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + msg.broker_details['external_src'] = 'tws' + continue + + # XXX: we always serialize to a dict for msgpack + # translations, ideally we can move to an msgspec (or other) + # encoder # that can be enabled in ``tractor`` ahead of + # time so we can pass through the message types directly. + await ems_stream.send(msg.dict()) @tractor.context diff --git a/piker/brokers/questrade.py b/piker/brokers/questrade.py index 7a06ce76..30ba049e 100644 --- a/piker/brokers/questrade.py +++ b/piker/brokers/questrade.py @@ -43,7 +43,7 @@ import asks from ..calc import humanize, percent_change from .._cacheables import open_cached_client, async_lifo_cache -from . import config +from .. import config from ._util import resproc, BrokerError, SymbolNotFound from ..log import get_logger, colorize_json, get_console_log from . import get_brokermod diff --git a/piker/clearing/_allocate.py b/piker/clearing/_allocate.py new file mode 100644 index 00000000..94c7af12 --- /dev/null +++ b/piker/clearing/_allocate.py @@ -0,0 +1,328 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Position allocation logic and protocols. + +''' +from enum import Enum +from typing import Optional + +from bidict import bidict +from pydantic import BaseModel, validator + +from ..data._source import Symbol +from ._messages import BrokerdPosition, Status + + +class Position(BaseModel): + '''Basic pp (personal position) model with attached fills history. + + This type should be IPC wire ready? + + ''' + symbol: Symbol + + # last size and avg entry price + size: float + avg_price: float # TODO: contextual pricing + + # ordered record of known constituent trade messages + fills: list[Status] = [] + + def update_from_msg( + self, + msg: BrokerdPosition, + + ) -> None: + + # XXX: better place to do this? + symbol = self.symbol + + lot_size_digits = symbol.lot_size_digits + avg_price, size = ( + round(msg['avg_price'], ndigits=symbol.tick_size_digits), + round(msg['size'], ndigits=lot_size_digits), + ) + + self.avg_price = avg_price + self.size = size + + +_size_units = bidict({ + 'currency': '$ size', + 'units': '# units', + # TODO: but we'll need a `.get_accounts()` or something + # 'percent_of_port': '% of port', +}) +SizeUnit = Enum( + 'SizeUnit', + _size_units, +) + + +class Allocator(BaseModel): + + class Config: + validate_assignment = True + copy_on_model_validation = False + arbitrary_types_allowed = True + + # required to get the account validator lookup working? + extra = 'allow' + underscore_attrs_are_private = False + + symbol: Symbol + accounts: bidict[str, Optional[str]] + account: Optional[str] = 'paper' + + @validator('account', pre=False) + def set_account(cls, v, values): + if v: + return values['accounts'][v] + + size_unit: SizeUnit = 'currency' + _size_units: dict[str, Optional[str]] = _size_units + + @validator('size_unit') + def lookup_key(cls, v): + # apply the corresponding enum key for the text "description" value + return v.name + + # TODO: if we ever want ot support non-uniform entry-slot-proportion + # "sizes" + # disti_weight: str = 'uniform' + + units_limit: float + currency_limit: float + slots: int + + def step_sizes( + self, + ) -> (float, float): + '''Return the units size for each unit type as a tuple. + + ''' + slots = self.slots + return ( + self.units_limit / slots, + self.currency_limit / slots, + ) + + def limit(self) -> float: + if self.size_unit == 'currency': + return self.currency_limit + else: + return self.units_limit + + def account_name(self) -> str: + return self.accounts.inverse[self.account] + + def next_order_info( + self, + + # we only need a startup size for exit calcs, we can the + # determine how large slots should be if the initial pp size was + # larger then the current live one, and the live one is smaller + # then the initial config settings. + startup_pp: Position, + live_pp: Position, + price: float, + action: str, + + ) -> dict: + '''Generate order request info for the "next" submittable order + depending on position / order entry config. + + ''' + sym = self.symbol + ld = sym.lot_size_digits + + size_unit = self.size_unit + live_size = live_pp.size + abs_live_size = abs(live_size) + abs_startup_size = abs(startup_pp.size) + + u_per_slot, currency_per_slot = self.step_sizes() + + if size_unit == 'units': + slot_size = u_per_slot + l_sub_pp = self.units_limit - abs_live_size + + elif size_unit == 'currency': + live_cost_basis = abs_live_size * live_pp.avg_price + slot_size = currency_per_slot / price + l_sub_pp = (self.currency_limit - live_cost_basis) / price + + # an entry (adding-to or starting a pp) + if ( + action == 'buy' and live_size > 0 or + action == 'sell' and live_size < 0 or + live_size == 0 + ): + + order_size = min(slot_size, l_sub_pp) + + # an exit (removing-from or going to net-zero pp) + else: + # when exiting a pp we always try to slot the position + # in the instrument's units, since doing so in a derived + # size measure (eg. currency value, percent of port) would + # result in a mis-mapping of slots sizes in unit terms + # (i.e. it would take *more* slots to exit at a profit and + # *less* slots to exit at a loss). + pp_size = max(abs_startup_size, abs_live_size) + slotted_pp = pp_size / self.slots + + if size_unit == 'currency': + # compute the "projected" limit's worth of units at the + # current pp (weighted) price: + slot_size = currency_per_slot / live_pp.avg_price + + else: + slot_size = u_per_slot + + # TODO: ensure that the limit can never be set **lower** + # then the current pp size? It should be configured + # correctly at startup right? + + # if our position is greater then our limit setting + # we'll want to use slot sizes which are larger then what + # the limit would normally determine. + order_size = max(slotted_pp, slot_size) + + if ( + abs_live_size < slot_size or + + # NOTE: front/back "loading" heurstic: + # if the remaining pp is in between 0-1.5x a slot's + # worth, dump the whole position in this last exit + # therefore conducting so called "back loading" but + # **without** going past a net-zero pp. if the pp is + # > 1.5x a slot size, then front load: exit a slot's and + # expect net-zero to be acquired on the final exit. + slot_size < pp_size < round((1.5*slot_size), ndigits=ld) + ): + order_size = abs_live_size + + slots_used = 1.0 # the default uniform policy + if order_size < slot_size: + # compute a fractional slots size to display + slots_used = self.slots_used( + Position(symbol=sym, size=order_size, avg_price=price) + ) + + return { + 'size': abs(round(order_size, ndigits=ld)), + 'size_digits': ld, + + # TODO: incorporate multipliers for relevant derivatives + 'fiat_size': round(order_size * price, ndigits=2), + 'slots_used': slots_used, + + # update line LHS label with account name + 'account': self.account_name(), + } + + def slots_used( + self, + pp: Position, + + ) -> float: + '''Calc and return the number of slots used by this ``Position``. + + ''' + abs_pp_size = abs(pp.size) + + if self.size_unit == 'currency': + # live_currency_size = size or (abs_pp_size * pp.avg_price) + live_currency_size = abs_pp_size * pp.avg_price + prop = live_currency_size / self.currency_limit + + else: + # return (size or abs_pp_size) / alloc.units_limit + prop = abs_pp_size / self.units_limit + + # TODO: REALLY need a way to show partial slots.. + # for now we round at the midway point between slots + return round(prop * self.slots) + + +def mk_allocator( + + symbol: Symbol, + accounts: dict[str, str], + startup_pp: Position, + + # default allocation settings + defaults: dict[str, float] = { + 'account': None, # select paper by default + 'size_unit': _size_units['currency'], + 'units_limit': 400, + 'currency_limit': 5e3, + 'slots': 4, + }, + **kwargs, + +) -> Allocator: + + if kwargs: + defaults.update(kwargs) + + # load and retreive user settings for default allocations + # ``config.toml`` + user_def = { + 'currency_limit': 5e3, + 'slots': 4, + } + + defaults.update(user_def) + + alloc = Allocator( + symbol=symbol, + accounts=accounts, + **defaults, + ) + + asset_type = symbol.type_key + + # specific configs by asset class / type + + if asset_type in ('future', 'option', 'futures_option'): + + # since it's harder to know how currency "applies" in this case + # given leverage properties + alloc.size_unit = '# units' + + # set units limit to slots size thus making make the next + # entry step 1.0 + alloc.units_limit = alloc.slots + + # if the current position is already greater then the limit + # settings, increase the limit to the current position + if alloc.size_unit == 'currency': + startup_size = startup_pp.size * startup_pp.avg_price + + if startup_size > alloc.currency_limit: + alloc.currency_limit = round(startup_size, ndigits=2) + + else: + startup_size = startup_pp.size + + if startup_size > alloc.units_limit: + alloc.units_limit = startup_size + + return alloc diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index f5eeff87..3c689ff4 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -201,6 +201,7 @@ async def clear_dark_triggers( msg = BrokerdOrder( action=cmd['action'], oid=oid, + account=cmd['account'], time_ns=time.time_ns(), # this **creates** new order request for the @@ -259,8 +260,15 @@ async def clear_dark_triggers( @dataclass class TradesRelay: + + # for now we keep only a single connection open with + # each ``brokerd`` for simplicity. brokerd_dialogue: tractor.MsgStream - positions: dict[str, float] + + # map of symbols to dicts of accounts to pp msgs + positions: dict[str, dict[str, BrokerdPosition]] + + # count of connected ems clients for this ``brokerd`` consumers: int = 0 @@ -513,10 +521,13 @@ async def translate_and_relay_brokerd_events( pos_msg = BrokerdPosition(**brokerd_msg).dict() - # keep up to date locally in ``emsd`` - relay.positions.setdefault(pos_msg['symbol'], {}).update(pos_msg) + # XXX: this will be useful for automatic strats yah? + # keep pps per account up to date locally in ``emsd`` mem + relay.positions.setdefault(pos_msg['symbol'], {}).setdefault( + pos_msg['account'], {} + ).update(pos_msg) - # relay through position msgs immediately by + # fan-out-relay position msgs immediately by # broadcasting updates on all client streams for client_stream in router.clients: await client_stream.send(pos_msg) @@ -621,8 +632,11 @@ async def translate_and_relay_brokerd_events( # another stupid ib error to handle # if 10147 in message: cancel + resp = 'broker_errored' + broker_details = msg.dict() + # don't relay message to order requester client - continue + # continue elif name in ( 'status', @@ -741,6 +755,7 @@ async def process_client_order_cmds( oid=oid, reqid=reqid, time_ns=time.time_ns(), + account=live_entry.account, ) # NOTE: cancel response will be relayed back in messages @@ -814,6 +829,7 @@ async def process_client_order_cmds( action=action, price=trigger_price, size=size, + account=msg.account, ) # send request to backend @@ -994,7 +1010,10 @@ async def _emsd_main( # signal to client that we're started # TODO: we could eventually send back **all** brokerd # positions here? - await ems_ctx.started(relay.positions) + await ems_ctx.started( + {sym: list(pps.values()) + for sym, pps in relay.positions.items()} + ) # establish 2-way stream with requesting order-client and # begin handling inbound order requests and updates @@ -1016,6 +1035,7 @@ async def _emsd_main( try: _router.clients.add(ems_client_order_stream) + # main entrypoint, run here until cancelled. await process_client_order_cmds( ems_client_order_stream, @@ -1035,7 +1055,7 @@ async def _emsd_main( dialogues = _router.dialogues - for oid, client_stream in dialogues.items(): + for oid, client_stream in dialogues.copy().items(): if client_stream == ems_client_order_stream: diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py index 126326ab..e7fadccd 100644 --- a/piker/clearing/_messages.py +++ b/piker/clearing/_messages.py @@ -45,6 +45,7 @@ class Order(BaseModel): # internal ``emdsd`` unique "order id" oid: str # uuid4 symbol: Union[str, Symbol] + account: str # should we set a default as '' ? price: float size: float @@ -86,6 +87,7 @@ class Status(BaseModel): # 'broker_cancelled', # 'broker_executed', # 'broker_filled', + # 'broker_errored', # 'alert_submitted', # 'alert_triggered', @@ -118,6 +120,7 @@ class BrokerdCancel(BaseModel): oid: str # piker emsd order id time_ns: int + account: str # "broker request id": broker specific/internal order id if this is # None, creates a new order otherwise if the id is valid the backend # api must modify the existing matching order. If the broker allows @@ -131,6 +134,7 @@ class BrokerdOrder(BaseModel): action: str # {buy, sell} oid: str + account: str time_ns: int # "broker request id": broker specific/internal order id if this is @@ -162,6 +166,7 @@ class BrokerdOrderAck(BaseModel): # emsd id originally sent in matching request msg oid: str + account: str = '' class BrokerdStatus(BaseModel): @@ -170,6 +175,9 @@ class BrokerdStatus(BaseModel): reqid: Union[int, str] time_ns: int + # XXX: should be best effort set for every update + account: str = '' + # { # 'submitted', # 'cancelled', @@ -224,7 +232,11 @@ class BrokerdError(BaseModel): This is still a TODO thing since we're not sure how to employ it yet. ''' name: str = 'error' - reqid: Union[int, str] + oid: str + + # if no brokerd order request was actually submitted (eg. we errored + # at the ``pikerd`` layer) then there will be ``reqid`` allocated. + reqid: Union[int, str] = '' symbol: str reason: str diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 84e681dd..628f58b9 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -35,7 +35,7 @@ from ..data._normalize import iterticks from ..log import get_logger from ._messages import ( BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, - BrokerdFill, BrokerdPosition, + BrokerdFill, BrokerdPosition, BrokerdError ) @@ -385,6 +385,19 @@ async def handle_order_requests( action = request_msg['action'] if action in {'buy', 'sell'}: + + account = request_msg['account'] + if account != 'paper': + log.error( + 'This is a paper account, only a `paper` selection is valid' + ) + await ems_order_stream.send(BrokerdError( + oid=request_msg['oid'], + symbol=request_msg['symbol'], + reason=f'Paper only. No account found: `{account}` ?', + ).dict()) + continue + # validate order = BrokerdOrder(**request_msg) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index e5e9a2d1..22022e84 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -8,8 +8,9 @@ import trio import tractor from ..log import get_console_log, get_logger, colorize_json -from ..brokers import get_brokermod, config +from ..brokers import get_brokermod from .._daemon import _tractor_kwargs +from .. import config log = get_logger('cli') diff --git a/piker/brokers/config.py b/piker/config.py similarity index 78% rename from piker/brokers/config.py rename to piker/config.py index 1fbd8ce1..70d9c9c5 100644 --- a/piker/brokers/config.py +++ b/piker/config.py @@ -22,10 +22,11 @@ from os.path import dirname import shutil from typing import Optional +from bidict import bidict import toml import click -from ..log import get_logger +from .log import get_logger log = get_logger('broker-config') @@ -104,19 +105,29 @@ def write( return toml.dump(config, cf) -def load_accounts() -> dict[str, Optional[str]]: +def load_accounts( - # our default paper engine entry - accounts: dict[str, Optional[str]] = {'paper': None} + providers: Optional[list[str]] = None + +) -> bidict[str, Optional[str]]: conf, path = load() - section = conf.get('accounts') - if section is None: - log.warning('No accounts config found?') - - else: - for brokername, account_labels in section.items(): - for name, value in account_labels.items(): - accounts[f'{brokername}.{name}'] = value + accounts = bidict() + for provider_name, section in conf.items(): + accounts_section = section.get('accounts') + if ( + providers is None or + providers and provider_name in providers + ): + if accounts_section is None: + log.warning(f'No accounts named for {provider_name}?') + continue + else: + for label, value in accounts_section.items(): + accounts[ + f'{provider_name}.{label}' + ] = value + # our default paper engine entry + accounts['paper'] = None return accounts diff --git a/piker/data/_source.py b/piker/data/_source.py index 1a8c635d..46302508 100644 --- a/piker/data/_source.py +++ b/piker/data/_source.py @@ -106,6 +106,7 @@ class Symbol(BaseModel): mult = 1 / self.tick_size return round(value * mult) / mult + @validate_arguments def mk_symbol( diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 2ae846c8..6ac9f752 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -23,7 +23,7 @@ from typing import Tuple, Dict, Any, Optional from types import ModuleType from functools import partial -from PyQt5 import QtCore, QtGui, QtWidgets +from PyQt5 import QtCore, QtWidgets from PyQt5.QtCore import Qt from PyQt5.QtCore import QEvent from PyQt5.QtWidgets import ( @@ -277,7 +277,7 @@ class ChartnPane(QFrame): ''' sidepane: FieldsForm - hbox: QtGui.QHBoxLayout + hbox: QtWidgets.QHBoxLayout chart: Optional['ChartPlotWidget'] = None def __init__( @@ -293,7 +293,7 @@ class ChartnPane(QFrame): self.sidepane = sidepane self.chart = None - hbox = self.hbox = QtGui.QHBoxLayout(self) + hbox = self.hbox = QtWidgets.QHBoxLayout(self) hbox.setAlignment(Qt.AlignTop | Qt.AlignLeft) hbox.setContentsMargins(0, 0, 0, 0) hbox.setSpacing(3) diff --git a/piker/ui/_forms.py b/piker/ui/_forms.py index b504a408..9e7e40ea 100644 --- a/piker/ui/_forms.py +++ b/piker/ui/_forms.py @@ -47,7 +47,7 @@ from PyQt5.QtWidgets import ( from ._event import open_handlers from ._style import hcolor, _font, _font_small, DpiAwareFont from ._label import FormatLabel -from .. import brokers +from .. import config class FontAndChartAwareLineEdit(QLineEdit): @@ -382,21 +382,21 @@ def mk_form( form._font_size = font_size or _font_small.px_size # generate sub-components from schema dict - for key, config in fields_schema.items(): - wtype = config['type'] - label = str(config.get('label', key)) + for key, conf in fields_schema.items(): + wtype = conf['type'] + label = str(conf.get('label', key)) # plain (line) edit field if wtype == 'edit': w = form.add_edit_field( key, label, - config['default_value'] + conf['default_value'] ) # drop-down selection elif wtype == 'select': - values = list(config['default_value']) + values = list(conf['default_value']) w = form.add_select_field( key, label, @@ -417,8 +417,6 @@ async def open_form_input_handling( ) -> FieldsForm: - # assert form.model, f'{form} must define a `.model`' - async with open_handlers( list(form.fields.values()), @@ -635,7 +633,7 @@ def mk_order_pane_layout( # font_size: int = _font_small.px_size - 2 font_size: int = _font.px_size - 2 - accounts = brokers.config.load_accounts() + accounts = config.load_accounts() # TODO: maybe just allocate the whole fields form here # and expect an async ctx entry? diff --git a/piker/ui/_interaction.py b/piker/ui/_interaction.py index 022566d4..d33d553e 100644 --- a/piker/ui/_interaction.py +++ b/piker/ui/_interaction.py @@ -198,7 +198,7 @@ async def handle_viewmode_kb_inputs( Qt.Key_P, } ): - pp_pane = order_mode.pp.pane + pp_pane = order_mode.current_pp.pane if pp_pane.isHidden(): pp_pane.show() else: @@ -213,7 +213,7 @@ async def handle_viewmode_kb_inputs( if order_keys_pressed: # show the pp size label - order_mode.pp.show() + order_mode.current_pp.show() # TODO: show pp config mini-params in status bar widget # mode.pp_config.show() @@ -259,20 +259,23 @@ async def handle_viewmode_kb_inputs( ) and key in NUMBER_LINE ): - # hot key to set order slots size + # hot key to set order slots size. + # change edit field to current number line value, + # update the pp allocator bar, unhighlight the + # field when ctrl is released. num = int(text) pp_pane = order_mode.pane pp_pane.on_ui_settings_change('slots', num) edit = pp_pane.form.fields['slots'] edit.selectAll() + # un-highlight on ctrl release on_next_release = edit.deselect - pp_pane.update_status_ui() else: # none active # hide pp label - order_mode.pp.hide_info() + order_mode.current_pp.hide_info() # if none are pressed, remove "staged" level # line under cursor position diff --git a/piker/ui/_label.py b/piker/ui/_label.py index 42ab0776..c9a0b2a0 100644 --- a/piker/ui/_label.py +++ b/piker/ui/_label.py @@ -224,6 +224,7 @@ class Label: def show(self) -> None: self.txt.show() + self.txt.update() def hide(self) -> None: self.txt.hide() diff --git a/piker/ui/_lines.py b/piker/ui/_lines.py index 5572e3c2..acd1e88a 100644 --- a/piker/ui/_lines.py +++ b/piker/ui/_lines.py @@ -665,7 +665,7 @@ def order_line( # display the order pos size, which is some multiple # of the user defined base unit size fmt_str=( - '{size:.{size_digits}f}u{fiat_text}' + '{account_text}{size:.{size_digits}f}u{fiat_text}' ), color=line.color, ) @@ -679,13 +679,23 @@ def order_line( if not fiat_size: return '' - return f' -> ${humanize(fiat_size)}' + return f' ~ ${humanize(fiat_size)}' + + def maybe_show_account_name(fields: dict) -> str: + account = fields.get('account') + if not account: + return '' + + return f'{account}: ' + label.fields = { 'size': size, 'size_digits': 0, 'fiat_size': None, 'fiat_text': maybe_show_fiat_text, + 'account': None, + 'account_text': maybe_show_account_name, } label.orient_v = orient_v diff --git a/piker/ui/_position.py b/piker/ui/_position.py index 3d5364ad..07f8c76e 100644 --- a/piker/ui/_position.py +++ b/piker/ui/_position.py @@ -20,234 +20,93 @@ Position info and display """ from __future__ import annotations from dataclasses import dataclass -from enum import Enum from functools import partial -from math import floor +from math import floor, copysign from typing import Optional -from bidict import bidict from pyqtgraph import functions as fn -from pydantic import BaseModel, validator from ._annotate import LevelMarker from ._anchors import ( pp_tight_and_right, # wanna keep it straight in the long run gpath_pin, ) -from ..calc import humanize -from ..clearing._messages import BrokerdPosition, Status -from ..data._source import Symbol +from ..calc import humanize, pnl +from ..clearing._allocate import Allocator, Position +from ..data._normalize import iterticks +from ..data.feed import Feed from ._label import Label from ._lines import LevelLine, order_line from ._style import _font from ._forms import FieldsForm, FillStatusBar, QLabel from ..log import get_logger -from ..clearing._messages import Order log = get_logger(__name__) +_pnl_tasks: dict[str, bool] = {} -class Position(BaseModel): - '''Basic pp (personal position) model with attached fills history. +async def display_pnl( - This type should be IPC wire ready? + feed: Feed, + order_mode: OrderMode, # noqa + +) -> None: + '''Real-time display the current pp's PnL in the appropriate label. + + ``ValueError`` if this task is spawned where there is a net-zero pp. ''' - symbol: Symbol + global _pnl_tasks - # last size and avg entry price - size: float - avg_price: float # TODO: contextual pricing + pp = order_mode.current_pp + live = pp.live_pp + key = live.symbol.key - # ordered record of known constituent trade messages - fills: list[Status] = [] + if live.size < 0: + types = ('ask', 'last', 'last', 'utrade') + elif live.size > 0: + types = ('bid', 'last', 'last', 'utrade') -_size_units = bidict({ - 'currency': '$ size', - 'units': '# units', - # TODO: but we'll need a `.get_accounts()` or something - # 'percent_of_port': '% of port', -}) -SizeUnit = Enum( - 'SizeUnit', - _size_units, -) + else: + raise RuntimeError('No pp?!?!') + # real-time update pnl on the status pane + try: + async with feed.stream.subscribe() as bstream: + # last_tick = time.time() + async for quotes in bstream: -class Allocator(BaseModel): + # now = time.time() + # period = now - last_tick - class Config: - validate_assignment = True - copy_on_model_validation = False - arbitrary_types_allowed = True + for sym, quote in quotes.items(): - # required to get the account validator lookup working? - extra = 'allow' - # underscore_attrs_are_private = False + for tick in iterticks(quote, types): + # print(f'{1/period} Hz') - symbol: Symbol + size = order_mode.current_pp.live_pp.size + if size == 0: + # terminate this update task since we're + # no longer in a pp + order_mode.pane.pnl_label.format(pnl=0) + return - account: Optional[str] = 'paper' - _accounts: bidict[str, Optional[str]] + else: + # compute and display pnl status + order_mode.pane.pnl_label.format( + pnl=copysign(1, size) * pnl( + # live.avg_price, + order_mode.current_pp.live_pp.avg_price, + tick['price'], + ), + ) - @validator('account', pre=True) - def set_account(cls, v, values): - if v: - return values['_accounts'][v] - - size_unit: SizeUnit = 'currency' - _size_units: dict[str, Optional[str]] = _size_units - - @validator('size_unit') - def lookup_key(cls, v): - # apply the corresponding enum key for the text "description" value - return v.name - - # TODO: if we ever want ot support non-uniform entry-slot-proportion - # "sizes" - # disti_weight: str = 'uniform' - - units_limit: float - currency_limit: float - slots: int - - def step_sizes( - self, - ) -> (float, float): - '''Return the units size for each unit type as a tuple. - - ''' - slots = self.slots - return ( - self.units_limit / slots, - self.currency_limit / slots, - ) - - def limit(self) -> float: - if self.size_unit == 'currency': - return self.currency_limit - else: - return self.units_limit - - def next_order_info( - self, - - startup_pp: Position, - live_pp: Position, - price: float, - action: str, - - ) -> dict: - '''Generate order request info for the "next" submittable order - depending on position / order entry config. - - ''' - sym = self.symbol - ld = sym.lot_size_digits - - size_unit = self.size_unit - live_size = live_pp.size - abs_live_size = abs(live_size) - abs_startup_size = abs(startup_pp.size) - - u_per_slot, currency_per_slot = self.step_sizes() - - if size_unit == 'units': - slot_size = u_per_slot - l_sub_pp = self.units_limit - abs_live_size - - elif size_unit == 'currency': - live_cost_basis = abs_live_size * live_pp.avg_price - slot_size = currency_per_slot / price - l_sub_pp = (self.currency_limit - live_cost_basis) / price - - # an entry (adding-to or starting a pp) - if ( - action == 'buy' and live_size > 0 or - action == 'sell' and live_size < 0 or - live_size == 0 - ): - - order_size = min(slot_size, l_sub_pp) - - # an exit (removing-from or going to net-zero pp) - else: - # when exiting a pp we always try to slot the position - # in the instrument's units, since doing so in a derived - # size measure (eg. currency value, percent of port) would - # result in a mis-mapping of slots sizes in unit terms - # (i.e. it would take *more* slots to exit at a profit and - # *less* slots to exit at a loss). - pp_size = max(abs_startup_size, abs_live_size) - slotted_pp = pp_size / self.slots - - if size_unit == 'currency': - # compute the "projected" limit's worth of units at the - # current pp (weighted) price: - slot_size = currency_per_slot / live_pp.avg_price - - else: - slot_size = u_per_slot - - # if our position is greater then our limit setting - # we'll want to use slot sizes which are larger then what - # the limit would normally determine - order_size = max(slotted_pp, slot_size) - - if ( - abs_live_size < slot_size or - - # NOTE: front/back "loading" heurstic: - # if the remaining pp is in between 0-1.5x a slot's - # worth, dump the whole position in this last exit - # therefore conducting so called "back loading" but - # **without** going past a net-zero pp. if the pp is - # > 1.5x a slot size, then front load: exit a slot's and - # expect net-zero to be acquired on the final exit. - slot_size < pp_size < round((1.5*slot_size), ndigits=ld) - ): - order_size = abs_live_size - - slots_used = 1.0 # the default uniform policy - if order_size < slot_size: - # compute a fractional slots size to display - slots_used = self.slots_used( - Position(symbol=sym, size=order_size, avg_price=price) - ) - - return { - 'size': abs(round(order_size, ndigits=ld)), - 'size_digits': ld, - - # TODO: incorporate multipliers for relevant derivatives - 'fiat_size': round(order_size * price, ndigits=2), - 'slots_used': slots_used, - } - - def slots_used( - self, - pp: Position, - - ) -> float: - '''Calc and return the number of slots used by this ``Position``. - - ''' - abs_pp_size = abs(pp.size) - - if self.size_unit == 'currency': - # live_currency_size = size or (abs_pp_size * pp.avg_price) - live_currency_size = abs_pp_size * pp.avg_price - prop = live_currency_size / self.currency_limit - - else: - # return (size or abs_pp_size) / alloc.units_limit - prop = abs_pp_size / self.units_limit - - # TODO: REALLY need a way to show partial slots.. - # for now we round at the midway point between slots - return round(prop * self.slots) + # last_tick = time.time() + finally: + assert _pnl_tasks[key] + assert _pnl_tasks.pop(key) @dataclass @@ -256,10 +115,6 @@ class SettingsPane: order entry sizes and position limits per tradable instrument. ''' - # config for and underlying validation model - tracker: PositionTracker - alloc: Allocator - # input fields form: FieldsForm @@ -270,9 +125,8 @@ class SettingsPane: pnl_label: QLabel limit_label: QLabel - def transform_to(self, size_unit: str) -> None: - if self.alloc.size_unit == size_unit: - return + # encompasing high level namespace + order_mode: Optional['OrderMode'] = None # typing: ignore # noqa def on_selection_change( self, @@ -284,8 +138,7 @@ class SettingsPane: '''Called on any order pane drop down selection change. ''' - print(f'selection input: {text}') - setattr(self.alloc, key, text) + log.info(f'selection input: {text}') self.on_ui_settings_change(key, text) def on_ui_settings_change( @@ -298,11 +151,49 @@ class SettingsPane: '''Called on any order pane edit field value change. ''' - print(f'settings change: {key}: {value}') - alloc = self.alloc + mode = self.order_mode + + # an account switch request + if key == 'account': + + # hide details on the old selection + old_tracker = mode.current_pp + old_tracker.hide_info() + + # re-assign the order mode tracker + account_name = value + tracker = mode.trackers.get(account_name) + + # if selection can't be found (likely never discovered with + # a ``brokerd`) then error and switch back to the last + # selection. + if tracker is None: + sym = old_tracker.chart.linked.symbol.key + log.error( + f'Account `{account_name}` can not be set for {sym}' + ) + self.form.fields['account'].setCurrentText( + old_tracker.alloc.account_name()) + return + + self.order_mode.current_pp = tracker + assert tracker.alloc.account_name() == account_name + self.form.fields['account'].setCurrentText(account_name) + tracker.show() + tracker.hide_info() + + self.display_pnl(tracker) + + # load the new account's allocator + alloc = tracker.alloc + + else: + tracker = mode.current_pp + alloc = tracker.alloc + size_unit = alloc.size_unit - # write any passed settings to allocator + # WRITE any settings to current pp's allocator if key == 'limit': if size_unit == 'currency': alloc.currency_limit = float(value) @@ -317,20 +208,18 @@ class SettingsPane: # the current settings in the new units pass - elif key == 'account': - print(f'TODO: change account -> {value}') - - else: + elif key != 'account': raise ValueError(f'Unknown setting {key}') - # read out settings and update UI + # READ out settings and update UI + log.info(f'settings change: {key}: {value}') suffix = {'currency': ' $', 'units': ' u'}[size_unit] limit = alloc.limit() # TODO: a reverse look up from the position to the equivalent # account(s), if none then look to user config for default? - self.update_status_ui() + self.update_status_ui(pp=tracker) step_size, currency_per_slot = alloc.step_sizes() @@ -356,68 +245,16 @@ class SettingsPane: # UI in some way? return True - def init_status_ui( - self, - ): - alloc = self.alloc - asset_type = alloc.symbol.type_key - # form = self.form - - # TODO: pull from piker.toml - # default config - slots = 4 - currency_limit = 5e3 - - startup_pp = self.tracker.startup_pp - - alloc.slots = slots - alloc.currency_limit = currency_limit - - # default entry sizing - if asset_type in ('stock', 'crypto', 'forex'): - - alloc.size_unit = '$ size' - - elif asset_type in ('future', 'option', 'futures_option'): - - # since it's harder to know how currency "applies" in this case - # given leverage properties - alloc.size_unit = '# units' - - # set units limit to slots size thus making make the next - # entry step 1.0 - alloc.units_limit = slots - - # if the current position is already greater then the limit - # settings, increase the limit to the current position - if alloc.size_unit == 'currency': - startup_size = startup_pp.size * startup_pp.avg_price - - if startup_size > alloc.currency_limit: - alloc.currency_limit = round(startup_size, ndigits=2) - - limit_text = alloc.currency_limit - - else: - startup_size = startup_pp.size - - if startup_size > alloc.units_limit: - alloc.units_limit = startup_size - - limit_text = alloc.units_limit - - self.on_ui_settings_change('limit', limit_text) - self.update_status_ui(size=startup_size) - def update_status_ui( self, - size: float = None, + + pp: PositionTracker, ) -> None: - alloc = self.alloc + alloc = pp.alloc slots = alloc.slots - used = alloc.slots_used(self.tracker.live_pp) + used = alloc.slots_used(pp.live_pp) # calculate proportion of position size limit # that exists and display in fill bar @@ -430,31 +267,51 @@ class SettingsPane: min(used, slots) ) - def on_level_change_update_next_order_info( + def display_pnl( self, + tracker: PositionTracker, - level: float, - line: LevelLine, - order: Order, + ) -> bool: + '''Display the PnL for the current symbol and personal positioning (pp). - ) -> None: - '''A callback applied for each level change to the line - which will recompute the order size based on allocator - settings. this is assigned inside - ``OrderMode.line_from_order()`` + If a position is open start a background task which will + real-time update the pnl label in the settings pane. ''' - order_info = self.alloc.next_order_info( - startup_pp=self.tracker.startup_pp, - live_pp=self.tracker.live_pp, - price=level, - action=order.action, - ) - line.update_labels(order_info) + mode = self.order_mode + sym = mode.chart.linked.symbol + size = tracker.live_pp.size + feed = mode.quote_feed + global _pnl_tasks - # update bound-in staged order - order.price = level - order.size = order_info['size'] + if ( + size and + sym.key not in _pnl_tasks + ): + _pnl_tasks[sym.key] = True + + # immediately compute and display pnl status from last quote + self.pnl_label.format( + pnl=copysign(1, size) * pnl( + tracker.live_pp.avg_price, + # last historical close price + feed.shm.array[-1][['close']][0], + ), + ) + + log.info( + f'Starting pnl display for {tracker.alloc.account_name()}') + self.order_mode.nursery.start_soon( + display_pnl, + feed, + mode, + ) + return True + + else: + # set 0% pnl + self.pnl_label.format(pnl=0) + return False def position_line( @@ -522,8 +379,8 @@ def position_line( class PositionTracker: - '''Track and display a real-time position for a single symbol - on a chart. + '''Track and display real-time positions for a single symbol + over multiple accounts on a single chart. Graphically composed of a level line and marker as well as labels for indcating current position information. Updates are made to the @@ -532,11 +389,12 @@ class PositionTracker: ''' # inputs chart: 'ChartPlotWidget' # noqa - alloc: Allocator - # allocated + alloc: Allocator startup_pp: Position live_pp: Position + + # allocated pp_label: Label size_label: Label line: Optional[LevelLine] = None @@ -547,17 +405,15 @@ class PositionTracker: self, chart: 'ChartPlotWidget', # noqa alloc: Allocator, + startup_pp: Position, ) -> None: self.chart = chart + self.alloc = alloc - self.live_pp = Position( - symbol=chart.linked.symbol, - size=0, - avg_price=0, - ) - self.startup_pp = self.live_pp.copy() + self.startup_pp = startup_pp + self.live_pp = startup_pp.copy() view = chart.getViewBox() @@ -622,9 +478,8 @@ class PositionTracker: self.pp_label.update() self.size_label.update() - def update_from_pp_msg( + def update_from_pp( self, - msg: BrokerdPosition, position: Optional[Position] = None, ) -> None: @@ -632,23 +487,13 @@ class PositionTracker: EMS ``BrokerdPosition`` msg. ''' - # XXX: better place to do this? - symbol = self.chart.linked.symbol - lot_size_digits = symbol.lot_size_digits - avg_price, size = ( - round(msg['avg_price'], ndigits=symbol.tick_size_digits), - round(msg['size'], ndigits=lot_size_digits), - ) - # live pp updates pp = position or self.live_pp - pp.avg_price = avg_price - pp.size = size self.update_line( - avg_price, - size, - lot_size_digits, + pp.avg_price, + pp.size, + self.chart.linked.symbol.lot_size_digits, ) # label updates @@ -656,11 +501,11 @@ class PositionTracker: self.alloc.slots_used(pp), ndigits=1) self.size_label.render() - if size == 0: + if pp.size == 0: self.hide() else: - self._level_marker.level = avg_price + self._level_marker.level = pp.avg_price # these updates are critical to avoid lag on view/scene changes self._level_marker.update() # trigger paint @@ -681,7 +526,6 @@ class PositionTracker: def show(self) -> None: if self.live_pp.size: - self.line.show() self.line.show_labels() @@ -740,7 +584,6 @@ class PositionTracker: return arrow - # TODO: per account lines on a single (or very related) symbol def update_line( self, price: float, @@ -776,7 +619,10 @@ class PositionTracker: line.update_labels({ 'size': size, 'size_digits': size_digits, - 'fiat_size': round(price * size, ndigits=2) + 'fiat_size': round(price * size, ndigits=2), + + # TODO: per account lines on a single (or very related) symbol + 'account': self.alloc.account_name(), }) line.show() diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 85555bad..1a588360 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -21,27 +21,30 @@ Chart trading, the only way to scalp. from contextlib import asynccontextmanager from dataclasses import dataclass, field from functools import partial -from math import copysign from pprint import pformat import time from typing import Optional, Dict, Callable, Any import uuid -from bidict import bidict from pydantic import BaseModel import tractor import trio -from .. import brokers -from ..calc import pnl +from .. import config from ..clearing._client import open_ems, OrderBook +from ..clearing._allocate import ( + mk_allocator, + Position, +) from ..data._source import Symbol -from ..data._normalize import iterticks from ..data.feed import Feed from ..log import get_logger from ._editors import LineEditor, ArrowEditor from ._lines import order_line, LevelLine -from ._position import PositionTracker, SettingsPane, Allocator, _size_units +from ._position import ( + PositionTracker, + SettingsPane, +) from ._window import MultiStatus from ..clearing._messages import Order from ._forms import open_form_input_handling @@ -69,6 +72,37 @@ class OrderDialog(BaseModel): underscore_attrs_are_private = False +def on_level_change_update_next_order_info( + + level: float, + + # these are all ``partial``-ed in at callback assignment time. + line: LevelLine, + order: Order, + tracker: PositionTracker, + +) -> None: + '''A callback applied for each level change to the line + which will recompute the order size based on allocator + settings. this is assigned inside + ``OrderMode.line_from_order()`` + + ''' + # NOTE: the ``Order.account`` is set at order stage time + # inside ``OrderMode.line_from_order()``. + order_info = tracker.alloc.next_order_info( + startup_pp=tracker.startup_pp, + live_pp=tracker.live_pp, + price=level, + action=order.action, + ) + line.update_labels(order_info) + + # update bound-in staged order + order.price = level + order.size = order_info['size'] + + @dataclass class OrderMode: '''Major UX mode for placing orders on a chart view providing so @@ -90,16 +124,18 @@ class OrderMode: ''' chart: 'ChartPlotWidget' # type: ignore # noqa + nursery: trio.Nursery + quote_feed: Feed book: OrderBook lines: LineEditor arrows: ArrowEditor multistatus: MultiStatus - pp: PositionTracker - allocator: 'Allocator' # noqa pane: SettingsPane + trackers: dict[str, PositionTracker] + # switched state, the current position + current_pp: Optional[PositionTracker] = None active: bool = False - name: str = 'order' dialogs: dict[str, OrderDialog] = field(default_factory=dict) @@ -144,9 +180,10 @@ class OrderMode: # immediately if order.action != 'alert': line._on_level_change = partial( - self.pane.on_level_change_update_next_order_info, + on_level_change_update_next_order_info, line=line, order=order, + tracker=self.current_pp, ) else: @@ -185,6 +222,7 @@ class OrderMode: order = self._staged_order = Order( action=action, price=price, + account=self.current_pp.alloc.account_name(), size=0, symbol=symbol, brokers=symbol.brokers, @@ -490,7 +528,7 @@ async def open_order_mode( book: OrderBook trades_stream: tractor.MsgStream - positions: dict + position_msgs: dict # spawn EMS actor-service async with ( @@ -498,9 +536,9 @@ async def open_order_mode( open_ems(brokername, symbol) as ( book, trades_stream, - positions + position_msgs ), - trio.open_nursery() as n, + trio.open_nursery() as tn, ): log.info(f'Opening order mode for {brokername}.{symbol.key}') @@ -511,37 +549,135 @@ async def open_order_mode( lines = LineEditor(chart=chart) arrows = ArrowEditor(chart, {}) - # load account names from ``brokers.toml`` - accounts = bidict(brokers.config.load_accounts()) - - # allocator - alloc = Allocator( - symbol=symbol, - account=None, # select paper by default - _accounts=accounts, - size_unit=_size_units['currency'], - units_limit=400, - currency_limit=5e3, - slots=4, - ) - + # allocation and account settings side pane form = chart.sidepane - form.model = alloc - pp_tracker = PositionTracker(chart, alloc) - pp_tracker.hide() + # symbol id + symbol = chart.linked.symbol + symkey = symbol.key + + # map of per-provider account keys to position tracker instances + trackers: dict[str, PositionTracker] = {} + + # load account names from ``brokers.toml`` + accounts = config.load_accounts(providers=symbol.brokers).copy() + if accounts: + # first account listed is the one we select at startup + # (aka order based selection). + pp_account = next(iter(accounts.keys())) + else: + pp_account = 'paper' + + # NOTE: requires the backend exactly specifies + # the expected symbol key in its positions msg. + pp_msgs = position_msgs.get(symkey, ()) + + # update all pp trackers with existing data relayed + # from ``brokerd``. + for msg in pp_msgs: + + log.info(f'Loading pp for {symkey}:\n{pformat(msg)}') + account_value = msg.get('account') + account_name = accounts.inverse.get(account_value) + if not account_name and account_value == 'paper': + account_name = 'paper' + + # net-zero pp + startup_pp = Position( + symbol=symbol, + size=0, + avg_price=0, + ) + startup_pp.update_from_msg(msg) + + # allocator + alloc = mk_allocator( + symbol=symbol, + accounts=accounts, + account=account_name, + + # if this startup size is greater the allocator limit, + # the limit is increased internally in this factory. + startup_pp=startup_pp, + ) + + pp_tracker = PositionTracker( + chart, + alloc, + startup_pp + ) + pp_tracker.hide() + trackers[account_name] = pp_tracker + + assert pp_tracker.startup_pp.size == pp_tracker.live_pp.size + + # TODO: do we even really need the "startup pp" or can we + # just take the max and pass that into the some state / the + # alloc? + pp_tracker.update_from_pp() + + if pp_tracker.startup_pp.size != 0: + # if no position, don't show pp tracking graphics + pp_tracker.show() + pp_tracker.hide_info() + + # fill out trackers for accounts with net-zero pps + zero_pp_accounts = set(accounts) - set(trackers) + for account_name in zero_pp_accounts: + startup_pp = Position( + symbol=symbol, + size=0, + avg_price=0, + ) + + # allocator + alloc = mk_allocator( + symbol=symbol, + accounts=accounts, + account=account_name, + startup_pp=startup_pp, + ) + pp_tracker = PositionTracker( + chart, + alloc, + startup_pp + ) + pp_tracker.hide() + trackers[account_name] = pp_tracker # order pane widgets and allocation model order_pane = SettingsPane( - tracker=pp_tracker, form=form, - alloc=alloc, + # XXX: ugh, so hideous... fill_bar=form.fill_bar, pnl_label=form.left_label, step_label=form.bottom_label, limit_label=form.top_label, ) + # top level abstraction which wraps all this crazyness into + # a namespace.. + mode = OrderMode( + chart, + tn, + feed, + book, + lines, + arrows, + multistatus, + pane=order_pane, + trackers=trackers, + + ) + # XXX: MUST be set + order_pane.order_mode = mode + + # select a pp to track + tracker = trackers[pp_account] + mode.current_pp = tracker + tracker.show() + tracker.hide_info() + # XXX: would love to not have to do this separate from edit # fields (which are done in an async loop - see below) # connect selection signals (from drop down widgets) @@ -556,77 +692,17 @@ async def open_order_mode( ) ) - # top level abstraction which wraps all this crazyness into - # a namespace.. - mode = OrderMode( - chart, - book, - lines, - arrows, - multistatus, - pp_tracker, - allocator=alloc, - pane=order_pane, - ) + # make fill bar and positioning snapshot + order_pane.on_ui_settings_change('limit', tracker.alloc.limit()) + order_pane.update_status_ui(pp=tracker) # TODO: create a mode "manager" of sorts? # -> probably just call it "UxModes" err sumthin? # so that view handlers can access it view.order_mode = mode - our_sym = mode.chart.linked._symbol.key - - # update any exising position - pp_msg = None - for sym, msg in positions.items(): - if sym.lower() in our_sym: - pp_msg = msg - break - - # make fill bar and positioning snapshot - # XXX: this need to be called *before* the first - # pp tracker update(s) below to ensure the limit size unit has - # been correctly set prior to updating the line's pp size label - # (the one on the RHS). - # TODO: should probably split out the alloc config from the UI - # config startup steps.. - order_pane.init_status_ui() - - # we should probably make the allocator config - # and explitict helper func call that takes in the aloc and - # the postion / symbol info then take that alloc ref and - # update the pp_tracker and pp_pane? - if pp_msg: - pp_tracker.update_from_pp_msg(msg) - - order_pane.update_status_ui() - - live_pp = mode.pp.live_pp - size = live_pp.size - if size: - global _zero_pp - _zero_pp = False - - # compute and display pnl status immediately - mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - live_pp.avg_price, - # last historical close price - feed.shm.array[-1][['close']][0], - ), - ) - - # spawn updater task - n.start_soon( - display_pnl, - feed, - mode, - ) - - else: - # set 0% pnl - mode.pane.pnl_label.format(pnl=0) - + order_pane.on_ui_settings_change('account', pp_account) + mode.pane.display_pnl(mode.current_pp) # Begin order-response streaming done() @@ -645,14 +721,13 @@ async def open_order_mode( ), ): - # signal to top level symbol loading task we're ready # to handle input since the ems connection is ready started.set() - n.start_soon( + tn.start_soon( process_trades_and_update_ui, - n, + tn, feed, mode, trades_stream, @@ -661,67 +736,6 @@ async def open_order_mode( yield mode -_zero_pp: bool = True - - -async def display_pnl( - feed: Feed, - order_mode: OrderMode, -) -> None: - '''Real-time display the current pp's PnL in the appropriate label. - - Error if this task is spawned where there is a net-zero pp. - - ''' - global _zero_pp - assert not _zero_pp - - pp = order_mode.pp - live = pp.live_pp - - if live.size < 0: - types = ('ask', 'last', 'last', 'utrade') - - elif live.size > 0: - types = ('bid', 'last', 'last', 'utrade') - - else: - raise RuntimeError('No pp?!?!') - - # real-time update pnl on the status pane - async with feed.stream.subscribe() as bstream: - # last_tick = time.time() - async for quotes in bstream: - - # now = time.time() - # period = now - last_tick - - for sym, quote in quotes.items(): - - for tick in iterticks(quote, types): - # print(f'{1/period} Hz') - - size = live.size - - if size == 0: - # terminate this update task since we're - # no longer in a pp - _zero_pp = True - order_mode.pane.pnl_label.format(pnl=0) - return - - else: - # compute and display pnl status - order_mode.pane.pnl_label.format( - pnl=copysign(1, size) * pnl( - live.avg_price, - tick['price'], - ), - ) - - # last_tick = time.time() - - async def process_trades_and_update_ui( n: trio.Nursery, @@ -733,8 +747,7 @@ async def process_trades_and_update_ui( ) -> None: get_index = mode.chart.get_index - tracker = mode.pp - global _zero_pp + global _pnl_tasks # this is where we receive **back** messages # about executions **from** the EMS actor @@ -747,24 +760,19 @@ async def process_trades_and_update_ui( if name in ( 'position', ): - # show line label once order is live - sym = mode.chart.linked.symbol if msg['symbol'].lower() in sym.key: - tracker.update_from_pp_msg(msg) + + tracker = mode.trackers[msg['account']] + tracker.live_pp.update_from_msg(msg) + tracker.update_from_pp() # update order pane widgets - mode.pane.update_status_ui() + mode.pane.update_status_ui(tracker) - if mode.pp.live_pp.size and _zero_pp: - _zero_pp = False - n.start_soon( - display_pnl, - feed, - mode, - ) + mode.pane.display_pnl(tracker) # short circuit to next msg to avoid - # uncessary msg content lookups + # unnecessary msg content lookups continue resp = msg['resp'] @@ -795,10 +803,13 @@ async def process_trades_and_update_ui( elif resp in ( 'broker_cancelled', 'broker_inactive', + 'broker_errored', 'dark_cancelled' ): # delete level line from view mode.on_cancel(oid) + broker_msg = msg['brokerd_msg'] + log.warning(f'Order {oid} failed with:\n{pformat(broker_msg)}') elif resp in ( 'dark_triggered' @@ -849,4 +860,6 @@ async def process_trades_and_update_ui( arrow_index=get_index(details['broker_time']), ) - tracker.live_pp.fills.append(msg) + # TODO: how should we look this up? + # tracker = mode.trackers[msg['account']] + # tracker.live_pp.fills.append(msg) diff --git a/tests/conftest.py b/tests/conftest.py index d6ebbbf2..aaa125ce 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -3,8 +3,8 @@ import os import pytest import tractor import trio -from piker import log -from piker.brokers import questrade, config +from piker import log, config +from piker.brokers import questrade def pytest_addoption(parser):