From 5acd780eb62dafca3973789c7a110c827c528eb9 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 15 Jan 2021 19:41:03 -0500 Subject: [PATCH] Get live mode correct and working --- piker/_ems.py | 196 +++++++++++++++++++++-------------------- piker/data/__init__.py | 2 +- piker/ui/_chart.py | 6 +- 3 files changed, 106 insertions(+), 98 deletions(-) diff --git a/piker/_ems.py b/piker/_ems.py index aad8f9b4..f7aa2792 100644 --- a/piker/_ems.py +++ b/piker/_ems.py @@ -61,11 +61,6 @@ class OrderBook: _to_ems: trio.abc.SendChannel = _to_ems _from_order_book: trio.abc.ReceiveChannel = _from_order_book - # def on_fill(self, uuid: str) -> None: - # cmd = self._sent_orders[uuid] - # log.info(f"Order executed: {cmd}") - # self._confirmed_orders[uuid] = cmd - def send( self, uuid: str, @@ -217,10 +212,6 @@ def get_book(broker: str) -> _ExecBook: return _books.setdefault(broker, _ExecBook(broker)) -# def scan_quotes( -# quotes: dict, - - async def exec_loop( ctx: tractor.Context, broker: str, @@ -263,8 +254,8 @@ async def exec_loop( # start = time.time() for sym, quote in quotes.items(): - execs = book.orders.get((broker, sym)) - if not execs: + execs = book.orders.pop(sym, None) + if execs is None: continue for tick in quote.get('ticks', ()): @@ -283,30 +274,37 @@ async def exec_loop( if pred(price): # register broker id for ems id - order_id = await client.submit_limit( - oid=oid, + reqid = await client.submit_limit( + # oid=oid, symbol=sym, action=cmd['action'], price=round(price, 2), ) - # resp = book._broker2ems_ids.setdefault( - book._broker2ems_ids[order_id] = oid + book._broker2ems_ids[reqid] = oid resp = { - 'resp': 'submitted', + 'resp': 'dark_exec', 'name': name, - 'ems_trigger_time_ns': time.time_ns(), - # current shm array index - 'index': feed.shm._last.value - 1, + 'time_ns': time.time_ns(), 'trigger_price': price, + 'broker_reqid': reqid, + 'broker': broker, + # 'condition': True, + + # current shm array index - this needed? + 'ohlc_index': feed.shm._last.value - 1, } - await ctx.send_yield(resp) - + # remove exec-condition from set log.info(f'removing pred for {oid}') pred, name, cmd = execs.pop(oid) - log.debug(f'execs are {execs}') + await ctx.send_yield(resp) + + else: # condition scan loop complete + log.debug(f'execs are {execs}') + if execs: + book.orders[symbol] = execs # print(f'execs scan took: {time.time() - start}') # feed teardown @@ -318,10 +316,11 @@ async def exec_loop( # should probably keep the order in some kind of weird state or cancel # it outright? # status='PendingSubmit', message=''), -# status='Cancelled', message='Error 404, reqId 1550: Order held while securities are located.'), +# status='Cancelled', message='Error 404, +# reqId 1550: Order held while securities are located.'), # status='PreSubmitted', message='')], -async def receive_trade_updates( +async def process_broker_trades( ctx: tractor.Context, feed: 'Feed', # noqa book: _ExecBook, @@ -339,75 +338,55 @@ async def receive_trade_updates( first = await trades_stream.__anext__() # startup msg - assert first['trade_events'] == 'started' + assert first['local_trades'] == 'start' task_status.started() - async for trade_event in trades_stream: - event = trade_event['trade_events'] + async for msg in trades_stream: + name, ev = msg['local_trades'] + log.info(f'Received broker trade event:\n{pformat(ev)}') - try: - order = event['order'] - except KeyError: + # broker request id - must be normalized + # into error transmission by broker backend. + reqid = ev['reqid'] + oid = book._broker2ems_ids.get(reqid) - # Relay broker error messages - err = event['error'] + # make response packet to EMS client(s) + resp = {'oid': oid} - # broker request id - must be normalized - # into error transmission by broker backend. - reqid = err['brid'] - - # TODO: handle updates! - oid = book._broker2ems_ids.get(reqid) + if name in ('error',): + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? # XXX should we make one when it's blank? - log.error(pformat(err['message'])) + log.error(pformat(ev['message'])) - else: - log.info(f'Received broker trade event:\n{pformat(event)}') + elif name in ('status',): - status = event['orderStatus']['status'] - reqid = order['orderId'] + status = ev['status'].lower() - # TODO: handle updates! - oid = book._broker2ems_ids.get(reqid) + if status == 'filled': + # conditional execution is fully complete + if not ev['remaining']: + log.info(f'Execution for {oid} is complete!') + await ctx.send_yield({'resp': 'executed', 'oid': oid}) + else: + # one of (submitted, cancelled) + resp['resp'] = 'broker_' + status - if status in {'Cancelled'}: - resp = {'resp': 'cancelled'} + await ctx.send_yield(resp) - elif status in {'Submitted'}: - # ack-response that order is live/submitted - # to the broker - resp = {'resp': 'submitted'} - - # elif status in {'Executed', 'Filled'}: - elif status in {'Filled'}: - - # order was filled by broker - fills = [] - for fill in event['fills']: - e = fill['execution'] - fills.append( - (e.time, e.price, e.shares, e.side) - ) - - resp = { - 'resp': 'executed', - 'fills': fills, - } - - else: # active in EMS - # ack-response that order is live in EMS - # (aka as a client side limit) - resp = {'resp': 'active'} - - # send response packet to EMS client(s) - resp['oid'] = oid + elif name in ('fill',): + # proxy through the "fill" result(s) + resp['resp'] = 'broker_filled' + resp.update(ev) + log.info(f'Fill for {oid} cleared with\n{pformat(resp)}') await ctx.send_yield(resp) @tractor.stream -async def stream_and_route( +async def _ems_main( ctx: tractor.Context, client_actor_name: str, broker: str, @@ -440,7 +419,7 @@ async def stream_and_route( # for paper mode we need to mock this trades response feed await n.start( - receive_trade_updates, + process_broker_trades, ctx, feed, book, @@ -452,32 +431,31 @@ async def stream_and_route( action = cmd['action'] oid = cmd['oid'] - sym = cmd['symbol'] - if action == 'cancel': + if action in ('cancel',): # check for live-broker order brid = book._broker2ems_ids.inverse[oid] if brid: log.info("Submitting cancel for live order") - await client.submit_cancel(oid=brid) + await client.submit_cancel(reqid=brid) # check for EMS active exec else: book.orders[symbol].pop(oid, None) await ctx.send_yield( - {'action': 'cancelled', + {'action': 'dark_cancelled', 'oid': oid} ) elif action in ('alert', 'buy', 'sell',): + sym = cmd['symbol'] trigger_price = cmd['price'] brokers = cmd['brokers'] broker = brokers[0] last = book.lasts[(broker, sym)] - # print(f'Known last is {last}') if action in ('buy', 'sell',): @@ -491,17 +469,18 @@ async def stream_and_route( # register broker id for ems id order_id = await client.submit_limit( - oid=oid, + oid=oid, # no ib support for this symbol=sym, action=action, price=round(trigger_price, 2), + size=1, ) book._broker2ems_ids[order_id] = oid # book.orders[symbol][oid] = None # XXX: the trades data broker response loop - # (``receive_trade_updates()`` above) will + # (``process_broker_trades()`` above) will # handle sending the ems side acks back to # the cmd sender from here @@ -516,30 +495,52 @@ async def stream_and_route( pred, name = mk_check(trigger_price, last) # submit execution/order to EMS scanner loop - # create list of executions on first entry book.orders.setdefault( (broker, sym), {} )[oid] = (pred, name, cmd) # ack-response that order is live here await ctx.send_yield({ - 'resp': 'ems_active', + 'resp': 'dark_submitted', 'oid': oid }) # continue and wait on next order cmd -async def _ems_main( +async def open_ems( order_mode, broker: str, symbol: Symbol, - # lines: 'LinesEditor', task_status: TaskStatus[str] = trio.TASK_STATUS_IGNORED, ) -> None: """Spawn an EMS daemon and begin sending orders and receiving alerts. + + This EMS tries to reduce most broker's terrible order entry apis to + a very simple protocol built on a few easy to grok and/or + "rantsy" premises: + + - most users will prefer "dark mode" where orders are not submitted + to a broker until and execution condition is triggered + (aka client-side "hidden orders") + + - Brokers over-complicate their apis and generally speaking hire + poor designers to create them. We're better off using creating a super + minimal, schema-simple, request-event-stream protocol to unify all the + existing piles of shit (and shocker, it'll probably just end up + looking like a decent crypto exchange's api) + + - all order types can be implemented with client-side limit orders + + - we aren't reinventing a wheel in this case since none of these + brokers are exposing FIX protocol; it is they doing the re-invention. + + + TODO: make some fancy diagrams using this: + + """ actor = tractor.current_actor() @@ -553,7 +554,7 @@ async def _ems_main( enable_modules=[__name__], ) stream = await portal.run( - stream_and_route, + _ems_main, client_actor_name=actor.name, broker=broker, symbol=symbol.key, @@ -564,29 +565,36 @@ async def _ems_main( # let parent task continue task_status.started(_to_ems) - # begin the trigger-alert stream + # Begin order-response streaming + # this is where we receive **back** messages # about executions **from** the EMS actor async for msg in stream: + log.info(f'Received order msg: {pformat(msg)}') # delete the line from view oid = msg['oid'] resp = msg['resp'] # response to 'action' request (buy/sell) - if resp in ('ems_active', 'submitted'): + if resp in ('dark_submitted', 'broker_submitted'): log.info(f"order accepted: {msg}") # show line label once order is live order_mode.on_submit(oid) - # response to 'cancel' request - elif resp in ('cancelled',): + # resp to 'cancel' request or error condition for action request + elif resp in ('broker_cancelled', 'dark_cancelled'): # delete level from view order_mode.on_cancel(oid) log.info(f'deleting line with oid: {oid}') - # response to 'action' request (buy/sell) + # response to completed 'action' request for buy/sell elif resp in ('executed',): await order_mode.on_exec(oid, msg) + + # each clearing tick is responded individually + elif resp in ('broker_filled',): + # TODO: some kinda progress system + order_mode.on_fill(oid, msg) diff --git a/piker/data/__init__.py b/piker/data/__init__.py index 782aa933..f2551993 100644 --- a/piker/data/__init__.py +++ b/piker/data/__init__.py @@ -186,7 +186,7 @@ class Feed: # the broker side must declare this key # in messages, though we could probably use # more then one? - topics=['trade_events'], + topics=['local_trades'], ) return self._trade_stream diff --git a/piker/ui/_chart.py b/piker/ui/_chart.py index 9e97617f..25636794 100644 --- a/piker/ui/_chart.py +++ b/piker/ui/_chart.py @@ -59,7 +59,7 @@ from ..log import get_logger from ._exec import run_qtractor, current_screen from ._interaction import ChartView, open_order_mode from .. import fsp -from .._ems import _ems_main +from .._ems import open_ems log = get_logger(__name__) @@ -958,8 +958,8 @@ async def _async_main( # inside the above mngr? # spawn EMS actor-service - to_ems_chan = await n.start( - _ems_main, + await n.start( + open_ems, order_mode, brokername, symbol,