diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py
index 28fc54fa..47c79636 100644
--- a/piker/clearing/_client.py
+++ b/piker/clearing/_client.py
@@ -19,34 +19,23 @@ Orders and execution client API.
 
 """
 from contextlib import asynccontextmanager
-from typing import Dict, Tuple, List
+from typing import Dict
 from pprint import pformat
 from dataclasses import dataclass, field
 
 import trio
 import tractor
-# import msgspec
 
 from ..data._source import Symbol
 from ..log import get_logger
 from ._ems import _emsd_main
 from .._daemon import maybe_open_emsd
+from ._messages import Order, Cancel
 
 
 log = get_logger(__name__)
 
 
-# TODO: some kinda validation like this
-# class Order(msgspec.Struct):
-#     action: str
-#     price: float
-#     size: float
-#     symbol: str
-#     brokers: List[str]
-#     oid: str
-#     exec_mode: str
-
-
 @dataclass
 class OrderBook:
     """Buy-side (client-side ?) order book ctl and tracking.
@@ -64,31 +53,34 @@ class OrderBook:
 
     _to_ems: trio.abc.SendChannel
     _from_order_book: trio.abc.ReceiveChannel
 
-    _sent_orders: Dict[str, dict] = field(default_factory=dict)
+    _sent_orders: Dict[str, Order] = field(default_factory=dict)
     _ready_to_receive: trio.Event = trio.Event()
 
     def send(
+        self,
         uuid: str,
         symbol: str,
-        brokers: List[str],
+        brokers: list[str],
        price: float,
         size: float,
         action: str,
         exec_mode: str,
+
     ) -> dict:
-        cmd = {
-            'action': action,
-            'price': price,
-            'size': size,
-            'symbol': symbol,
-            'brokers': brokers,
-            'oid': uuid,
-            'exec_mode': exec_mode,  # dark or live
-        }
-        self._sent_orders[uuid] = cmd
-        self._to_ems.send_nowait(cmd)
-        return cmd
+        msg = Order(
+            action=action,
+            price=price,
+            size=size,
+            symbol=symbol,
+            brokers=brokers,
+            oid=uuid,
+            exec_mode=exec_mode,  # dark or live
+        )
+
+        self._sent_orders[uuid] = msg
+        self._to_ems.send_nowait(msg.dict())
+        return msg
 
     def update(
         self,
@@ -98,28 +90,27 @@ class OrderBook:
         cmd = self._sent_orders[uuid]
         msg = cmd.dict()
         msg.update(data)
-        self._sent_orders[uuid] = OrderMsg(**msg)
+        self._sent_orders[uuid] = Order(**msg)
         self._to_ems.send_nowait(msg)
 
         return cmd
 
     def cancel(self, uuid: str) -> bool:
-        """Cancel an order (or alert) from the EMS.
+        """Cancel an order (or alert) in the EMS.
 
         """
         cmd = self._sent_orders[uuid]
-        msg = {
-            'action': 'cancel',
-            'oid': uuid,
-            'symbol': cmd['symbol'],
-        }
-        self._to_ems.send_nowait(msg)
+        msg = Cancel(
+            oid=uuid,
+            symbol=cmd.symbol,
+        )
+        self._to_ems.send_nowait(msg.dict())
 
 
 _orders: OrderBook = None
 
 
 def get_orders(
-    emsd_uid: Tuple[str, str] = None
+    emsd_uid: tuple[str, str] = None
 ) -> OrderBook:
     """"
     OrderBook singleton factory per actor.
@@ -139,7 +130,10 @@ def get_orders(
 
     return _orders
 
 
+# TODO: we can get rid of this relay loop once we move
+# order_mode inputs to async code!
 async def relay_order_cmds_from_sync_code(
+
     symbol_key: str,
     to_ems_stream: tractor.MsgStream,
 
@@ -184,7 +178,8 @@ async def relay_order_cmds_from_sync_code(
 async def open_ems(
     broker: str,
     symbol: Symbol,
-) -> None:
+
+) -> (OrderBook, tractor.MsgStream, dict):
     """Spawn an EMS daemon and begin sending orders and receiving
     alerts.
 
@@ -232,9 +227,9 @@ async def open_ems(
 
             broker=broker,
             symbol=symbol.key,
 
-        # TODO: ``first`` here should be the active orders/execs
-        # persistent on the ems so that loca UI's can be populated.
-    ) as (ctx, first),
+            # TODO: ``first`` here should be the active orders/execs
+            # persistent on the ems so that local UI's can be populated.
+    ) as (ctx, positions),
 
         # open 2-way trade command stream
         ctx.open_stream() as trades_stream,
 
@@ -246,4 +241,4 @@ async def open_ems(
                 trades_stream
             )
 
-        yield book, trades_stream
+        yield book, trades_stream, positions
diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py
index 50a44426..cd0795b3 100644
--- a/piker/clearing/_ems.py
+++ b/piker/clearing/_ems.py
@@ -32,7 +32,12 @@ import tractor
 from .. import data
 from ..log import get_logger
 from ..data._normalize import iterticks
-from ._paper_engine import PaperBoi, simulate_fills
+from . import _paper_engine as paper
+from ._messages import (
+    Status, Order,
+    BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
+    BrokerdFill, BrokerdError, BrokerdPosition,
+)
 
 
 log = get_logger(__name__)
@@ -106,8 +111,9 @@ class _DarkBook:
             float
     ] = field(default_factory=dict)
 
-    # mapping of broker order ids to piker ems ids
-    _broker2ems_ids: dict[str, str] = field(default_factory=bidict)
+    # mapping of piker ems order ids to current brokerd order flow message
+    _ems_entries: dict[str, str] = field(default_factory=dict)
+    _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
 
 
 # XXX: this is in place to prevent accidental positions that are too
@@ -117,13 +123,20 @@ class _DarkBook:
 _DEFAULT_SIZE: float = 1.0
 
 
-async def execute_triggers(
+async def clear_dark_triggers(
+
+    # ctx: tractor.Context,
+    brokerd_orders_stream: tractor.MsgStream,
+    ems_client_order_stream: tractor.MsgStream,
+    quote_stream: tractor.ReceiveMsgStream,  # noqa
+
     broker: str,
     symbol: str,
-    stream: 'tractor.ReceiveStream',  # noqa
-    ctx: tractor.Context,
-    client: 'Client',  # noqa
+    # client: 'Client',  # noqa
+    # order_msg_stream: 'Client',  # noqa
+
     book: _DarkBook,
+
 ) -> None:
     """Core dark order trigger loop.
 
@@ -133,7 +146,7 @@ async def execute_triggers(
     """
     # this stream may eventually contain multiple symbols
     # XXX: optimize this for speed!
-    async for quotes in stream:
+    async for quotes in quote_stream:
 
         # TODO: numba all this!
 
@@ -169,9 +182,15 @@ async def execute_triggers(
                     # majority of iterations will be non-matches
                     continue
 
-                action = cmd['action']
+                action: str = cmd['action']
+                symbol: str = cmd['symbol']
 
-                if action != 'alert':
+                if action == 'alert':
+                    # nothing to do but relay a status
+                    # message back to the requesting ems client
+                    resp = 'alert_triggered'
+
+                else:
                     # executable order submission
 
                     # submit_price = price + price*percent_away
 
@@ -181,47 +200,89 @@ async def execute_triggers(
                        f'Dark order triggered for price {price}\n'
                        f'Submitting order @ price {submit_price}')
 
-                    reqid = await client.submit_limit(
+                    # TODO: port to BrokerdOrder message sending
+                    msg = BrokerdOrder(
+                        action=cmd['action'],
                         oid=oid,
+                        time_ns=time.time_ns(),
+
                         # this is a brand new order request for the
-                        # underlying broker so we set out "broker request
-                        # id" (brid) as nothing so that the broker
-                        # client knows that we aren't trying to modify
-                        # an existing order.
-                        brid=None,
+                        # underlying broker so we set a "broker
+                        # request id" (brid) to "nothing" so that the
+                        # broker client knows that we aren't trying
+                        # to modify an existing order-request.
+ reqid=None, symbol=sym, - action=cmd['action'], price=submit_price, size=cmd['size'], ) + await brokerd_orders_stream.send(msg.dict()) + # mark this entry as having send an order request + book._ems_entries[oid] = msg - # register broker request id to ems id - book._broker2ems_ids[reqid] = oid + resp = 'dark_triggered' - else: - # alerts have no broker request id - reqid = '' + # an internal brokerd-broker specific + # order-request id is expected to be generated - resp = { - 'resp': 'dark_executed', - 'time_ns': time.time_ns(), - 'trigger_price': price, + # reqid = await client.submit_limit( - 'cmd': cmd, # original request message + # oid=oid, - 'broker_reqid': reqid, - 'broker': broker, - 'oid': oid, # piker order id + # # this is a brand new order request for the + # # underlying broker so we set a "broker + # # request id" (brid) to "nothing" so that the + # # broker client knows that we aren't trying + # # to modify an existing order-request. + # brid=None, - } + # symbol=sym, + # action=cmd['action'], + # price=submit_price, + # size=cmd['size'], + # ) + + # # register broker request id to ems id + + # else: + # # alerts have no broker request id + # reqid = '' + + # resp = { + # 'resp': 'dark_executed', + # 'cmd': cmd, # original request message + + # 'time_ns': time.time_ns(), + # 'trigger_price': price, + + # 'broker_reqid': reqid, + # 'broker': broker, + # 'oid': oid, # piker order id + + # } + msg = Status( + oid=oid, # piker order id + resp=resp, + time_ns=time.time_ns(), + + symbol=symbol, + trigger_price=price, + + # broker_reqid=reqid, + broker_details={'name': broker}, + + cmd=cmd, # original request message + + ).dict() # remove exec-condition from set log.info(f'removing pred for {oid}') execs.pop(oid) - await ctx.send_yield(resp) + # await ctx.send_yield(resp) + await ems_client_order_stream.send(msg) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -231,78 +292,49 @@ async def execute_triggers( # print(f'execs scan took: {time.time() - start}') -async def exec_loop( +# async def start_clearing( - ctx: tractor.Context, - feed: 'Feed', # noqa - broker: str, - symbol: str, - _exec_mode: str, - task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, +# # ctx: tractor.Context, +# brokerd_order_stream: tractor.MsgStream, +# quote_stream: tractor.MsgStream, -) -> AsyncIterator[dict]: - """Main scan loop for order execution conditions and submission - to brokers. +# # client: 'Client', - """ - global _router +# # feed: 'Feed', # noqa +# broker: str, +# symbol: str, +# _exec_mode: str, - # XXX: this should be initial price quote from target provider - first_quote = await feed.receive() +# book: _DarkBook, - book = _router.get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote[symbol]['last'] +# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - # TODO: wrap this in a more re-usable general api - client_factory = getattr(feed.mod, 'get_client_proxy', None) +# ) -> AsyncIterator[dict]: +# """Main scan loop for order execution conditions and submission +# to brokers. 
- if client_factory is not None and _exec_mode != 'paper': +# """ +# async with trio.open_nursery() as n: - # we have an order API for this broker - client = client_factory(feed._brokerd_portal) +# # trigger scan and exec loop +# n.start_soon( +# trigger_executions, - else: - # force paper mode - log.warning(f'Entering paper trading mode for {broker}') +# brokerd_order_stream, +# quote_stream, - client = PaperBoi( - broker, - *trio.open_memory_channel(100), - _buys={}, - _sells={}, +# broker, +# symbol, +# book +# # ctx, +# # client, +# ) - _reqids={}, - ) - - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - feed._trade_stream = client.trade_stream - - # init the trades stream - client._to_trade_stream.send_nowait({'local_trades': 'start'}) - - _exec_mode = 'paper' - - # return control to parent task - task_status.started((first_quote, feed, client)) - - stream = feed.stream - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) - - if _exec_mode == 'paper': - # TODO: make this an actual broadcast channels as in: - # https://github.com/python-trio/trio/issues/987 - n.start_soon(simulate_fills, stream, client) +# # # paper engine simulator task +# # if _exec_mode == 'paper': +# # # TODO: make this an actual broadcast channels as in: +# # # https://github.com/python-trio/trio/issues/987 +# # n.start_soon(simulate_fills, quote_stream, client) # TODO: lots of cases still to handle @@ -315,11 +347,17 @@ async def exec_loop( # reqId 1550: Order held while securities are located.'), # status='PreSubmitted', message='')], -async def process_broker_trades( - ctx: tractor.Context, - feed: 'Feed', # noqa +async def translate_and_relay_brokerd_events( + + # ctx: tractor.Context, + broker: str, + ems_client_order_stream: tractor.MsgStream, + brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, + + # feed: 'Feed', # noqa task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, + ) -> AsyncIterator[dict]: """Trades update loop - receive updates from broker, convert to EMS responses, transmit to ordering client(s). 
@@ -339,198 +377,336 @@ async def process_broker_trades(
     {'presubmitted', 'submitted', 'cancelled', 'inactive'}
 
     """
-    broker = feed.mod.name
+    # broker = feed.mod.name
 
     # TODO: make this a context
     # in the paper engine case this is just a mem receive channel
-    async with feed.receive_trades_data() as trades_stream:
+    # async with feed.receive_trades_data() as brokerd_trades_stream:
 
-        first = await trades_stream.__anext__()
+    # first = await brokerd_trades_stream.__anext__()
 
-        # startup msg expected as first from broker backend
-        assert first['local_trades'] == 'start'
-        task_status.started()
+    # startup msg expected as first from broker backend
+    # assert first['local_trades'] == 'start'
+    # task_status.started()
 
-        async for event in trades_stream:
+    async for brokerd_msg in brokerd_trades_stream:
 
-            name, msg = event['local_trades']
+        # name, msg = event['local_trades']
+        name = brokerd_msg['name']
 
-            log.info(f'Received broker trade event:\n{pformat(msg)}')
+        log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}')
 
-            if name == 'position':
-                msg['resp'] = 'position'
+        if name == 'position':
+            # msg['resp'] = 'position'
+
+            # relay through position msgs immediately
+            await ems_client_order_stream.send(
+                BrokerdPosition(**brokerd_msg).dict()
+            )
+            continue
+
+        # Get the broker (order) request id, this **must** be normalized
+        # into messaging provided by the broker backend
+        reqid = brokerd_msg['reqid']
+
+        # all piker originated requests will have an ems generated oid field
+        oid = brokerd_msg.get(
+            'oid',
+            book._ems2brokerd_ids.inverse.get(reqid)
+        )
+
+        if oid is None:
+
+            # XXX: paper clearing special cases
+            # paper engine race case: ``Client.submit_limit()`` hasn't
+            # returned yet and provided an output reqid to register
+            # locally, so we need to retrieve the oid that was already
+            # packed at submission since we already know it ahead of
+            # time
+            paper = brokerd_msg['broker_details'].get('paper_info')
+            if paper:
+                # paperboi keeps the ems id up front
+                oid = paper['oid']
+
+            else:
+                # may be an order msg specified as "external" to the
+                # piker ems flow (i.e. generated by some other
+                # external broker backend client (like tws for ib)
+                ext = brokerd_msg.get('external')
+                if ext:
+                    log.error(f"External trade event {ext}")
+
+                continue
+        else:
+            # check for existing live flow entry
+            entry = book._ems_entries.get(oid)
+
+            # initial response to brokerd order request
+            if name == 'ack':
+
+                # register the brokerd request id (that was likely
+                # generated internally) with our local ems order id for
+                # reverse lookup later. a BrokerdOrderAck **must** be
+                # sent after an order request in order to establish this
+                # id mapping.
+ book._ems2brokerd_ids[oid] = reqid + + # new order which has not yet be registered into the + # local ems book, insert it now and handle 2 cases: + + # - the order has previously been requested to be + # cancelled by the ems controlling client before we + # received this ack, in which case we relay that cancel + # signal **asap** to the backend broker + if entry.action == 'cancel': + # assign newly providerd broker backend request id + entry.reqid = reqid + + # tell broker to cancel immediately + await brokerd_trades_stream.send(entry.dict()) + + # - the order is now active and will be mirrored in + # our book -> registered as live flow + else: + # update the flow with the ack msg + book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) - # relay through - await ctx.send_yield(msg) continue - # Get the broker (order) request id, this **must** be normalized - # into messaging provided by the broker backend - reqid = msg['reqid'] + # a live flow now exists + oid = entry.oid - # make response packet to EMS client(s) - oid = book._broker2ems_ids.get(reqid) + # make response packet to EMS client(s) + # reqid = book._ems_entries.get(oid) - if oid is None: - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = msg.get('paper_info') - if paper: - oid = paper['oid'] + # # msg is for unknown emsd order id + # if oid is None: + # oid = msg['oid'] + # # XXX: paper clearing special cases + # # paper engine race case: ``Client.submit_limit()`` hasn't + # # returned yet and provided an output reqid to register + # # locally, so we need to retreive the oid that was already + # # packed at submission since we already know it ahead of + # # time + # paper = msg.get('paper_info') + # if paper: + # oid = paper['oid'] + + # else: + # msg.get('external') + # if not msg: + # log.error(f"Unknown trade event {event}") + + # continue + + # resp = { + # 'resp': None, # placeholder + # 'oid': oid + # } + resp = None + broker_details = {} + + if name in ( + 'error', + ): + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? + + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. + + msg = BrokerdError(**brokerd_msg) + + # XXX should we make one when it's blank? 
+ log.error(pformat(msg)) + + # TODO: getting this bs, prolly need to handle status messages + # 'Market data farm connection is OK:usfarm.nj' + + # another stupid ib error to handle + # if 10147 in message: cancel + + # don't relay message to order requester client + continue + + elif name in ( + 'status', + ): + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) + + # everyone doin camel case + msg = BrokerdStatus(**brokerd_msg) + # status = msg['status'].lower() + + if msg.status == 'filled': + + # conditional execution is fully complete, no more + # fills for the noted order + if not msg.remaining: + + resp = 'broker_executed' + + log.info(f'Execution for {oid} is complete!') + + + # just log it else: - msg.get('external') - if not msg: - log.error(f"Unknown trade event {event}") - - continue - - resp = { - 'resp': None, # placeholder - 'oid': oid - } - - if name in ( - 'error', - ): - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? - - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. - - message = msg['message'] - - # XXX should we make one when it's blank? 
- log.error(pformat(message)) - - # TODO: getting this bs, prolly need to handle status messages - # 'Market data farm connection is OK:usfarm.nj' - - # another stupid ib error to handle - # if 10147 in message: cancel - - # don't relay message to order requester client - continue - - elif name in ( - 'status', - ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case - status = msg['status'].lower() - - if status == 'filled': - - # conditional execution is fully complete, no more - # fills for the noted order - if not msg['remaining']: - - resp['resp'] = 'broker_executed' - - log.info(f'Execution for {oid} is complete!') - - # just log it - else: - log.info(f'{broker} filled {msg}') - - else: - # one of (submitted, cancelled) - resp['resp'] = 'broker_' + status - - elif name in ( - 'fill', - ): - # proxy through the "fill" result(s) - resp['resp'] = 'broker_filled' - resp.update(msg) - - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') - - # respond to requesting client - await ctx.send_yield(resp) + log.info(f'{broker} filled {msg}') -async def process_order_cmds( + else: + # one of {submitted, cancelled} + resp = 'broker_' + msg.status + + # pass the BrokerdStatus msg inside the broker details field + broker_details = msg.dict() + + elif name in ( + 'fill', + ): + msg = BrokerdFill(**brokerd_msg) + + # proxy through the "fill" result(s) + resp = 'broker_filled' + broker_details = msg.dict() + + log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + + else: + raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + + # Create and relay EMS response status message + resp = Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ) + # relay response to requesting EMS client + await ems_client_order_stream.send(resp.dict()) + + +async def process_client_order_cmds( + + # ctx: tractor.Context, + client_order_stream: tractor.MsgStream, # noqa + brokerd_order_stream: tractor.MsgStream, - ctx: tractor.Context, - cmd_stream: 'tractor.ReceiveStream', # noqa symbol: str, feed: 'Feed', # noqa - client: 'Client', # noqa + # client: 'Client', # noqa dark_book: _DarkBook, ) -> None: - async for cmd in cmd_stream: + # cmd: dict + async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') action = cmd['action'] oid = cmd['oid'] - - brid = dark_book._broker2ems_ids.inverse.get(oid) + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) + live_entry = dark_book._ems_entries.get(oid) # TODO: can't wait for this stuff to land in 3.10 # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings if action in ('cancel',): # check for live-broker order - if brid: - log.info("Submitting cancel for live order") - await client.submit_cancel(reqid=brid) + if live_entry: + + msg = BrokerdCancel( + oid=oid, + reqid=reqid or live_entry.reqid, + time_ns=time.time_ns(), + ) + + # send cancel to brokerd immediately! 
+ log.info("Submitting cancel for live order") + + # NOTE: cancel response will be relayed back in messages + # from corresponding broker + # await client.submit_cancel(reqid=reqid) + await brokerd_order_stream.send(msg.dict()) - # check for EMS active exec else: + # might be a cancel for order that hasn't been acked yet + # by brokerd so register a cancel for then the order + # does show up later + dark_book._ems_entries[oid] = msg + + # check for EMS active exec try: + # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) - # TODO: move these to `tractor.MsgStream` - await ctx.send_yield({ - 'resp': 'dark_cancelled', - 'oid': oid - }) + # tell client side that we've cancelled the + # dark-trigger order + await client_order_stream.send( + Status( + resp='dark_cancelled', + oid=oid, + time_ns=time.time_ns(), + ).dict() + ) + except KeyError: log.exception(f'No dark order for {symbol}?') + # TODO: 3.10 struct-pattern matching and unpacking here elif action in ('alert', 'buy', 'sell',): - sym = cmd['symbol'] - trigger_price = cmd['price'] - size = cmd['size'] - brokers = cmd['brokers'] - exec_mode = cmd['exec_mode'] + msg = Order(**cmd) - broker = brokers[0] + # sym = cmd['symbol'] + # trigger_price = cmd['price'] + # size = cmd['size'] + # brokers = cmd['brokers'] + # exec_mode = cmd['exec_mode'] + + sym = msg.symbol + trigger_price = msg.price + size = msg.size + exec_mode = msg.exec_mode + broker = msg.brokers[0] if exec_mode == 'live' and action in ('buy', 'sell',): - # register broker id for ems id - order_id = await client.submit_limit( + if live_entry is not None: + # sanity check on emsd id + assert live_entry.oid == oid + + # if we already had a broker order id then + # this is likely an order update commmand. + log.info(f"Modifying order: {live_entry.reqid}") + + # TODO: port to BrokerdOrder message sending + # register broker id for ems id + msg = BrokerdOrder( oid=oid, # no ib support for oids... + time_ns=time.time_ns(), # if this is None, creates a new order # otherwise will modify any existing one - brid=brid, + reqid=reqid, symbol=sym, action=action, @@ -538,25 +714,38 @@ async def process_order_cmds( size=size, ) - if brid: - assert dark_book._broker2ems_ids[brid] == oid - - # if we already had a broker order id then - # this is likely an order update commmand. - log.info(f"Modifying order: {brid}") - - else: - dark_book._broker2ems_ids[order_id] = oid - + # send request to backend # XXX: the trades data broker response loop - # (``process_broker_trades()`` above) will - # handle sending the ems side acks back to - # the cmd sender from here + # (``translate_and_relay_brokerd_events()`` above) will + # handle relaying the ems side responses back to + # the client/cmd sender from this request + print(f'sending live order {msg}') + await brokerd_order_stream.send(msg.dict()) + + # order_id = await client.submit_limit( + + # oid=oid, # no ib support for oids... 
+ + # # if this is None, creates a new order + # # otherwise will modify any existing one + # brid=brid, + + # symbol=sym, + # action=action, + # price=trigger_price, + # size=size, + # ) + + # an immediate response should be brokerd ack with order + # id but we register our request as part of the flow + dark_book._ems_entries[oid] = msg elif exec_mode in ('dark', 'paper') or ( action in ('alert') ): - # submit order to local EMS + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions # Auto-gen scanner predicate: # we automatically figure out what the alert check @@ -590,8 +779,10 @@ async def process_order_cmds( abs_diff_away = 0 # submit execution/order to EMS scan loop - # FYI: this may result in an override of an existing + + # NOTE: this may result in an override of an existing # dark book entry if the order id already exists + dark_book.orders.setdefault( sym, {} )[oid] = ( @@ -601,14 +792,27 @@ async def process_order_cmds( percent_away, abs_diff_away ) + # TODO: if the predicate resolves immediately send the # execution to the broker asap? Or no? # ack-response that order is live in EMS - await ctx.send_yield({ - 'resp': 'dark_submitted', - 'oid': oid - }) + # await ctx.send_yield( + # {'resp': 'dark_submitted', + # 'oid': oid} + # ) + if action == 'alert': + resp = 'alert_submitted' + else: + resp = 'dark_submitted' + + await client_order_stream.send( + Status( + resp=resp, + oid=oid, + time_ns=time.time_ns(), + ).dict() + ) @tractor.context @@ -618,7 +822,8 @@ async def _emsd_main( # client_actor_name: str, broker: str, symbol: str, - _mode: str = 'dark', # ('paper', 'dark', 'live') + _exec_mode: str = 'dark', # ('paper', 'dark', 'live') + loglevel: str = 'info', ) -> None: """EMS (sub)actor entrypoint providing the @@ -635,15 +840,23 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The task tree is: + The primary ``emsd`` task tree is: + - ``_emsd_main()``: - accepts order cmds, registers execs with exec loop - - - ``exec_loop()``: - run (dark) conditions on inputs and trigger broker submissions - - - ``process_broker_trades()``: - accept normalized trades responses, process and relay to ems client(s) + sets up brokerd feed, order feed with ems client, trades dialogue with + brokderd trading api. + | + - ``start_clearing()``: + run (dark) conditions on inputs and trigger broker submissions + | + - ``translate_and_relay_brokerd_events()``: + accept normalized trades responses from brokerd, process and + relay to ems client(s); this is a effectively a "trade event + reponse" proxy-broker. 
+ | + - ``process_client_order_cmds()``: + accepts order cmds from requesting piker clients, registers + execs with exec loop """ # from ._client import send_order_cmds @@ -651,49 +864,140 @@ async def _emsd_main( global _router dark_book = _router.get_dark_book(broker) + ems_ctx = ctx + + cached_feed = _router.feeds.get((broker, symbol)) + if cached_feed: + # TODO: use cached feeds per calling-actor + log.warning(f'Opening duplicate feed for {(broker, symbol)}') + # spawn one task per broker feed - async with trio.open_nursery() as n: + async with ( + trio.open_nursery() as n, # TODO: eventually support N-brokers - async with data.open_feed( + data.open_feed( broker, [symbol], - loglevel='info', - ) as feed: + loglevel=loglevel, + ) as feed, + ): + if not cached_feed: + _router.feeds[(broker, symbol)] = feed - # get a portal back to the client - # async with tractor.wait_for_actor(client_actor_name) as portal: + # XXX: this should be initial price quote from target provider + first_quote = await feed.receive() - await ctx.started() + # open a stream with the brokerd backend for order + # flow dialogue - # establish 2-way stream with requesting order-client - async with ctx.open_stream() as order_stream: + book = _router.get_dark_book(broker) + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + portal = feed._brokerd_portal + + if trades_endpoint is None or _exec_mode == 'paper': + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine inside the brokerd + # actor to simulate the real load it'll likely be under + # when also pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + # feed._trade_stream = client.trade_stream + + # init the trades stream + # client._to_trade_stream.send_nowait({'local_trades': 'start'}) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + async with ( + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + + # if trades_endpoint is not None and _exec_mode != 'paper': + + # # TODO: open a bidir stream here? + # # we have an order API for this broker + # client = client_factory(feed._brokerd_portal) + + # else: + + # return control to parent task + # task_status.started((first_quote, feed, client)) + + # stream = feed.stream + + # start the real-time clearing condition scan loop and + # paper engine simulator. + + # n.start_soon( + # start_clearing, + # brokerd_trades_stream, + # feed.stream, # quote stream + # # client, + # broker, + # symbol, + # _exec_mode, + # book, + # ) + + # signal to client that we're started + # TODO: we could send back **all** brokerd positions here? 
+            await ems_ctx.started(positions)
 
+        # establish 2-way stream with requesting order-client and
+        # begin handling inbound order requests and updates
         async with ems_ctx.open_stream() as ems_client_order_stream:
 
+            # trigger scan and exec loop
+            n.start_soon(
+                clear_dark_triggers,
+
+                brokerd_trades_stream,
+                ems_client_order_stream,
+                feed.stream,
 
-            # start the condition scan loop
-            quote, feed, client = await n.start(
-                exec_loop,
-                ctx,
-                feed,
                 broker,
                 symbol,
-                _mode,
+                book
+                # ctx,
+                # client,
             )
 
             # begin processing order events from the target brokerd backend
-            await n.start(
-                process_broker_trades,
-                ctx,
-                feed,
+            n.start_soon(
+
+                translate_and_relay_brokerd_events,
+                broker,
+                ems_client_order_stream,
+                brokerd_trades_stream,
                 dark_book,
             )
 
             # start inbound (from attached client) order request processing
-            await process_order_cmds(
-                ctx,
-                order_stream,
+            await process_client_order_cmds(
+                ems_client_order_stream,
+                brokerd_trades_stream,
                 symbol,
                 feed,
-                client,
                 dark_book,
             )
diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py
index a49858f4..c3c4016a 100644
--- a/piker/ui/order_mode.py
+++ b/piker/ui/order_mode.py
@@ -127,9 +127,9 @@ class OrderMode:
 
         """
         line = self.lines.commit_line(uuid)
 
-        req_msg = self.book._sent_orders.get(uuid)
-        if req_msg:
-            req_msg.ack_time_ns = time.time_ns()
+        # req_msg = self.book._sent_orders.get(uuid)
+        # if req_msg:
+        #     req_msg.ack_time_ns = time.time_ns()
 
         return line
 
@@ -317,10 +317,14 @@ async def start_order_mode(
 
     # spawn EMS actor-service
     async with (
-        open_ems(brokername, symbol) as (book, trades_stream),
+        open_ems(brokername, symbol) as (book, trades_stream, positions),
         open_order_mode(symbol, chart, book) as order_mode
     ):
 
+        # update any existing positions
+        for sym, msg in positions.items():
+            order_mode.on_position_update(msg)
+
         def get_index(time: float):
 
             # XXX: not sure why the time is so off here
@@ -343,16 +347,15 @@ async def start_order_mode(
             fmsg = pformat(msg)
             log.info(f'Received order msg:\n{fmsg}')
 
-            resp = msg['resp']
-
-            if resp in (
+            name = msg['name']
+            if name in (
                 'position',
             ):
                 # show line label once order is live
                 order_mode.on_position_update(msg)
                 continue
 
-            # delete the line from view
+            resp = msg['resp']
             oid = msg['oid']
 
             # response to 'action' request (buy/sell)
@@ -375,21 +378,21 @@ async def start_order_mode(
                 order_mode.on_cancel(oid)
 
             elif resp in (
-                'dark_executed'
+                'dark_triggered'
             ):
                 log.info(f'Dark order triggered for {fmsg}')
 
-                # for alerts add a triangle and remove the
-                # level line
-                if msg['cmd']['action'] == 'alert':
-
-                    # should only be one "fill" for an alert
-                    order_mode.on_fill(
-                        oid,
-                        price=msg['trigger_price'],
-                        arrow_index=get_index(time.time())
-                    )
-                    await order_mode.on_exec(oid, msg)
+            elif resp in (
+                'alert_triggered'
+            ):
+                # should only be one "fill" for an alert
+                # add a triangle and remove the level line
+                order_mode.on_fill(
+                    oid,
+                    price=msg['trigger_price'],
+                    arrow_index=get_index(time.time())
+                )
+                await order_mode.on_exec(oid, msg)
 
             # response to completed 'action' request for buy/sell
             elif resp in (
@@ -400,12 +403,15 @@ async def start_order_mode(
 
             # each clearing tick is responded individually
             elif resp in ('broker_filled',):
-                action = msg['action']
+                action = book._sent_orders[oid].action
+                details = msg['brokerd_msg']
 
                 # TODO: some kinda progress system
                 order_mode.on_fill(
                     oid,
-                    price=msg['price'],
-                    arrow_index=get_index(msg['broker_time']),
+                    price=details['price'],
                     pointing='up' if action == 'buy' else 'down',
+
+                    # TODO: put the actual exchange timestamp
+                    arrow_index=get_index(details['broker_time']),
                 )
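
The new piker.clearing._messages module imported by both files above (Order, Cancel, Status and the Brokerd* structs) is not included in this patch. The following is a minimal sketch of what those structs might look like, assuming pydantic-style models with the .dict() serialization method that the patch calls on every message; field names are inferred from how the patch constructs each message and are otherwise assumptions.

# NOTE: illustrative only -- the real piker.clearing._messages module is not
# shown in this diff; field names are inferred from the call sites above.
from typing import Optional
from pydantic import BaseModel


class Order(BaseModel):
    # client -> ems order request (see OrderBook.send())
    action: str          # one of {'buy', 'sell', 'alert'}
    price: float
    size: float
    symbol: str
    brokers: list
    oid: str             # piker ems order id
    exec_mode: str       # one of {'dark', 'live', 'paper'}


class Cancel(BaseModel):
    # client -> ems cancel request (see OrderBook.cancel())
    name: str = 'cancel'
    oid: str
    symbol: str


class Status(BaseModel):
    # ems -> client response (see translate_and_relay_brokerd_events()
    # and process_client_order_cmds())
    name: str = 'status'
    oid: str
    resp: str            # ex. 'dark_submitted', 'broker_filled'
    time_ns: int
    broker_reqid: Optional[int] = None
    broker_details: dict = {}
    brokerd_msg: dict = {}

    # extras passed for dark triggers in clear_dark_triggers()
    symbol: Optional[str] = None
    trigger_price: Optional[float] = None
    cmd: Optional[dict] = None

# The BrokerdOrder, BrokerdCancel, BrokerdOrderAck, BrokerdStatus,
# BrokerdFill, BrokerdError and BrokerdPosition messages used on the
# ems <-> brokerd leg would follow the same pattern.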
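
open_ems() now yields a third value, the startup position messages keyed by symbol, which start_order_mode() uses to seed the UI before iterating the response stream. A rough consumer sketch of the updated API follows; the wrapper function and the prints are illustrative, not part of the patch.

# Illustrative consumer of the new ``open_ems()`` 3-tuple API; this mirrors
# the order_mode.py changes above but is not itself part of the patch.
from piker.clearing._client import open_ems


async def watch_order_flow(brokername, symbol) -> None:

    async with open_ems(brokername, symbol) as (book, trades_stream, positions):

        # seed local state from whatever positions brokerd reported at startup
        for sym, pos_msg in positions.items():
            print(f'existing position for {sym}: {pos_msg}')

        async for msg in trades_stream:

            # position updates are detected by the message ``name`` field...
            if msg['name'] == 'position':
                print(f'position update: {msg}')
                continue

            # ...everything else is a ``Status`` response carrying ``resp``/``oid``
            resp = msg['resp']
            oid = msg['oid']
            print(f'order {oid} -> {resp}')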