From 7fb2c95ef1b3a30ad098a1e32b9eafe7bfe724c3 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Jun 2021 06:56:55 -0400 Subject: [PATCH 01/18] Factor daemon spawning logic, use it to spawn emsd --- piker/_daemon.py | 135 +++++++++++++++++++++++++------------- piker/clearing/_client.py | 26 +------- 2 files changed, 91 insertions(+), 70 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 07a584c3..799b1331 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -197,6 +197,66 @@ _data_mods = [ ] +class Brokerd: + locks = defaultdict(trio.Lock) + + +@asynccontextmanager +async def maybe_spawn_daemon( + + service_name: str, + spawn_func: Callable, + spawn_args: dict[str, Any], + # brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor.Portal: + """ + If no ``service_name`` daemon-actor can be found, + spawn one in a local subactor and return a portal to it. + + """ + if loglevel: + get_console_log(loglevel) + + # serialize access to this section to avoid + # 2 or more tasks racing to create a daemon + lock = Brokerd.locks[service_name] + await lock.acquire() + + # attach to existing brokerd if possible + async with tractor.find_actor(service_name) as portal: + if portal is not None: + lock.release() + yield portal + return + + # ask root ``pikerd`` daemon to spawn the daemon we need if + # pikerd is not live we now become the root of the + # process tree + async with maybe_open_pikerd( + loglevel=loglevel, + **kwargs, + ) as pikerd_portal: + + if pikerd_portal is None: + # we are root so spawn brokerd directly in our tree + # the root nursery is accessed through process global state + # await spawn_brokerd(brokername, loglevel=loglevel) + await spawn_func(**spawn_args) + + else: + await pikerd_portal.run( + spawn_func, + **spawn_args, + ) + + async with tractor.wait_for_actor(service_name) as portal: + lock.release() + yield portal + + async def spawn_brokerd( brokername: str, @@ -242,10 +302,6 @@ async def spawn_brokerd( return dname -class Brokerd: - locks = defaultdict(trio.Lock) - - @asynccontextmanager async def maybe_spawn_brokerd( @@ -253,52 +309,20 @@ async def maybe_spawn_brokerd( loglevel: Optional[str] = None, **kwargs, -) -> tractor._portal.Portal: - """ - If no ``brokerd.{brokername}`` daemon-actor can be found, - spawn one in a local subactor and return a portal to it. +) -> tractor.Portal: + '''Helper to spawn a brokerd service. - """ - if loglevel: - get_console_log(loglevel) + ''' + async with maybe_spawn_daemon( - dname = f'brokerd.{brokername}' - - # serialize access to this section to avoid - # 2 or more tasks racing to create a daemon - lock = Brokerd.locks[brokername] - await lock.acquire() - - # attach to existing brokerd if possible - async with tractor.find_actor(dname) as portal: - if portal is not None: - lock.release() - yield portal - return - - # ask root ``pikerd`` daemon to spawn the daemon we need if - # pikerd is not live we now become the root of the - # process tree - async with maybe_open_pikerd( + f'brokerd.{brokername}', + spawn_func=spawn_brokerd, + spawn_args={'brokername': brokername, 'loglevel': loglevel}, loglevel=loglevel, **kwargs, - ) as pikerd_portal: - if pikerd_portal is None: - # we are root so spawn brokerd directly in our tree - # the root nursery is accessed through process global state - await spawn_brokerd(brokername, loglevel=loglevel) - - else: - await pikerd_portal.run( - spawn_brokerd, - brokername=brokername, - loglevel=loglevel, - ) - - async with tractor.wait_for_actor(dname) as portal: - lock.release() - yield portal + ) as portal: + yield portal async def spawn_emsd( @@ -328,3 +352,24 @@ async def spawn_emsd( **extra_tractor_kwargs ) return 'emsd' + + +@asynccontextmanager +async def maybe_open_emsd( + + brokername: str, + loglevel: Optional[str] = None, + **kwargs, + +) -> tractor._portal.Portal: # noqa + + async with maybe_spawn_daemon( + + 'emsd', + spawn_func=spawn_emsd, + spawn_args={'brokername': brokername, 'loglevel': loglevel}, + loglevel=loglevel, + **kwargs, + + ) as portal: + yield portal diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index e881a726..316056be 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -30,6 +30,7 @@ import tractor from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main +from .._daemon import maybe_open_emsd log = get_logger(__name__) @@ -174,31 +175,6 @@ async def send_order_cmds(symbol_key: str): book._to_ems.send_nowait(cmd) -@asynccontextmanager -async def maybe_open_emsd( - brokername: str, -) -> tractor._portal.Portal: # noqa - - async with tractor.find_actor('emsd') as portal: - if portal is not None: - yield portal - return - - # ask remote daemon tree to spawn it - from .._daemon import spawn_emsd - - async with tractor.find_actor('pikerd') as portal: - assert portal - - name = await portal.run( - spawn_emsd, - brokername=brokername, - ) - - async with tractor.wait_for_actor(name) as portal: - yield portal - - @asynccontextmanager async def open_ems( broker: str, From 9931accc52134086c21e39da271bd0cb01fce8f4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Jun 2021 10:27:16 -0400 Subject: [PATCH 02/18] Port clearing systems to new tractor context api This avoids somewhat convoluted "hackery" making 2 one-way streams between the order client and the EMS and instead uses the new bi-directional streaming and context API from `tractor`. Add a router type to the EMS that gets setup by the initial service tree and which we'll eventually use to work toward multi-provider executions and order-trigger monitoring. Move to py3.9 style where possible throughout. --- piker/_daemon.py | 41 +++++----- piker/clearing/_client.py | 52 +++++++------ piker/clearing/_ems.py | 154 ++++++++++++++++++++++++-------------- 3 files changed, 150 insertions(+), 97 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 799b1331..5e102e4c 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -102,7 +102,9 @@ async def open_pikerd( assert _services is None # XXX: this may open a root actor as well - async with tractor.open_root_actor( + async with ( + tractor.open_root_actor( + # passed through to ``open_root_actor`` arbiter_addr=_tractor_kwargs['arbiter_addr'], name=_root_dname, @@ -113,10 +115,10 @@ async def open_pikerd( # TODO: eventually we should be able to avoid # having the root have more then permissions to # spawn other specialized daemons I think? - # enable_modules=[__name__], enable_modules=_root_modules, - - ) as _, tractor.open_nursery() as actor_nursery: + ) as _, + tractor.open_nursery() as actor_nursery, + ): async with trio.open_nursery() as service_nursery: # setup service mngr singleton instance @@ -137,6 +139,7 @@ async def open_pikerd( async def maybe_open_runtime( loglevel: Optional[str] = None, **kwargs, + ) -> None: """ Start the ``tractor`` runtime (a root actor) if none exists. @@ -159,6 +162,7 @@ async def maybe_open_runtime( async def maybe_open_pikerd( loglevel: Optional[str] = None, **kwargs, + ) -> Union[tractor._portal.Portal, Services]: """If no ``pikerd`` daemon-root-actor can be found start it and yield up (we should probably figure out returning a portal to self @@ -207,7 +211,6 @@ async def maybe_spawn_daemon( service_name: str, spawn_func: Callable, spawn_args: dict[str, Any], - # brokername: str, loglevel: Optional[str] = None, **kwargs, @@ -236,8 +239,10 @@ async def maybe_spawn_daemon( # pikerd is not live we now become the root of the # process tree async with maybe_open_pikerd( + loglevel=loglevel, **kwargs, + ) as pikerd_portal: if pikerd_portal is None: @@ -265,8 +270,6 @@ async def spawn_brokerd( ) -> tractor._portal.Portal: - from .data import _setup_persistent_brokerd - log.info(f'Spawning {brokername} broker daemon') brokermod = get_brokermod(brokername) @@ -286,13 +289,9 @@ async def spawn_brokerd( **tractor_kwargs ) - # TODO: so i think this is the perfect use case for supporting - # a cross-actor async context manager api instead of this - # shoort-and-forget task spawned in the root nursery, we'd have an - # async exit stack that we'd register the `portal.open_context()` - # call with and then have the ability to unwind the call whenevs. - # non-blocking setup of brokerd service nursery + from .data import _setup_persistent_brokerd + await _services.open_remote_ctx( portal, _setup_persistent_brokerd, @@ -327,7 +326,6 @@ async def maybe_spawn_brokerd( async def spawn_emsd( - brokername: str, loglevel: Optional[str] = None, **extra_tractor_kwargs @@ -338,10 +336,10 @@ async def spawn_emsd( """ log.info('Spawning emsd') - # TODO: raise exception when _services == None? global _services + assert _services - await _services.actor_n.start_actor( + portal = await _services.actor_n.start_actor( 'emsd', enable_modules=[ 'piker.clearing._ems', @@ -351,6 +349,15 @@ async def spawn_emsd( debug_mode=_services.debug_mode, # set by pikerd flag **extra_tractor_kwargs ) + + # non-blocking setup of clearing service + from .clearing._ems import _setup_persistent_emsd + + await _services.open_remote_ctx( + portal, + _setup_persistent_emsd, + ) + return 'emsd' @@ -367,7 +374,7 @@ async def maybe_open_emsd( 'emsd', spawn_func=spawn_emsd, - spawn_args={'brokername': brokername, 'loglevel': loglevel}, + spawn_args={'loglevel': loglevel}, loglevel=loglevel, **kwargs, diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 316056be..7d658ddb 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -36,6 +36,7 @@ from .._daemon import maybe_open_emsd log = get_logger(__name__) +# TODO: some kinda validation like this # class Order(msgspec.Struct): # action: str # price: float @@ -137,7 +138,11 @@ def get_orders( return _orders -async def send_order_cmds(symbol_key: str): +async def relay_order_cmds_from_sync_code( + symbol_key: str, + to_ems_stream: tractor.MsgStream, + +) -> None: """ Order streaming task: deliver orders transmitted from UI to downstream consumers. @@ -157,16 +162,15 @@ async def send_order_cmds(symbol_key: str): book = get_orders() orders_stream = book._from_order_book - # signal that ems connection is up and ready - book._ready_to_receive.set() - async for cmd in orders_stream: + print(cmd) if cmd['symbol'] == symbol_key: # send msg over IPC / wire log.info(f'Send order cmd:\n{pformat(cmd)}') - yield cmd + await to_ems_stream.send(cmd) + else: # XXX BRUTAL HACKZORZES !!! # re-insert for another consumer @@ -213,32 +217,32 @@ async def open_ems( - 'broker_filled' """ - actor = tractor.current_actor() - # wait for service to connect back to us signalling # ready for order commands book = get_orders() async with maybe_open_emsd(broker) as portal: - async with portal.open_stream_from( + async with ( - _emsd_main, - client_actor_name=actor.name, - broker=broker, - symbol=symbol.key, + # connect to emsd + portal.open_context( + _emsd_main, + broker=broker, + symbol=symbol.key, - ) as trades_stream: - with trio.fail_after(10): - await book._ready_to_receive.wait() + # TODO: ``first`` here should be the active orders/execs + # persistent on the ems so that loca UI's can be populated. + ) as (ctx, first), + + # open 2-way trade command stream + ctx.open_stream() as trades_stream, + ): + async with trio.open_nursery() as n: + n.start_soon( + relay_order_cmds_from_sync_code, + symbol.key, + trades_stream + ) - try: yield book, trades_stream - - finally: - # TODO: we want to eventually keep this up (by having - # the exec loop keep running in the pikerd tree) but for - # now we have to kill the context to avoid backpressure - # build-up on the shm write loop. - with trio.CancelScope(shield=True): - await trades_stream.aclose() diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index dbb0ff51..50a44426 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -21,11 +21,10 @@ In da suit parlances: "Execution management systems" from pprint import pformat import time from dataclasses import dataclass, field -from typing import ( - AsyncIterator, Dict, Callable, Tuple, -) +from typing import AsyncIterator, Callable from bidict import bidict +from pydantic import BaseModel import trio from trio_typing import TaskStatus import tractor @@ -89,11 +88,11 @@ class _DarkBook: broker: str # levels which have an executable action (eg. alert, order, signal) - orders: Dict[ + orders: dict[ str, # symbol - Dict[ + dict[ str, # uuid - Tuple[ + tuple[ Callable[[float], bool], # predicate str, # name dict, # cmd / msg type @@ -102,22 +101,13 @@ class _DarkBook: ] = field(default_factory=dict) # tracks most recent values per symbol each from data feed - lasts: Dict[ - Tuple[str, str], + lasts: dict[ + tuple[str, str], float ] = field(default_factory=dict) # mapping of broker order ids to piker ems ids - _broker2ems_ids: Dict[str, str] = field(default_factory=bidict) - - -_books: Dict[str, _DarkBook] = {} - - -def get_dark_book(broker: str) -> _DarkBook: - - global _books - return _books.setdefault(broker, _DarkBook(broker)) + _broker2ems_ids: dict[str, str] = field(default_factory=bidict) # XXX: this is in place to prevent accidental positions that are too @@ -255,10 +245,12 @@ async def exec_loop( to brokers. """ + global _router + # XXX: this should be initial price quote from target provider first_quote = await feed.receive() - book = get_dark_book(broker) + book = _router.get_dark_book(broker) book.lasts[(broker, symbol)] = first_quote[symbol]['last'] # TODO: wrap this in a more re-usable general api @@ -478,12 +470,14 @@ async def process_broker_trades( async def process_order_cmds( + ctx: tractor.Context, cmd_stream: 'tractor.ReceiveStream', # noqa symbol: str, feed: 'Feed', # noqa client: 'Client', # noqa dark_book: _DarkBook, + ) -> None: async for cmd in cmd_stream: @@ -509,6 +503,7 @@ async def process_order_cmds( try: dark_book.orders[symbol].pop(oid, None) + # TODO: move these to `tractor.MsgStream` await ctx.send_yield({ 'resp': 'dark_cancelled', 'oid': oid @@ -616,13 +611,15 @@ async def process_order_cmds( }) -@tractor.stream +@tractor.context async def _emsd_main( + ctx: tractor.Context, - client_actor_name: str, + # client_actor_name: str, broker: str, symbol: str, _mode: str = 'dark', # ('paper', 'dark', 'live') + ) -> None: """EMS (sub)actor entrypoint providing the execution management (micro)service which conducts broker @@ -649,9 +646,10 @@ async def _emsd_main( accept normalized trades responses, process and relay to ems client(s) """ - from ._client import send_order_cmds + # from ._client import send_order_cmds - dark_book = get_dark_book(broker) + global _router + dark_book = _router.get_dark_book(broker) # spawn one task per broker feed async with trio.open_nursery() as n: @@ -664,40 +662,84 @@ async def _emsd_main( ) as feed: # get a portal back to the client - async with tractor.wait_for_actor(client_actor_name) as portal: + # async with tractor.wait_for_actor(client_actor_name) as portal: - # connect back to the calling actor (the one that is - # acting as an EMS client and will submit orders) to - # receive requests pushed over a tractor stream - # using (for now) an async generator. - async with portal.open_stream_from( - send_order_cmds, - symbol_key=symbol, - ) as order_stream: + await ctx.started() - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, - broker, - symbol, - _mode, - ) + # establish 2-way stream with requesting order-client + async with ctx.open_stream() as order_stream: - await n.start( - process_broker_trades, - ctx, - feed, - dark_book, - ) + # start the condition scan loop + quote, feed, client = await n.start( + exec_loop, + ctx, + feed, + broker, + symbol, + _mode, + ) - # start inbound order request processing - await process_order_cmds( - ctx, - order_stream, - symbol, - feed, - client, - dark_book, - ) + # begin processing order events from the target brokerd backend + await n.start( + process_broker_trades, + ctx, + feed, + dark_book, + ) + + # start inbound (from attached client) order request processing + await process_order_cmds( + ctx, + order_stream, + symbol, + feed, + client, + dark_book, + ) + + +class _Router(BaseModel): + '''Order router which manages per-broker dark books, alerts, + and clearing related data feed management. + + ''' + nursery: trio.Nursery + + feeds: dict[str, tuple[trio.CancelScope, float]] = {} + books: dict[str, _DarkBook] = {} + + class Config: + arbitrary_types_allowed = True + underscore_attrs_are_private = False + + def get_dark_book( + self, + brokername: str, + + ) -> _DarkBook: + + return self.books.setdefault(brokername, _DarkBook(brokername)) + + +_router: _Router = None + + +@tractor.context +async def _setup_persistent_emsd( + + ctx: tractor.Context, + +) -> None: + + global _router + + # spawn one task per broker feed + async with trio.open_nursery() as service_nursery: + _router = _Router(nursery=service_nursery) + + # TODO: send back the full set of persistent orders/execs persistent + await ctx.started() + + # we pin this task to keep the feeds manager active until the + # parent actor decides to tear it down + await trio.sleep_forever() From edf3af9777887f36f1259e2c45f543b60c40b9f7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Jun 2021 10:37:36 -0400 Subject: [PATCH 03/18] Drop waits to half-seconds --- piker/data/_web_bs.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py index 543ac19f..485f69c2 100644 --- a/piker/data/_web_bs.py +++ b/piker/data/_web_bs.py @@ -64,13 +64,13 @@ class NoBsWs: async def _connect( self, - tries: int = 10000, + tries: int = 1000, ) -> None: while True: try: await self._stack.aclose() except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) + await trio.sleep(0.5) else: break @@ -95,7 +95,7 @@ class NoBsWs: f'{self} connection bail with ' f'{type(err)}...retry attempt {i}' ) - await trio.sleep(1) + await trio.sleep(0.5) continue else: log.exception('ws connection fail...') From f4c9e20f0d80dae487bc3b204f3deec0433419f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 1 Jun 2021 10:38:11 -0400 Subject: [PATCH 04/18] Avoid `numpy` type usage on the wire --- piker/data/feed.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/data/feed.py b/piker/data/feed.py index a0e2478b..0f856627 100644 --- a/piker/data/feed.py +++ b/piker/data/feed.py @@ -217,8 +217,8 @@ async def allocate_persistent_feed( times = shm.array['time'] delay_s = times[-1] - times[times != times[-1]][-1] - # pass OHLC sample rate in seconds - init_msg[symbol]['sample_rate'] = delay_s + # pass OHLC sample rate in seconds (be sure to use python int type) + init_msg[symbol]['sample_rate'] = int(delay_s) # yield back control to starting nursery task_status.started((init_msg, first_quote)) From f9238f3a8ad72353717f62bfba704691039aafa8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 3 Jun 2021 09:06:08 -0400 Subject: [PATCH 05/18] Validate client message updates --- piker/clearing/_client.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 7d658ddb..28fc54fa 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -96,9 +96,10 @@ class OrderBook: **data: dict, ) -> dict: cmd = self._sent_orders[uuid] - cmd.update(data) - self._sent_orders[uuid] = cmd - self._to_ems.send_nowait(cmd) + msg = cmd.dict() + msg.update(data) + self._sent_orders[uuid] = OrderMsg(**msg) + self._to_ems.send_nowait(msg) return cmd def cancel(self, uuid: str) -> bool: From 0bcad35c7041cda54dd72df5ae729a1ea407bc96 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Jun 2021 12:53:48 -0400 Subject: [PATCH 06/18] Set ack time on pydantic model --- piker/ui/order_mode.py | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index 22293efa..a49858f4 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -129,7 +129,7 @@ class OrderMode: line = self.lines.commit_line(uuid) req_msg = self.book._sent_orders.get(uuid) if req_msg: - req_msg['ack_time_ns'] = time.time_ns() + req_msg.ack_time_ns = time.time_ns() return line @@ -196,8 +196,10 @@ class OrderMode: def submit_exec( self, size: Optional[float] = None, + ) -> LevelLine: - """Send execution order to EMS. + """Send execution order to EMS return a level line to + represent the order on a chart. """ # register the "staged" line under the cursor @@ -226,6 +228,9 @@ class OrderMode: exec_mode=self._exec_mode, ) + # TODO: update the line once an ack event comes back + # from the EMS! + # make line graphic if order push was # sucessful line = self.lines.create_order_line( @@ -266,14 +271,6 @@ class OrderMode: price=line.value(), ) - # def on_key_press( - # self, - # key: - # mods: - # text: str, - # ) -> None: - # pass - @asynccontextmanager async def open_order_mode( From 02459cd964d067e487eb6aa896925ef9a6e42c63 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 4 Jun 2021 12:54:26 -0400 Subject: [PATCH 07/18] Use new top level portal import --- piker/brokers/ib.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 96398c44..915278c3 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -738,7 +738,7 @@ async def _trio_run_client_method( class _MethodProxy: def __init__( self, - portal: tractor._portal.Portal + portal: tractor.Portal ) -> None: self._portal = portal @@ -755,7 +755,12 @@ class _MethodProxy: ) -def get_client_proxy(portal, target=Client) -> _MethodProxy: +def get_client_proxy( + + portal: tractor.Portal, + target=Client, + +) -> _MethodProxy: proxy = _MethodProxy(portal) @@ -1197,8 +1202,10 @@ def pack_position(pos: Position) -> Dict[str, Any]: send_on_connect={'local_trades': 'start'} ) async def stream_trades( + loglevel: str = None, get_topics: Callable = None, + ) -> AsyncIterator[Dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging From 23094d862474b44a1ce5a0291ee62f2dd24b747a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 11:45:17 -0400 Subject: [PATCH 08/18] Spec out brokerd 2-way trade dialogue messages --- piker/clearing/_messages.py | 238 ++++++++++++++++++++++++++++++++++++ 1 file changed, 238 insertions(+) create mode 100644 piker/clearing/_messages.py diff --git a/piker/clearing/_messages.py b/piker/clearing/_messages.py new file mode 100644 index 00000000..5667cb96 --- /dev/null +++ b/piker/clearing/_messages.py @@ -0,0 +1,238 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Clearing system messagingn types and protocols. + +""" +from typing import Optional, Union + +# TODO: try out just encoding/send direction for now? +# import msgspec +from pydantic import BaseModel + +# Client -> emsd + + +class Cancel(BaseModel): + '''Cancel msg for removing a dark (ems triggered) or + broker-submitted (live) trigger/order. + + ''' + action: str = 'cancel' + oid: str # uuid4 + symbol: str + + +class Order(BaseModel): + + action: str # {'buy', 'sell', 'alert'} + # internal ``emdsd`` unique "order id" + oid: str # uuid4 + symbol: str + + price: float + size: float + brokers: list[str] + + # Assigned once initial ack is received + # ack_time_ns: Optional[int] = None + + # determines whether the create execution + # will be submitted to the ems or directly to + # the backend broker + exec_mode: str # {'dark', 'live', 'paper'} + + +# Client <- emsd +# update msgs from ems which relay state change info +# from the active clearing engine. + + +class Status(BaseModel): + + name: str = 'status' + oid: str # uuid4 + time_ns: int + + # { + # 'dark_submitted', + # 'dark_cancelled', + # 'dark_triggered', + + # 'broker_submitted', + # 'broker_cancelled', + # 'broker_executed', + # 'broker_filled', + + # 'alert_submitted', + # 'alert_triggered', + + # 'position', + + # } + resp: str # "response", see above + + # symbol: str + + # trigger info + trigger_price: Optional[float] = None + # price: float + + # broker: Optional[str] = None + + # this maps normally to the ``BrokerdOrder.reqid`` below, an id + # normally allocated internally by the backend broker routing system + broker_reqid: Optional[Union[int, str]] = None + + # for relaying backend msg data "through" the ems layer + brokerd_msg: dict = {} + + +# emsd -> brokerd +# requests *sent* from ems to respective backend broker daemon + +class BrokerdCancel(BaseModel): + + action: str = 'cancel' + oid: str # piker emsd order id + time_ns: int + + # "broker request id": broker specific/internal order id if this is + # None, creates a new order otherwise if the id is valid the backend + # api must modify the existing matching order. If the broker allows + # for setting a unique order id then this value will be relayed back + # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` + # field + reqid: Optional[Union[int, str]] = None + + +class BrokerdOrder(BaseModel): + + action: str # {buy, sell} + oid: str + time_ns: int + + # "broker request id": broker specific/internal order id if this is + # None, creates a new order otherwise if the id is valid the backend + # api must modify the existing matching order. If the broker allows + # for setting a unique order id then this value will be relayed back + # on the emsd order request stream as the ``BrokerdOrderAck.reqid`` + # field + reqid: Optional[Union[int, str]] = None + + symbol: str # symbol. ? + price: float + size: float + + +# emsd <- brokerd +# requests *received* to ems from broker backend + + +class BrokerdOrderAck(BaseModel): + '''Immediate reponse to a brokerd order request providing + the broker specifci unique order id. + + ''' + name: str = 'ack' + + # defined and provided by backend + reqid: Union[int, str] + + # emsd id originally sent in matching request msg + oid: str + + +class BrokerdStatus(BaseModel): + + name: str = 'status' + reqid: Union[int, str] + time_ns: int + + # { + # 'submitted', + # 'cancelled', + # 'executed', + # } + status: str + + filled: float = 0.0 + reason: str = '' + remaining: float = 0.0 + + # XXX: better design/name here? + # flag that can be set to indicate a message for an order + # event that wasn't originated by piker's emsd (eg. some external + # trading system which does it's own order control but that you + # might want to "track" using piker UIs/systems). + external: bool = False + + # XXX: not required schema as of yet + broker_details: dict = { + 'name': '', + } + + +class BrokerdFill(BaseModel): + '''A single message indicating a "fill-details" event from the broker + if avaiable. + + ''' + name: str = 'fill' + reqid: Union[int, str] + time_ns: int + + # order exeuction related + action: str + size: float + price: float + + broker_details: dict = {} # meta-data (eg. commisions etc.) + + # brokerd timestamp required for order mode arrow placement on x-axis + + # TODO: maybe int if we force ns? + # we need to normalize this somehow since backends will use their + # own format and likely across many disparate epoch clocks... + broker_time: float + + +class BrokerdError(BaseModel): + '''Optional error type that can be relayed to emsd for error handling. + + This is still a TODO thing since we're not sure how to employ it yet. + ''' + name: str = 'error' + reqid: Union[int, str] + + symbol: str + reason: str + broker_details: dict = {} + + +class BrokerdPosition(BaseModel): + '''Position update event from brokerd. + + ''' + name: str = 'position' + + broker: str + account: str + symbol: str + currency: str + size: float + avg_price: float From 0dabc6ad267834fba220dd0998af21455bde788a Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 12:06:47 -0400 Subject: [PATCH 09/18] Port paper engine to new msgs and run in sub-actor This makes the paper engine look IPC-wise exactly like any broker-provider backend module and uses the new ``trades_dialogue()`` 2-way streaming endpoint for commanding order requests. This serves as a first step toward truly distributed forward testing since the paper engine can now be run out-of tree from `pikerd` if needed thus demonstrating how real-time clearing signals can be shared between fully distinct services. --- piker/clearing/_paper_engine.py | 290 +++++++++++++++++++++++--------- 1 file changed, 212 insertions(+), 78 deletions(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 740345f5..e669fd42 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -18,17 +18,28 @@ Fake trading for forward testing. """ +from contextlib import asynccontextmanager from datetime import datetime from operator import itemgetter import time -from typing import Tuple, Optional +from typing import Tuple, Optional, Callable import uuid from bidict import bidict import trio +import tractor from dataclasses import dataclass +from .. import data from ..data._normalize import iterticks +from ..log import get_logger +from ._messages import ( + BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, + BrokerdFill, +) + + +log = get_logger(__name__) @dataclass @@ -41,8 +52,8 @@ class PaperBoi: """ broker: str - _to_trade_stream: trio.abc.SendChannel - trade_stream: trio.abc.ReceiveChannel + + ems_trades_stream: tractor.MsgStream # map of paper "live" orders which be used # to simulate fills based on paper engine settings @@ -61,20 +72,20 @@ class PaperBoi: price: float, action: str, size: float, - brid: Optional[str], + reqid: Optional[str], ) -> int: """Place an order and return integer request id provided by client. """ - - if brid is None: + is_modify: bool = False + if reqid is None: reqid = str(uuid.uuid4()) else: # order is already existing, this is a modify - (oid, symbol, action, old_price) = self._reqids[brid] + (oid, symbol, action, old_price) = self._reqids[reqid] assert old_price != price - reqid = brid + is_modify = True # register order internally self._reqids[reqid] = (oid, symbol, action, price) @@ -90,22 +101,16 @@ class PaperBoi: # in the broker trades event processing loop await trio.sleep(0.05) - await self._to_trade_stream.send({ - - 'local_trades': ('status', { - - 'time_ns': time.time_ns(), - 'reqid': reqid, - - 'status': 'submitted', - 'broker': self.broker, - # 'cmd': cmd, # original request message - - 'paper_info': { - 'oid': oid, - }, - }), - }) + msg = BrokerdStatus( + status='submitted', + reqid=reqid, + broker=self.broker, + time_ns=time.time_ns(), + filled=0.0, + reason='paper_trigger', + remaining=size, + ) + await self.ems_trades_stream.send(msg.dict()) # if we're already a clearing price simulate an immediate fill if ( @@ -129,7 +134,7 @@ class PaperBoi: # and trigger by the simulated clearing task normally # running ``simulate_fills()``. - if brid is not None: + if is_modify: # remove any existing order for the old price orders[symbol].pop((oid, old_price)) @@ -144,7 +149,6 @@ class PaperBoi: ) -> None: # TODO: fake market simulation effects - # await self._to_trade_stream.send( oid, symbol, action, price = self._reqids[reqid] if action == 'buy': @@ -155,21 +159,14 @@ class PaperBoi: # TODO: net latency model await trio.sleep(0.05) - await self._to_trade_stream.send({ - - 'local_trades': ('status', { - - 'time_ns': time.time_ns(), - 'oid': oid, - 'reqid': reqid, - - 'status': 'cancelled', - 'broker': self.broker, - # 'cmd': cmd, # original request message - - 'paper': True, - }), - }) + msg = BrokerdStatus( + status='cancelled', + oid=oid, + reqid=reqid, + broker=self.broker, + time_ns=time.time_ns(), + ) + await self.ems_trades_stream.send(msg.dict()) async def fake_fill( self, @@ -191,56 +188,51 @@ class PaperBoi: # TODO: net latency model await trio.sleep(0.05) - # the trades stream expects events in the form - # {'local_trades': (event_name, msg)} - await self._to_trade_stream.send({ + msg = BrokerdFill( - 'local_trades': ('fill', { + reqid=reqid, + time_ns=time.time_ns(), - 'status': 'filled', - 'broker': self.broker, - # converted to float by us in ib backend - 'broker_time': datetime.now().timestamp(), - - 'action': action, - 'size': size, - 'price': price, - 'remaining': 0 if order_complete else remaining, - - # normally filled by real `brokerd` daemon - 'time': time.time_ns(), - 'time_ns': time.time_ns(), # cuz why not - - # fake ids - 'reqid': reqid, + action=action, + size=size, + price=price, + broker_time=datetime.now().timestamp(), + broker_details={ 'paper_info': { 'oid': oid, }, + # mocking ib + 'name': self.broker + '_paper', + }, + ) + await self.ems_trades_stream.send(msg.dict()) - # XXX: fields we might not need to emulate? - # execution id from broker - # 'execid': execu.execId, - # 'cmd': cmd, # original request message? - }), - }) if order_complete: - await self._to_trade_stream.send({ - 'local_trades': ('status', { - 'reqid': reqid, - 'status': 'filled', - 'broker': self.broker, - 'filled': size, - 'remaining': 0 if order_complete else remaining, + msg = BrokerdStatus( - # converted to float by us in ib backend - 'broker_time': datetime.now().timestamp(), + reqid=reqid, + time_ns=time.time_ns(), + + status='filled', + # broker=self.broker, + filled=size, + remaining=0 if order_complete else remaining, + + action=action, + size=size, + price=price, + + # broker=self.broker, + broker_details={ 'paper_info': { 'oid': oid, }, - }), - }) + 'name': self.broker, + }, + ) + await self.ems_trades_stream.send(msg.dict()) async def simulate_fills( @@ -327,3 +319,145 @@ async def simulate_fills( else: # prices are iterated in sorted order so we're done break + + +# class MockBrokerdMsgStream: + + +# async def MockContext(*args, **kwargs): + + +async def handle_order_requests( + + client: PaperBoi, + ems_order_stream: tractor.MsgStream, + +) -> None: + + # order_request: dict + async for request_msg in ems_order_stream: + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = await client.submit_limit( + + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid=order.reqid, + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + + # ems order request id + oid=order.oid, + + # broker specific request id + reqid=reqid, + + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await client.submit_cancel( + reqid=msg.reqid + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + + ctx: tractor.Context, + broker: str, + symbol: str, + loglevel: str = None, + +) -> None: + + async with ( + + data.open_feed( + broker, + [symbol], + loglevel=loglevel, + ) as feed, + + ): + # TODO: load paper positions per broker from .toml config file + # and pass as symbol to position data mapping: ``dict[str, dict]`` + # await ctx.started(all_positions) + await ctx.started({}) + + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + + client = PaperBoi( + broker, + ems_stream, + _buys={}, + _sells={}, + + _reqids={}, + ) + + n.start_soon(handle_order_requests, client, ems_stream) + + # paper engine simulator clearing task + await simulate_fills(feed.stream, client) + + +@asynccontextmanager +async def open_paperboi( + broker: str, + symbol: str, + loglevel: str, + +) -> Callable: + '''Spawn a paper engine actor and yield through access to + its context. + + ''' + service_name = f'paperboi.{broker}' + + async with ( + tractor.find_actor(service_name) as portal, + tractor.open_nursery() as tn, + ): + # only spawn if no paperboi already is up + # (we likely don't need more then one proc for basic + # simulated order clearing) + if portal is None: + portal = await tn.start_actor( + service_name, + enable_modules=[__name__] + ) + + async with portal.open_context( + trades_dialogue, + broker=broker, + symbol=symbol, + loglevel=loglevel, + + ) as (ctx, first): + yield ctx, first From 6e58f31fd8f74b9e236062824d408be3c0d9461f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 12:14:45 -0400 Subject: [PATCH 10/18] Port EMS to typed messaging + bidir streaming This moves the entire clearing system to use typed messages using `pydantic.BaseModel` such that the streamed request-response order submission protocols can be explicitly viewed in terms of message schema, flow, and sequencing. Using the explicit message formats we can now dig into simplifying and normalizing across broker provider apis to get the best uniformity and simplicity. The order submission sequence is now fully async: an order request is expected to be explicitly acked with a new message and if cancellation is requested by the client before the ack arrives, the cancel message is stashed and then later sent immediately on receipt of the order submission's ack from the backend broker. Backend brokers are now controlled using a 2-way request-response streaming dialogue which is fully api agnostic of the clearing system's core processing; This leverages the new bi-directional streaming apis from `tractor`. The clearing core (emsd) was also simplified by moving the paper engine to it's own sub-actor and making it api-symmetric with expected `brokerd` endpoints. A couple of the ems status messages were changed/added: 'dark_executed' -> 'dark_triggered' added 'alert_triggered' More cleaning of old code to come! --- piker/clearing/_client.py | 77 ++-- piker/clearing/_ems.py | 872 +++++++++++++++++++++++++------------- piker/ui/order_mode.py | 52 ++- 3 files changed, 653 insertions(+), 348 deletions(-) diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 28fc54fa..47c79636 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -19,34 +19,23 @@ Orders and execution client API. """ from contextlib import asynccontextmanager -from typing import Dict, Tuple, List +from typing import Dict from pprint import pformat from dataclasses import dataclass, field import trio import tractor -# import msgspec from ..data._source import Symbol from ..log import get_logger from ._ems import _emsd_main from .._daemon import maybe_open_emsd +from ._messages import Order, Cancel log = get_logger(__name__) -# TODO: some kinda validation like this -# class Order(msgspec.Struct): -# action: str -# price: float -# size: float -# symbol: str -# brokers: List[str] -# oid: str -# exec_mode: str - - @dataclass class OrderBook: """Buy-side (client-side ?) order book ctl and tracking. @@ -64,31 +53,34 @@ class OrderBook: _to_ems: trio.abc.SendChannel _from_order_book: trio.abc.ReceiveChannel - _sent_orders: Dict[str, dict] = field(default_factory=dict) + _sent_orders: Dict[str, Order] = field(default_factory=dict) _ready_to_receive: trio.Event = trio.Event() def send( + self, uuid: str, symbol: str, - brokers: List[str], + brokers: list[str], price: float, size: float, action: str, exec_mode: str, + ) -> dict: - cmd = { - 'action': action, - 'price': price, - 'size': size, - 'symbol': symbol, - 'brokers': brokers, - 'oid': uuid, - 'exec_mode': exec_mode, # dark or live - } - self._sent_orders[uuid] = cmd - self._to_ems.send_nowait(cmd) - return cmd + msg = Order( + action=action, + price=price, + size=size, + symbol=symbol, + brokers=brokers, + oid=uuid, + exec_mode=exec_mode, # dark or live + ) + + self._sent_orders[uuid] = msg + self._to_ems.send_nowait(msg.dict()) + return msg def update( self, @@ -98,28 +90,27 @@ class OrderBook: cmd = self._sent_orders[uuid] msg = cmd.dict() msg.update(data) - self._sent_orders[uuid] = OrderMsg(**msg) + self._sent_orders[uuid] = Order(**msg) self._to_ems.send_nowait(msg) return cmd def cancel(self, uuid: str) -> bool: - """Cancel an order (or alert) from the EMS. + """Cancel an order (or alert) in the EMS. """ cmd = self._sent_orders[uuid] - msg = { - 'action': 'cancel', - 'oid': uuid, - 'symbol': cmd['symbol'], - } - self._to_ems.send_nowait(msg) + msg = Cancel( + oid=uuid, + symbol=cmd.symbol, + ) + self._to_ems.send_nowait(msg.dict()) _orders: OrderBook = None def get_orders( - emsd_uid: Tuple[str, str] = None + emsd_uid: tuple[str, str] = None ) -> OrderBook: """" OrderBook singleton factory per actor. @@ -139,7 +130,10 @@ def get_orders( return _orders +# TODO: we can get rid of this relay loop once we move +# order_mode inputs to async code! async def relay_order_cmds_from_sync_code( + symbol_key: str, to_ems_stream: tractor.MsgStream, @@ -184,7 +178,8 @@ async def relay_order_cmds_from_sync_code( async def open_ems( broker: str, symbol: Symbol, -) -> None: + +) -> (OrderBook, tractor.MsgStream, dict): """Spawn an EMS daemon and begin sending orders and receiving alerts. @@ -232,9 +227,9 @@ async def open_ems( broker=broker, symbol=symbol.key, - # TODO: ``first`` here should be the active orders/execs - # persistent on the ems so that loca UI's can be populated. - ) as (ctx, first), + # TODO: ``first`` here should be the active orders/execs + # persistent on the ems so that loca UI's can be populated. + ) as (ctx, positions), # open 2-way trade command stream ctx.open_stream() as trades_stream, @@ -246,4 +241,4 @@ async def open_ems( trades_stream ) - yield book, trades_stream + yield book, trades_stream, positions diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 50a44426..cd0795b3 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -32,7 +32,12 @@ import tractor from .. import data from ..log import get_logger from ..data._normalize import iterticks -from ._paper_engine import PaperBoi, simulate_fills +from . import _paper_engine as paper +from ._messages import ( + Status, Order, + BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus, + BrokerdFill, BrokerdError, BrokerdPosition, +) log = get_logger(__name__) @@ -106,8 +111,9 @@ class _DarkBook: float ] = field(default_factory=dict) - # mapping of broker order ids to piker ems ids - _broker2ems_ids: dict[str, str] = field(default_factory=bidict) + # mapping of piker ems order ids to current brokerd order flow message + _ems_entries: dict[str, str] = field(default_factory=dict) + _ems2brokerd_ids: dict[str, str] = field(default_factory=bidict) # XXX: this is in place to prevent accidental positions that are too @@ -117,13 +123,20 @@ class _DarkBook: _DEFAULT_SIZE: float = 1.0 -async def execute_triggers( +async def clear_dark_triggers( + + # ctx: tractor.Context, + brokerd_orders_stream: tractor.MsgStream, + ems_client_order_stream: tractor.MsgStream, + quote_stream: tractor.ReceiveMsgStream, # noqa + broker: str, symbol: str, - stream: 'tractor.ReceiveStream', # noqa - ctx: tractor.Context, - client: 'Client', # noqa + # client: 'Client', # noqa + # order_msg_stream: 'Client', # noqa + book: _DarkBook, + ) -> None: """Core dark order trigger loop. @@ -133,7 +146,7 @@ async def execute_triggers( """ # this stream may eventually contain multiple symbols # XXX: optimize this for speed! - async for quotes in stream: + async for quotes in quote_stream: # TODO: numba all this! @@ -169,9 +182,15 @@ async def execute_triggers( # majority of iterations will be non-matches continue - action = cmd['action'] + action: str = cmd['action'] + symbol: str = cmd['symbol'] - if action != 'alert': + if action == 'alert': + # nothing to do but relay a status + # message back to the requesting ems client + resp = 'alert_triggered' + + else: # executable order submission # submit_price = price + price*percent_away @@ -181,47 +200,89 @@ async def execute_triggers( f'Dark order triggered for price {price}\n' f'Submitting order @ price {submit_price}') - reqid = await client.submit_limit( + # TODO: port to BrokerdOrder message sending + msg = BrokerdOrder( + action=cmd['action'], oid=oid, + time_ns=time.time_ns(), + # this is a brand new order request for the - # underlying broker so we set out "broker request - # id" (brid) as nothing so that the broker - # client knows that we aren't trying to modify - # an existing order. - brid=None, + # underlying broker so we set a "broker + # request id" (brid) to "nothing" so that the + # broker client knows that we aren't trying + # to modify an existing order-request. + reqid=None, symbol=sym, - action=cmd['action'], price=submit_price, size=cmd['size'], ) + await brokerd_orders_stream.send(msg.dict()) + # mark this entry as having send an order request + book._ems_entries[oid] = msg - # register broker request id to ems id - book._broker2ems_ids[reqid] = oid + resp = 'dark_triggered' - else: - # alerts have no broker request id - reqid = '' + # an internal brokerd-broker specific + # order-request id is expected to be generated - resp = { - 'resp': 'dark_executed', - 'time_ns': time.time_ns(), - 'trigger_price': price, + # reqid = await client.submit_limit( - 'cmd': cmd, # original request message + # oid=oid, - 'broker_reqid': reqid, - 'broker': broker, - 'oid': oid, # piker order id + # # this is a brand new order request for the + # # underlying broker so we set a "broker + # # request id" (brid) to "nothing" so that the + # # broker client knows that we aren't trying + # # to modify an existing order-request. + # brid=None, - } + # symbol=sym, + # action=cmd['action'], + # price=submit_price, + # size=cmd['size'], + # ) + + # # register broker request id to ems id + + # else: + # # alerts have no broker request id + # reqid = '' + + # resp = { + # 'resp': 'dark_executed', + # 'cmd': cmd, # original request message + + # 'time_ns': time.time_ns(), + # 'trigger_price': price, + + # 'broker_reqid': reqid, + # 'broker': broker, + # 'oid': oid, # piker order id + + # } + msg = Status( + oid=oid, # piker order id + resp=resp, + time_ns=time.time_ns(), + + symbol=symbol, + trigger_price=price, + + # broker_reqid=reqid, + broker_details={'name': broker}, + + cmd=cmd, # original request message + + ).dict() # remove exec-condition from set log.info(f'removing pred for {oid}') execs.pop(oid) - await ctx.send_yield(resp) + # await ctx.send_yield(resp) + await ems_client_order_stream.send(msg) else: # condition scan loop complete log.debug(f'execs are {execs}') @@ -231,78 +292,49 @@ async def execute_triggers( # print(f'execs scan took: {time.time() - start}') -async def exec_loop( +# async def start_clearing( - ctx: tractor.Context, - feed: 'Feed', # noqa - broker: str, - symbol: str, - _exec_mode: str, - task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, +# # ctx: tractor.Context, +# brokerd_order_stream: tractor.MsgStream, +# quote_stream: tractor.MsgStream, -) -> AsyncIterator[dict]: - """Main scan loop for order execution conditions and submission - to brokers. +# # client: 'Client', - """ - global _router +# # feed: 'Feed', # noqa +# broker: str, +# symbol: str, +# _exec_mode: str, - # XXX: this should be initial price quote from target provider - first_quote = await feed.receive() +# book: _DarkBook, - book = _router.get_dark_book(broker) - book.lasts[(broker, symbol)] = first_quote[symbol]['last'] +# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - # TODO: wrap this in a more re-usable general api - client_factory = getattr(feed.mod, 'get_client_proxy', None) +# ) -> AsyncIterator[dict]: +# """Main scan loop for order execution conditions and submission +# to brokers. - if client_factory is not None and _exec_mode != 'paper': +# """ +# async with trio.open_nursery() as n: - # we have an order API for this broker - client = client_factory(feed._brokerd_portal) +# # trigger scan and exec loop +# n.start_soon( +# trigger_executions, - else: - # force paper mode - log.warning(f'Entering paper trading mode for {broker}') +# brokerd_order_stream, +# quote_stream, - client = PaperBoi( - broker, - *trio.open_memory_channel(100), - _buys={}, - _sells={}, +# broker, +# symbol, +# book +# # ctx, +# # client, +# ) - _reqids={}, - ) - - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - feed._trade_stream = client.trade_stream - - # init the trades stream - client._to_trade_stream.send_nowait({'local_trades': 'start'}) - - _exec_mode = 'paper' - - # return control to parent task - task_status.started((first_quote, feed, client)) - - stream = feed.stream - async with trio.open_nursery() as n: - n.start_soon( - execute_triggers, - broker, - symbol, - stream, - ctx, - client, - book - ) - - if _exec_mode == 'paper': - # TODO: make this an actual broadcast channels as in: - # https://github.com/python-trio/trio/issues/987 - n.start_soon(simulate_fills, stream, client) +# # # paper engine simulator task +# # if _exec_mode == 'paper': +# # # TODO: make this an actual broadcast channels as in: +# # # https://github.com/python-trio/trio/issues/987 +# # n.start_soon(simulate_fills, quote_stream, client) # TODO: lots of cases still to handle @@ -315,11 +347,17 @@ async def exec_loop( # reqId 1550: Order held while securities are located.'), # status='PreSubmitted', message='')], -async def process_broker_trades( - ctx: tractor.Context, - feed: 'Feed', # noqa +async def translate_and_relay_brokerd_events( + + # ctx: tractor.Context, + broker: str, + ems_client_order_stream: tractor.MsgStream, + brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, + + # feed: 'Feed', # noqa task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, + ) -> AsyncIterator[dict]: """Trades update loop - receive updates from broker, convert to EMS responses, transmit to ordering client(s). @@ -339,198 +377,336 @@ async def process_broker_trades( {'presubmitted', 'submitted', 'cancelled', 'inactive'} """ - broker = feed.mod.name + # broker = feed.mod.name # TODO: make this a context # in the paper engine case this is just a mem receive channel - async with feed.receive_trades_data() as trades_stream: + # async with feed.receive_trades_data() as brokerd_trades_stream: - first = await trades_stream.__anext__() + # first = await brokerd_trades_stream.__anext__() - # startup msg expected as first from broker backend - assert first['local_trades'] == 'start' - task_status.started() + # startup msg expected as first from broker backend + # assert first['local_trades'] == 'start' + # task_status.started() - async for event in trades_stream: + async for brokerd_msg in brokerd_trades_stream: - name, msg = event['local_trades'] + # name, msg = event['local_trades'] + name = brokerd_msg['name'] - log.info(f'Received broker trade event:\n{pformat(msg)}') + log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') - if name == 'position': - msg['resp'] = 'position' + if name == 'position': + # msg['resp'] = 'position' + + # relay through position msgs immediately + await ems_client_order_stream.send( + BrokerdPosition(**brokerd_msg).dict() + ) + continue + + # Get the broker (order) request id, this **must** be normalized + # into messaging provided by the broker backend + reqid = brokerd_msg['reqid'] + + # all piker originated requests will have an ems generated oid field + oid = brokerd_msg.get( + 'oid', + book._ems2brokerd_ids.inverse.get(reqid) + ) + + if oid is None: + + # XXX: paper clearing special cases + # paper engine race case: ``Client.submit_limit()`` hasn't + # returned yet and provided an output reqid to register + # locally, so we need to retreive the oid that was already + # packed at submission since we already know it ahead of + # time + paper = brokerd_msg['broker_details'].get('paper_info') + if paper: + # paperboi keeps the ems id up front + oid = paper['oid'] + + else: + # may be an order msg specified as "external" to the + # piker ems flow (i.e. generated by some other + # external broker backend client (like tws for ib) + ext = brokerd_msg.get('external') + if ext: + log.error(f"External trade event {ext}") + + continue + else: + # check for existing live flow entry + entry = book._ems_entries.get(oid) + + # initial response to brokerd order request + if name == 'ack': + + # register the brokerd request id (that was likely + # generated internally) with our locall ems order id for + # reverse lookup later. a BrokerdOrderAck **must** be + # sent after an order request in order to establish this + # id mapping. + book._ems2brokerd_ids[oid] = reqid + + # new order which has not yet be registered into the + # local ems book, insert it now and handle 2 cases: + + # - the order has previously been requested to be + # cancelled by the ems controlling client before we + # received this ack, in which case we relay that cancel + # signal **asap** to the backend broker + if entry.action == 'cancel': + # assign newly providerd broker backend request id + entry.reqid = reqid + + # tell broker to cancel immediately + await brokerd_trades_stream.send(entry.dict()) + + # - the order is now active and will be mirrored in + # our book -> registered as live flow + else: + # update the flow with the ack msg + book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg) - # relay through - await ctx.send_yield(msg) continue - # Get the broker (order) request id, this **must** be normalized - # into messaging provided by the broker backend - reqid = msg['reqid'] + # a live flow now exists + oid = entry.oid - # make response packet to EMS client(s) - oid = book._broker2ems_ids.get(reqid) + # make response packet to EMS client(s) + # reqid = book._ems_entries.get(oid) - if oid is None: - # paper engine race case: ``Client.submit_limit()`` hasn't - # returned yet and provided an output reqid to register - # locally, so we need to retreive the oid that was already - # packed at submission since we already know it ahead of - # time - paper = msg.get('paper_info') - if paper: - oid = paper['oid'] + # # msg is for unknown emsd order id + # if oid is None: + # oid = msg['oid'] + # # XXX: paper clearing special cases + # # paper engine race case: ``Client.submit_limit()`` hasn't + # # returned yet and provided an output reqid to register + # # locally, so we need to retreive the oid that was already + # # packed at submission since we already know it ahead of + # # time + # paper = msg.get('paper_info') + # if paper: + # oid = paper['oid'] + + # else: + # msg.get('external') + # if not msg: + # log.error(f"Unknown trade event {event}") + + # continue + + # resp = { + # 'resp': None, # placeholder + # 'oid': oid + # } + resp = None + broker_details = {} + + if name in ( + 'error', + ): + # TODO: figure out how this will interact with EMS clients + # for ex. on an error do we react with a dark orders + # management response, like cancelling all dark orders? + + # This looks like a supervision policy for pending orders on + # some unexpected failure - something we need to think more + # about. In most default situations, with composed orders + # (ex. brackets), most brokers seem to use a oca policy. + + msg = BrokerdError(**brokerd_msg) + + # XXX should we make one when it's blank? + log.error(pformat(msg)) + + # TODO: getting this bs, prolly need to handle status messages + # 'Market data farm connection is OK:usfarm.nj' + + # another stupid ib error to handle + # if 10147 in message: cancel + + # don't relay message to order requester client + continue + + elif name in ( + 'status', + ): + # TODO: templating the ib statuses in comparison with other + # brokers is likely the way to go: + # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 + # short list: + # - PendingSubmit + # - PendingCancel + # - PreSubmitted (simulated orders) + # - ApiCancelled (cancelled by client before submission + # to routing) + # - Cancelled + # - Filled + # - Inactive (reject or cancelled but not by trader) + + # everyone doin camel case + msg = BrokerdStatus(**brokerd_msg) + # status = msg['status'].lower() + + if msg.status == 'filled': + + # conditional execution is fully complete, no more + # fills for the noted order + if not msg.remaining: + + resp = 'broker_executed' + + log.info(f'Execution for {oid} is complete!') + + + # just log it else: - msg.get('external') - if not msg: - log.error(f"Unknown trade event {event}") - - continue - - resp = { - 'resp': None, # placeholder - 'oid': oid - } - - if name in ( - 'error', - ): - # TODO: figure out how this will interact with EMS clients - # for ex. on an error do we react with a dark orders - # management response, like cancelling all dark orders? - - # This looks like a supervision policy for pending orders on - # some unexpected failure - something we need to think more - # about. In most default situations, with composed orders - # (ex. brackets), most brokers seem to use a oca policy. - - message = msg['message'] - - # XXX should we make one when it's blank? - log.error(pformat(message)) - - # TODO: getting this bs, prolly need to handle status messages - # 'Market data farm connection is OK:usfarm.nj' - - # another stupid ib error to handle - # if 10147 in message: cancel - - # don't relay message to order requester client - continue - - elif name in ( - 'status', - ): - # TODO: templating the ib statuses in comparison with other - # brokers is likely the way to go: - # https://interactivebrokers.github.io/tws-api/interfaceIBApi_1_1EWrapper.html#a17f2a02d6449710b6394d0266a353313 - # short list: - # - PendingSubmit - # - PendingCancel - # - PreSubmitted (simulated orders) - # - ApiCancelled (cancelled by client before submission - # to routing) - # - Cancelled - # - Filled - # - Inactive (reject or cancelled but not by trader) - - # everyone doin camel case - status = msg['status'].lower() - - if status == 'filled': - - # conditional execution is fully complete, no more - # fills for the noted order - if not msg['remaining']: - - resp['resp'] = 'broker_executed' - - log.info(f'Execution for {oid} is complete!') - - # just log it - else: - log.info(f'{broker} filled {msg}') - - else: - # one of (submitted, cancelled) - resp['resp'] = 'broker_' + status - - elif name in ( - 'fill', - ): - # proxy through the "fill" result(s) - resp['resp'] = 'broker_filled' - resp.update(msg) - - log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') - - # respond to requesting client - await ctx.send_yield(resp) + log.info(f'{broker} filled {msg}') -async def process_order_cmds( + else: + # one of {submitted, cancelled} + resp = 'broker_' + msg.status + + # pass the BrokerdStatus msg inside the broker details field + broker_details = msg.dict() + + elif name in ( + 'fill', + ): + msg = BrokerdFill(**brokerd_msg) + + # proxy through the "fill" result(s) + resp = 'broker_filled' + broker_details = msg.dict() + + log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}') + + else: + raise ValueError(f'Brokerd message {brokerd_msg} is invalid') + + # Create and relay EMS response status message + resp = Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ) + # relay response to requesting EMS client + await ems_client_order_stream.send(resp.dict()) + + +async def process_client_order_cmds( + + # ctx: tractor.Context, + client_order_stream: tractor.MsgStream, # noqa + brokerd_order_stream: tractor.MsgStream, - ctx: tractor.Context, - cmd_stream: 'tractor.ReceiveStream', # noqa symbol: str, feed: 'Feed', # noqa - client: 'Client', # noqa + # client: 'Client', # noqa dark_book: _DarkBook, ) -> None: - async for cmd in cmd_stream: + # cmd: dict + async for cmd in client_order_stream: log.info(f'Received order cmd:\n{pformat(cmd)}') action = cmd['action'] oid = cmd['oid'] - - brid = dark_book._broker2ems_ids.inverse.get(oid) + reqid = dark_book._ems2brokerd_ids.inverse.get(oid) + live_entry = dark_book._ems_entries.get(oid) # TODO: can't wait for this stuff to land in 3.10 # https://www.python.org/dev/peps/pep-0636/#going-to-the-cloud-mappings if action in ('cancel',): # check for live-broker order - if brid: - log.info("Submitting cancel for live order") - await client.submit_cancel(reqid=brid) + if live_entry: + + msg = BrokerdCancel( + oid=oid, + reqid=reqid or live_entry.reqid, + time_ns=time.time_ns(), + ) + + # send cancel to brokerd immediately! + log.info("Submitting cancel for live order") + + # NOTE: cancel response will be relayed back in messages + # from corresponding broker + # await client.submit_cancel(reqid=reqid) + await brokerd_order_stream.send(msg.dict()) - # check for EMS active exec else: + # might be a cancel for order that hasn't been acked yet + # by brokerd so register a cancel for then the order + # does show up later + dark_book._ems_entries[oid] = msg + + # check for EMS active exec try: + # remove from dark book clearing dark_book.orders[symbol].pop(oid, None) - # TODO: move these to `tractor.MsgStream` - await ctx.send_yield({ - 'resp': 'dark_cancelled', - 'oid': oid - }) + # tell client side that we've cancelled the + # dark-trigger order + await client_order_stream.send( + Status( + resp='dark_cancelled', + oid=oid, + time_ns=time.time_ns(), + ).dict() + ) + except KeyError: log.exception(f'No dark order for {symbol}?') + # TODO: 3.10 struct-pattern matching and unpacking here elif action in ('alert', 'buy', 'sell',): - sym = cmd['symbol'] - trigger_price = cmd['price'] - size = cmd['size'] - brokers = cmd['brokers'] - exec_mode = cmd['exec_mode'] + msg = Order(**cmd) - broker = brokers[0] + # sym = cmd['symbol'] + # trigger_price = cmd['price'] + # size = cmd['size'] + # brokers = cmd['brokers'] + # exec_mode = cmd['exec_mode'] + + sym = msg.symbol + trigger_price = msg.price + size = msg.size + exec_mode = msg.exec_mode + broker = msg.brokers[0] if exec_mode == 'live' and action in ('buy', 'sell',): - # register broker id for ems id - order_id = await client.submit_limit( + if live_entry is not None: + # sanity check on emsd id + assert live_entry.oid == oid + + # if we already had a broker order id then + # this is likely an order update commmand. + log.info(f"Modifying order: {live_entry.reqid}") + + # TODO: port to BrokerdOrder message sending + # register broker id for ems id + msg = BrokerdOrder( oid=oid, # no ib support for oids... + time_ns=time.time_ns(), # if this is None, creates a new order # otherwise will modify any existing one - brid=brid, + reqid=reqid, symbol=sym, action=action, @@ -538,25 +714,38 @@ async def process_order_cmds( size=size, ) - if brid: - assert dark_book._broker2ems_ids[brid] == oid - - # if we already had a broker order id then - # this is likely an order update commmand. - log.info(f"Modifying order: {brid}") - - else: - dark_book._broker2ems_ids[order_id] = oid - + # send request to backend # XXX: the trades data broker response loop - # (``process_broker_trades()`` above) will - # handle sending the ems side acks back to - # the cmd sender from here + # (``translate_and_relay_brokerd_events()`` above) will + # handle relaying the ems side responses back to + # the client/cmd sender from this request + print(f'sending live order {msg}') + await brokerd_order_stream.send(msg.dict()) + + # order_id = await client.submit_limit( + + # oid=oid, # no ib support for oids... + + # # if this is None, creates a new order + # # otherwise will modify any existing one + # brid=brid, + + # symbol=sym, + # action=action, + # price=trigger_price, + # size=size, + # ) + + # an immediate response should be brokerd ack with order + # id but we register our request as part of the flow + dark_book._ems_entries[oid] = msg elif exec_mode in ('dark', 'paper') or ( action in ('alert') ): - # submit order to local EMS + # submit order to local EMS book and scan loop, + # effectively a local clearing engine, which + # scans for conditions and triggers matching executions # Auto-gen scanner predicate: # we automatically figure out what the alert check @@ -590,8 +779,10 @@ async def process_order_cmds( abs_diff_away = 0 # submit execution/order to EMS scan loop - # FYI: this may result in an override of an existing + + # NOTE: this may result in an override of an existing # dark book entry if the order id already exists + dark_book.orders.setdefault( sym, {} )[oid] = ( @@ -601,14 +792,27 @@ async def process_order_cmds( percent_away, abs_diff_away ) + # TODO: if the predicate resolves immediately send the # execution to the broker asap? Or no? # ack-response that order is live in EMS - await ctx.send_yield({ - 'resp': 'dark_submitted', - 'oid': oid - }) + # await ctx.send_yield( + # {'resp': 'dark_submitted', + # 'oid': oid} + # ) + if action == 'alert': + resp = 'alert_submitted' + else: + resp = 'dark_submitted' + + await client_order_stream.send( + Status( + resp=resp, + oid=oid, + time_ns=time.time_ns(), + ).dict() + ) @tractor.context @@ -618,7 +822,8 @@ async def _emsd_main( # client_actor_name: str, broker: str, symbol: str, - _mode: str = 'dark', # ('paper', 'dark', 'live') + _exec_mode: str = 'dark', # ('paper', 'dark', 'live') + loglevel: str = 'info', ) -> None: """EMS (sub)actor entrypoint providing the @@ -635,15 +840,23 @@ async def _emsd_main( received in a stream from that client actor and then responses are streamed back up to the original calling task in the same client. - The task tree is: + The primary ``emsd`` task tree is: + - ``_emsd_main()``: - accepts order cmds, registers execs with exec loop - - - ``exec_loop()``: - run (dark) conditions on inputs and trigger broker submissions - - - ``process_broker_trades()``: - accept normalized trades responses, process and relay to ems client(s) + sets up brokerd feed, order feed with ems client, trades dialogue with + brokderd trading api. + | + - ``start_clearing()``: + run (dark) conditions on inputs and trigger broker submissions + | + - ``translate_and_relay_brokerd_events()``: + accept normalized trades responses from brokerd, process and + relay to ems client(s); this is a effectively a "trade event + reponse" proxy-broker. + | + - ``process_client_order_cmds()``: + accepts order cmds from requesting piker clients, registers + execs with exec loop """ # from ._client import send_order_cmds @@ -651,49 +864,140 @@ async def _emsd_main( global _router dark_book = _router.get_dark_book(broker) + ems_ctx = ctx + + cached_feed = _router.feeds.get((broker, symbol)) + if cached_feed: + # TODO: use cached feeds per calling-actor + log.warning(f'Opening duplicate feed for {(broker, symbol)}') + # spawn one task per broker feed - async with trio.open_nursery() as n: + async with ( + trio.open_nursery() as n, # TODO: eventually support N-brokers - async with data.open_feed( + data.open_feed( broker, [symbol], - loglevel='info', - ) as feed: + loglevel=loglevel, + ) as feed, + ): + if not cached_feed: + _router.feeds[(broker, symbol)] = feed - # get a portal back to the client - # async with tractor.wait_for_actor(client_actor_name) as portal: + # XXX: this should be initial price quote from target provider + first_quote = await feed.receive() - await ctx.started() + # open a stream with the brokerd backend for order + # flow dialogue - # establish 2-way stream with requesting order-client - async with ctx.open_stream() as order_stream: + book = _router.get_dark_book(broker) + book.lasts[(broker, symbol)] = first_quote[symbol]['last'] + + trades_endpoint = getattr(feed.mod, 'trades_dialogue', None) + portal = feed._brokerd_portal + + if trades_endpoint is None or _exec_mode == 'paper': + + # load the paper trading engine + _exec_mode = 'paper' + log.warning(f'Entering paper trading mode for {broker}') + + # load the paper trading engine inside the brokerd + # actor to simulate the real load it'll likely be under + # when also pulling data from feeds + open_trades_endpoint = paper.open_paperboi( + broker=broker, + symbol=symbol, + loglevel=loglevel, + ) + + # for paper mode we need to mock this trades response feed + # so we pass a duck-typed feed-looking mem chan which is fed + # fill and submission events from the exec loop + # feed._trade_stream = client.trade_stream + + # init the trades stream + # client._to_trade_stream.send_nowait({'local_trades': 'start'}) + + else: + # open live brokerd trades endpoint + open_trades_endpoint = portal.open_context( + trades_endpoint, + loglevel=loglevel, + ) + + async with ( + open_trades_endpoint as (brokerd_ctx, positions), + brokerd_ctx.open_stream() as brokerd_trades_stream, + ): + + # if trades_endpoint is not None and _exec_mode != 'paper': + + # # TODO: open a bidir stream here? + # # we have an order API for this broker + # client = client_factory(feed._brokerd_portal) + + # else: + + # return control to parent task + # task_status.started((first_quote, feed, client)) + + # stream = feed.stream + + # start the real-time clearing condition scan loop and + # paper engine simulator. + + # n.start_soon( + # start_clearing, + # brokerd_trades_stream, + # feed.stream, # quote stream + # # client, + # broker, + # symbol, + # _exec_mode, + # book, + # ) + + # signal to client that we're started + # TODO: we could send back **all** brokerd positions here? + await ems_ctx.started(positions) + + # establish 2-way stream with requesting order-client and + # begin handling inbound order requests and updates + async with ems_ctx.open_stream() as ems_client_order_stream: + + # trigger scan and exec loop + n.start_soon( + clear_dark_triggers, + + brokerd_trades_stream, + ems_client_order_stream, + feed.stream, - # start the condition scan loop - quote, feed, client = await n.start( - exec_loop, - ctx, - feed, broker, symbol, - _mode, + book + # ctx, + # client, ) # begin processing order events from the target brokerd backend - await n.start( - process_broker_trades, - ctx, - feed, + n.start_soon( + + translate_and_relay_brokerd_events, + broker, + ems_client_order_stream, + brokerd_trades_stream, dark_book, ) # start inbound (from attached client) order request processing - await process_order_cmds( - ctx, - order_stream, + await process_client_order_cmds( + ems_client_order_stream, + brokerd_trades_stream, symbol, feed, - client, dark_book, ) diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index a49858f4..c3c4016a 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -127,9 +127,9 @@ class OrderMode: """ line = self.lines.commit_line(uuid) - req_msg = self.book._sent_orders.get(uuid) - if req_msg: - req_msg.ack_time_ns = time.time_ns() + # req_msg = self.book._sent_orders.get(uuid) + # if req_msg: + # req_msg.ack_time_ns = time.time_ns() return line @@ -317,10 +317,14 @@ async def start_order_mode( # spawn EMS actor-service async with ( - open_ems(brokername, symbol) as (book, trades_stream), + open_ems(brokername, symbol) as (book, trades_stream, positions), open_order_mode(symbol, chart, book) as order_mode ): + # update any exising positions + for sym, msg in positions.items(): + order_mode.on_position_update(msg) + def get_index(time: float): # XXX: not sure why the time is so off here @@ -343,16 +347,15 @@ async def start_order_mode( fmsg = pformat(msg) log.info(f'Received order msg:\n{fmsg}') - resp = msg['resp'] - - if resp in ( + name = msg['name'] + if name in ( 'position', ): # show line label once order is live order_mode.on_position_update(msg) continue - # delete the line from view + resp = msg['resp'] oid = msg['oid'] # response to 'action' request (buy/sell) @@ -375,21 +378,21 @@ async def start_order_mode( order_mode.on_cancel(oid) elif resp in ( - 'dark_executed' + 'dark_triggered' ): log.info(f'Dark order triggered for {fmsg}') - # for alerts add a triangle and remove the - # level line - if msg['cmd']['action'] == 'alert': - - # should only be one "fill" for an alert - order_mode.on_fill( - oid, - price=msg['trigger_price'], - arrow_index=get_index(time.time()) - ) - await order_mode.on_exec(oid, msg) + elif resp in ( + 'alert_triggered' + ): + # should only be one "fill" for an alert + # add a triangle and remove the level line + order_mode.on_fill( + oid, + price=msg['trigger_price'], + arrow_index=get_index(time.time()) + ) + await order_mode.on_exec(oid, msg) # response to completed 'action' request for buy/sell elif resp in ( @@ -400,12 +403,15 @@ async def start_order_mode( # each clearing tick is responded individually elif resp in ('broker_filled',): - action = msg['action'] + action = book._sent_orders[oid].action + details = msg['brokerd_msg'] # TODO: some kinda progress system order_mode.on_fill( oid, - price=msg['price'], - arrow_index=get_index(msg['broker_time']), + price=details['price'], pointing='up' if action == 'buy' else 'down', + + # TODO: put the actual exchange timestamp + arrow_index=get_index(details['broker_time']), ) From db92683ede4dd6cac310b2b416c4fd73ea37d1c8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 14:19:55 -0400 Subject: [PATCH 11/18] Port ib orders to new msgs and bidir streaming api --- piker/brokers/ib.py | 270 +++++++++++++++++++++++++++++++------------- 1 file changed, 193 insertions(+), 77 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 915278c3..321cc4cd 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -25,7 +25,7 @@ from contextlib import asynccontextmanager from dataclasses import asdict from datetime import datetime from functools import partial -from typing import List, Dict, Any, Tuple, Optional, AsyncIterator, Callable +from typing import List, Dict, Any, Tuple, Optional, AsyncIterator import asyncio from pprint import pformat import inspect @@ -39,7 +39,8 @@ import tractor from async_generator import aclosing from ib_insync.wrapper import RequestError from ib_insync.contract import Contract, ContractDetails, Option -from ib_insync.order import Order +from ib_insync.order import Order, Trade, OrderStatus +from ib_insync.objects import Fill, Execution from ib_insync.ticker import Ticker from ib_insync.objects import Position import ib_insync as ibis @@ -53,6 +54,12 @@ from .._daemon import maybe_spawn_brokerd from ..data._source import from_df from ..data._sharedmem import ShmArray from ._util import SymbolNotFound, NoData +from ..clearing._messages import ( + BrokerdOrder, BrokerdOrderAck, BrokerdStatus, + BrokerdPosition, BrokerdCancel, + BrokerdFill, + # BrokerdError, +) log = get_logger(__name__) @@ -472,7 +479,7 @@ class Client: # XXX: by default 0 tells ``ib_insync`` methods that there is no # existing order so ask the client to create a new one (which it # seems to do by allocating an int counter - collision prone..) - brid: int = None, + reqid: int = None, ) -> int: """Place an order and return integer request id provided by client. @@ -488,7 +495,7 @@ class Client: trade = self.ib.placeOrder( contract, Order( - orderId=brid or 0, # stupid api devs.. + orderId=reqid or 0, # stupid api devs.. action=action.upper(), # BUY/SELL orderType='LMT', lmtPrice=price, @@ -582,6 +589,7 @@ class Client: self, to_trio: trio.abc.SendChannel, ) -> None: + # connect error msgs def push_err( reqId: int, @@ -589,13 +597,16 @@ class Client: errorString: str, contract: Contract, ) -> None: + log.error(errorString) + try: to_trio.send_nowait(( 'error', + # error "object" {'reqid': reqId, - 'message': errorString, + 'reason': errorString, 'contract': contract} )) except trio.BrokenResourceError: @@ -635,6 +646,8 @@ async def _aio_get_client( """Return an ``ib_insync.IB`` instance wrapped in our client API. Client instances are cached for later use. + + TODO: consider doing this with a ctx mngr eventually? """ # first check cache for existing client @@ -848,7 +861,7 @@ async def get_bars( end_dt: str = "", ) -> (dict, np.ndarray): - _err = None + _err: Optional[Exception] = None fails = 0 for _ in range(2): @@ -885,12 +898,12 @@ async def get_bars( raise NoData(f'Symbol: {sym}') break - else: log.exception( "Data query rate reached: Press `ctrl-alt-f`" "in TWS" ) + print(_err) # TODO: should probably create some alert on screen # and then somehow get that to trigger an event here @@ -937,7 +950,7 @@ async def backfill_bars( if fails is None or fails > 1: break - if out is (None, None): + if out == (None, None): # could be trying to retreive bars over weekend # TODO: add logic here to handle tradable hours and only grab # valid bars in the range @@ -1188,114 +1201,217 @@ def pack_position(pos: Position) -> Dict[str, Any]: else: symbol = con.symbol - return { - 'broker': 'ib', - 'account': pos.account, - 'symbol': symbol, - 'currency': con.currency, - 'size': float(pos.position), - 'avg_price': float(pos.avgCost) / float(con.multiplier or 1.0), - } + return BrokerdPosition( + broker='ib', + account=pos.account, + symbol=symbol, + currency=con.currency, + size=float(pos.position), + avg_price=float(pos.avgCost) / float(con.multiplier or 1.0), + ) -@tractor.msg.pub( - send_on_connect={'local_trades': 'start'} -) -async def stream_trades( +async def handle_order_requests( + ems_order_stream: tractor.MsgStream, + +) -> None: + + # request_msg: dict + async for request_msg in ems_order_stream: + log.info(f'Received order request {request_msg}') + + action = request_msg['action'] + + if action in {'buy', 'sell'}: + # validate + order = BrokerdOrder(**request_msg) + + # call our client api to submit the order + reqid = await _trio_run_client_method( + + method='submit_limit', + oid=order.oid, + symbol=order.symbol, + price=order.price, + action=order.action, + size=order.size, + + # XXX: by default 0 tells ``ib_insync`` methods that + # there is no existing order so ask the client to create + # a new one (which it seems to do by allocating an int + # counter - collision prone..) + reqid=order.reqid, + ) + + # deliver ack that order has been submitted to broker routing + await ems_order_stream.send( + BrokerdOrderAck( + # ems order request id + oid=order.oid, + # broker specific request id + reqid=reqid, + time_ns=time.time_ns(), + ).dict() + ) + + elif action == 'cancel': + msg = BrokerdCancel(**request_msg) + + await _trio_run_client_method( + method='submit_cancel', + reqid=msg.reqid + ) + + else: + log.error(f'Unknown order command: {request_msg}') + + +@tractor.context +async def trades_dialogue( + + ctx: tractor.Context, loglevel: str = None, - get_topics: Callable = None, ) -> AsyncIterator[Dict[str, Any]]: # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - stream = await _trio_run_client_method( + ib_trade_events_stream = await _trio_run_client_method( method='recv_trade_updates', ) # deliver positions to subscriber before anything else positions = await _trio_run_client_method(method='positions') + + all_positions = {} + for pos in positions: - yield {'local_trades': ('position', pack_position(pos))} + msg = pack_position(pos) + all_positions[msg.symbol] = msg.dict() + + await ctx.started(all_positions) action_map = {'BOT': 'buy', 'SLD': 'sell'} - async for event_name, item in stream: + async with ( + ctx.open_stream() as ems_stream, + trio.open_nursery() as n, + ): + # start order request handler **before** local trades event loop + n.start_soon(handle_order_requests, ems_stream) - # XXX: begin normalization of nonsense ib_insync internal - # object-state tracking representations... + async for event_name, item in ib_trade_events_stream: - if event_name == 'status': + # XXX: begin normalization of nonsense ib_insync internal + # object-state tracking representations... - # unwrap needed data from ib_insync internal objects - trade = item - status = trade.orderStatus + if event_name == 'status': - # skip duplicate filled updates - we get the deats - # from the execution details event - msg = { - 'reqid': trade.order.orderId, - 'status': status.status, - 'filled': status.filled, - 'reason': status.whyHeld, + # unwrap needed data from ib_insync internal types + trade: Trade = item + status: OrderStatus = trade.orderStatus - # this seems to not be necessarily up to date in the - # execDetails event.. so we have to send it here I guess? - 'remaining': status.remaining, - } + # skip duplicate filled updates - we get the deats + # from the execution details event + msg = BrokerdStatus( - elif event_name == 'fill': + reqid=trade.order.orderId, + time_ns=time.time_ns(), # cuz why not + status=status.status.lower(), # force lower case - trade, fill = item - execu = fill.execution + filled=status.filled, + reason=status.whyHeld, + # this seems to not be necessarily up to date in the + # execDetails event.. so we have to send it here I guess? + remaining=status.remaining, - msg = { - 'reqid': execu.orderId, - 'execid': execu.execId, + broker_details={'name': 'ib'}, + ) + + elif event_name == 'fill': + + # for wtv reason this is a separate event type + # from IB, not sure why it's needed other then for extra + # complexity and over-engineering :eyeroll:. + # we may just end up dropping these events (or + # translating them to ``Status`` msgs) if we can + # show the equivalent status events are no more latent. + + # unpack ib_insync types + # pep-0526 style: + # https://www.python.org/dev/peps/pep-0526/#global-and-local-variable-annotations + trade: Trade + fill: Fill + trade, fill = item + execu: Execution = fill.execution + + # TODO: normalize out commissions details? + details = { + 'contract': asdict(fill.contract), + 'execution': asdict(fill.execution), + 'commissions': asdict(fill.commissionReport), + } # supposedly IB server fill time - 'broker_time': execu.time, # converted to float by us - # ns from main TCP handler by us inside ``ib_insync`` override - 'time': fill.time, - 'time_ns': time.time_ns(), # cuz why not - 'action': action_map[execu.side], - 'size': execu.shares, - 'price': execu.price, - } + details['broker_time'] = execu.time + details['name'] = 'ib' - elif event_name == 'error': - msg = item + msg = BrokerdFill( + # should match the value returned from `.submit_limit()` + reqid=execu.orderId, + time_ns=time.time_ns(), # cuz why not - # f$#$% gawd dammit insync.. - con = msg['contract'] - if isinstance(con, Contract): - msg['contract'] = asdict(con) + action=action_map[execu.side], + size=execu.shares, + price=execu.price, - if msg['reqid'] == -1: - log.error(pformat(msg)) + broker_details=details, + # XXX: required by order mode currently + broker_time=details['execution']['time'], - # don't forward, it's pointless.. - continue + ) - elif event_name == 'position': - msg = pack_position(item) + elif event_name == 'error': - if msg.get('reqid', 0) < -1: - # it's a trade event generated by TWS usage. - log.warning(f"TWS triggered trade:\n{pformat(msg)}") + err: dict = item - msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) + # f$#$% gawd dammit insync.. + con = err['contract'] + if isinstance(con, Contract): + err['contract'] = asdict(con) - # mark msg as from "external system" - # TODO: probably something better then this.. - msg['external'] = True + if err['reqid'] == -1: + log.error(f'TWS external order error:\n{pformat(err)}') - yield {'remote_trades': (event_name, msg)} - continue + # don't forward for now, it's unecessary.. but if we wanted to, + # msg = BrokerdError(**err) + continue - yield {'local_trades': (event_name, msg)} + elif event_name == 'position': + msg = pack_position(item) + # msg = BrokerdPosition(**item) + + # if msg.get('reqid', 0) < -1: + if getattr(msg, 'reqid', 0) < -1: + + # it's a trade event generated by TWS usage. + log.warning(f"TWS triggered trade:\n{pformat(msg)}") + + msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) + + # mark msg as from "external system" + # TODO: probably something better then this.. and start + # considering multiplayer/group trades tracking + msg['external'] = True + continue + + # XXX: we always serialize to a dict for msgpack + # translations, ideally we can move to an msgspec (or other) + # encoder # that can be enabled in ``tractor`` ahead of + # time so we can pass through the message types directly. + await ems_stream.send(msg.dict()) @tractor.context From 47e7baa0c90d391b526ba6bf97f78ec47309e5ce Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 15:56:34 -0400 Subject: [PATCH 12/18] Ensure paperboi is shield killed on teardown --- piker/clearing/_paper_engine.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index e669fd42..788e7674 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -460,4 +460,10 @@ async def open_paperboi( loglevel=loglevel, ) as (ctx, first): - yield ctx, first + try: + yield ctx, first + + finally: + # be sure to tear down the paper service on exit + with trio.CancelScope(shield=True): + await portal.cancel_actor() From 4cae470f3a1ea6727e814f74069dbb511437eafc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 15:57:01 -0400 Subject: [PATCH 13/18] Pass "arbiter" socket correctly --- piker/cli/__init__.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/piker/cli/__init__.py b/piker/cli/__init__.py index 0cc38874..e5e9a2d1 100644 --- a/piker/cli/__init__.py +++ b/piker/cli/__init__.py @@ -9,6 +9,7 @@ import tractor from ..log import get_console_log, get_logger, colorize_json from ..brokers import get_brokermod, config +from .._daemon import _tractor_kwargs log = get_logger('cli') @@ -101,8 +102,9 @@ def cli(ctx, brokers, loglevel, tl, configdir): def services(config, tl, names): async def list_services(): + async with tractor.get_arbiter( - *tractor.current_actor()._arb_addr + *_tractor_kwargs['arbiter_addr'] ) as portal: registry = await portal.run('self', 'get_registry') json_d = {} @@ -118,6 +120,7 @@ def services(config, tl, names): list_services, name='service_query', loglevel=config['loglevel'] if tl else None, + arbiter_addr=_tractor_kwargs['arbiter_addr'], ) From 8e8a005128d663c065e8ebe73b3b951ab3ce352b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Wed, 9 Jun 2021 12:22:02 -0400 Subject: [PATCH 14/18] Fix attr accesses on msg type --- piker/brokers/ib.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index 321cc4cd..e43b5a38 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1399,7 +1399,7 @@ async def trades_dialogue( # it's a trade event generated by TWS usage. log.warning(f"TWS triggered trade:\n{pformat(msg)}") - msg['reqid'] = 'tws-' + str(-1 * msg['reqid']) + msg.reqid = 'tws-' + str(-1 * msg.reqid) # mark msg as from "external system" # TODO: probably something better then this.. and start From a1f605bd52cef7908110d13aefb4972ebd343d5b Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 8 Jun 2021 12:50:52 -0400 Subject: [PATCH 15/18] Clear out old commented code --- piker/clearing/_ems.py | 221 +++-------------------------------------- 1 file changed, 13 insertions(+), 208 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index cd0795b3..bfaaf82a 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -26,7 +26,6 @@ from typing import AsyncIterator, Callable from bidict import bidict from pydantic import BaseModel import trio -from trio_typing import TaskStatus import tractor from .. import data @@ -224,44 +223,6 @@ async def clear_dark_triggers( resp = 'dark_triggered' - # an internal brokerd-broker specific - # order-request id is expected to be generated - - # reqid = await client.submit_limit( - - # oid=oid, - - # # this is a brand new order request for the - # # underlying broker so we set a "broker - # # request id" (brid) to "nothing" so that the - # # broker client knows that we aren't trying - # # to modify an existing order-request. - # brid=None, - - # symbol=sym, - # action=cmd['action'], - # price=submit_price, - # size=cmd['size'], - # ) - - # # register broker request id to ems id - - # else: - # # alerts have no broker request id - # reqid = '' - - # resp = { - # 'resp': 'dark_executed', - # 'cmd': cmd, # original request message - - # 'time_ns': time.time_ns(), - # 'trigger_price': price, - - # 'broker_reqid': reqid, - # 'broker': broker, - # 'oid': oid, # piker order id - - # } msg = Status( oid=oid, # piker order id resp=resp, @@ -270,7 +231,6 @@ async def clear_dark_triggers( symbol=symbol, trigger_price=price, - # broker_reqid=reqid, broker_details={'name': broker}, cmd=cmd, # original request message @@ -281,7 +241,6 @@ async def clear_dark_triggers( log.info(f'removing pred for {oid}') execs.pop(oid) - # await ctx.send_yield(resp) await ems_client_order_stream.send(msg) else: # condition scan loop complete @@ -292,51 +251,6 @@ async def clear_dark_triggers( # print(f'execs scan took: {time.time() - start}') -# async def start_clearing( - -# # ctx: tractor.Context, -# brokerd_order_stream: tractor.MsgStream, -# quote_stream: tractor.MsgStream, - -# # client: 'Client', - -# # feed: 'Feed', # noqa -# broker: str, -# symbol: str, -# _exec_mode: str, - -# book: _DarkBook, - -# # task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - -# ) -> AsyncIterator[dict]: -# """Main scan loop for order execution conditions and submission -# to brokers. - -# """ -# async with trio.open_nursery() as n: - -# # trigger scan and exec loop -# n.start_soon( -# trigger_executions, - -# brokerd_order_stream, -# quote_stream, - -# broker, -# symbol, -# book -# # ctx, -# # client, -# ) - -# # # paper engine simulator task -# # if _exec_mode == 'paper': -# # # TODO: make this an actual broadcast channels as in: -# # # https://github.com/python-trio/trio/issues/987 -# # n.start_soon(simulate_fills, quote_stream, client) - - # TODO: lots of cases still to handle # XXX: right now this is very very ad-hoc to IB # - short-sale but securities haven't been located, in this case we @@ -349,15 +263,11 @@ async def clear_dark_triggers( async def translate_and_relay_brokerd_events( - # ctx: tractor.Context, broker: str, ems_client_order_stream: tractor.MsgStream, brokerd_trades_stream: tractor.MsgStream, book: _DarkBook, - # feed: 'Feed', # noqa - task_status: TaskStatus[dict] = trio.TASK_STATUS_IGNORED, - ) -> AsyncIterator[dict]: """Trades update loop - receive updates from broker, convert to EMS responses, transmit to ordering client(s). @@ -377,27 +287,13 @@ async def translate_and_relay_brokerd_events( {'presubmitted', 'submitted', 'cancelled', 'inactive'} """ - # broker = feed.mod.name - - # TODO: make this a context - # in the paper engine case this is just a mem receive channel - # async with feed.receive_trades_data() as brokerd_trades_stream: - - # first = await brokerd_trades_stream.__anext__() - - # startup msg expected as first from broker backend - # assert first['local_trades'] == 'start' - # task_status.started() - async for brokerd_msg in brokerd_trades_stream: - # name, msg = event['local_trades'] name = brokerd_msg['name'] log.info(f'Received broker trade event:\n{pformat(brokerd_msg)}') if name == 'position': - # msg['resp'] = 'position' # relay through position msgs immediately await ems_client_order_stream.send( @@ -476,34 +372,6 @@ async def translate_and_relay_brokerd_events( # a live flow now exists oid = entry.oid - # make response packet to EMS client(s) - # reqid = book._ems_entries.get(oid) - - # # msg is for unknown emsd order id - # if oid is None: - # oid = msg['oid'] - - # # XXX: paper clearing special cases - # # paper engine race case: ``Client.submit_limit()`` hasn't - # # returned yet and provided an output reqid to register - # # locally, so we need to retreive the oid that was already - # # packed at submission since we already know it ahead of - # # time - # paper = msg.get('paper_info') - # if paper: - # oid = paper['oid'] - - # else: - # msg.get('external') - # if not msg: - # log.error(f"Unknown trade event {event}") - - # continue - - # resp = { - # 'resp': None, # placeholder - # 'oid': oid - # } resp = None broker_details = {} @@ -551,7 +419,6 @@ async def translate_and_relay_brokerd_events( # everyone doin camel case msg = BrokerdStatus(**brokerd_msg) - # status = msg['status'].lower() if msg.status == 'filled': @@ -563,12 +430,10 @@ async def translate_and_relay_brokerd_events( log.info(f'Execution for {oid} is complete!') - # just log it else: log.info(f'{broker} filled {msg}') - else: # one of {submitted, cancelled} resp = 'broker_' + msg.status @@ -604,13 +469,11 @@ async def translate_and_relay_brokerd_events( async def process_client_order_cmds( - # ctx: tractor.Context, client_order_stream: tractor.MsgStream, # noqa brokerd_order_stream: tractor.MsgStream, symbol: str, feed: 'Feed', # noqa - # client: 'Client', # noqa dark_book: _DarkBook, ) -> None: @@ -643,7 +506,6 @@ async def process_client_order_cmds( # NOTE: cancel response will be relayed back in messages # from corresponding broker - # await client.submit_cancel(reqid=reqid) await brokerd_order_stream.send(msg.dict()) else: @@ -675,12 +537,6 @@ async def process_client_order_cmds( msg = Order(**cmd) - # sym = cmd['symbol'] - # trigger_price = cmd['price'] - # size = cmd['size'] - # brokers = cmd['brokers'] - # exec_mode = cmd['exec_mode'] - sym = msg.symbol trigger_price = msg.price size = msg.size @@ -722,20 +578,6 @@ async def process_client_order_cmds( print(f'sending live order {msg}') await brokerd_order_stream.send(msg.dict()) - # order_id = await client.submit_limit( - - # oid=oid, # no ib support for oids... - - # # if this is None, creates a new order - # # otherwise will modify any existing one - # brid=brid, - - # symbol=sym, - # action=action, - # price=trigger_price, - # size=size, - # ) - # an immediate response should be brokerd ack with order # id but we register our request as part of the flow dark_book._ems_entries[oid] = msg @@ -793,14 +635,6 @@ async def process_client_order_cmds( abs_diff_away ) - # TODO: if the predicate resolves immediately send the - # execution to the broker asap? Or no? - - # ack-response that order is live in EMS - # await ctx.send_yield( - # {'resp': 'dark_submitted', - # 'oid': oid} - # ) if action == 'alert': resp = 'alert_submitted' else: @@ -846,8 +680,9 @@ async def _emsd_main( sets up brokerd feed, order feed with ems client, trades dialogue with brokderd trading api. | - - ``start_clearing()``: - run (dark) conditions on inputs and trigger broker submissions + - ``clear_dark_triggers()``: + run (dark order) conditions on inputs and trigger brokerd "live" + order submissions. | - ``translate_and_relay_brokerd_events()``: accept normalized trades responses from brokerd, process and @@ -899,6 +734,10 @@ async def _emsd_main( if trades_endpoint is None or _exec_mode == 'paper': + # for paper mode we need to mock this trades response feed + # so we load bidir stream to a new sub-actor running a + # paper-simulator clearing engine. + # load the paper trading engine _exec_mode = 'paper' log.warning(f'Entering paper trading mode for {broker}') @@ -912,14 +751,6 @@ async def _emsd_main( loglevel=loglevel, ) - # for paper mode we need to mock this trades response feed - # so we pass a duck-typed feed-looking mem chan which is fed - # fill and submission events from the exec loop - # feed._trade_stream = client.trade_stream - - # init the trades stream - # client._to_trade_stream.send_nowait({'local_trades': 'start'}) - else: # open live brokerd trades endpoint open_trades_endpoint = portal.open_context( @@ -931,36 +762,9 @@ async def _emsd_main( open_trades_endpoint as (brokerd_ctx, positions), brokerd_ctx.open_stream() as brokerd_trades_stream, ): - - # if trades_endpoint is not None and _exec_mode != 'paper': - - # # TODO: open a bidir stream here? - # # we have an order API for this broker - # client = client_factory(feed._brokerd_portal) - - # else: - - # return control to parent task - # task_status.started((first_quote, feed, client)) - - # stream = feed.stream - - # start the real-time clearing condition scan loop and - # paper engine simulator. - - # n.start_soon( - # start_clearing, - # brokerd_trades_stream, - # feed.stream, # quote stream - # # client, - # broker, - # symbol, - # _exec_mode, - # book, - # ) - # signal to client that we're started - # TODO: we could send back **all** brokerd positions here? + # TODO: we could eventually send back **all** brokerd + # positions here? await ems_ctx.started(positions) # establish 2-way stream with requesting order-client and @@ -978,14 +782,15 @@ async def _emsd_main( broker, symbol, book - # ctx, - # client, ) # begin processing order events from the target brokerd backend + # by receiving order submission response messages, + # normalizing them to EMS messages and relaying back to + # the piker order client. n.start_soon( - translate_and_relay_brokerd_events, + broker, ems_client_order_stream, brokerd_trades_stream, From b06cb5bb5aa3f939c308ba180fafdcabb12f4a00 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 08:24:10 -0400 Subject: [PATCH 16/18] Comments clean and improvments --- piker/_daemon.py | 1 - piker/clearing/_client.py | 3 +- piker/clearing/_ems.py | 92 +++++++++++++++++---------------- piker/clearing/_paper_engine.py | 8 --- piker/ui/order_mode.py | 4 -- 5 files changed, 49 insertions(+), 59 deletions(-) diff --git a/piker/_daemon.py b/piker/_daemon.py index 5e102e4c..009adad5 100644 --- a/piker/_daemon.py +++ b/piker/_daemon.py @@ -248,7 +248,6 @@ async def maybe_spawn_daemon( if pikerd_portal is None: # we are root so spawn brokerd directly in our tree # the root nursery is accessed through process global state - # await spawn_brokerd(brokername, loglevel=loglevel) await spawn_func(**spawn_args) else: diff --git a/piker/clearing/_client.py b/piker/clearing/_client.py index 47c79636..fcceeaac 100644 --- a/piker/clearing/_client.py +++ b/piker/clearing/_client.py @@ -223,12 +223,11 @@ async def open_ems( # connect to emsd portal.open_context( + _emsd_main, broker=broker, symbol=symbol.key, - # TODO: ``first`` here should be the active orders/execs - # persistent on the ems so that loca UI's can be populated. ) as (ctx, positions), # open 2-way trade command stream diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index bfaaf82a..0fe63db2 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -124,15 +124,12 @@ _DEFAULT_SIZE: float = 1.0 async def clear_dark_triggers( - # ctx: tractor.Context, brokerd_orders_stream: tractor.MsgStream, ems_client_order_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa broker: str, symbol: str, - # client: 'Client', # noqa - # order_msg_stream: 'Client', # noqa book: _DarkBook, @@ -189,8 +186,7 @@ async def clear_dark_triggers( # message back to the requesting ems client resp = 'alert_triggered' - else: - # executable order submission + else: # executable order submission # submit_price = price + price*percent_away submit_price = price + abs_diff_away @@ -199,18 +195,17 @@ async def clear_dark_triggers( f'Dark order triggered for price {price}\n' f'Submitting order @ price {submit_price}') - # TODO: port to BrokerdOrder message sending msg = BrokerdOrder( action=cmd['action'], oid=oid, time_ns=time.time_ns(), - - # this is a brand new order request for the + # this **creates** new order request for the # underlying broker so we set a "broker - # request id" (brid) to "nothing" so that the - # broker client knows that we aren't trying - # to modify an existing order-request. + # request id" (``reqid`` kwarg) to ``None`` + # so that the broker client knows that we + # aren't trying to modify an existing + # order-request and instead create a new one. reqid=None, symbol=sym, @@ -218,13 +213,21 @@ async def clear_dark_triggers( size=cmd['size'], ) await brokerd_orders_stream.send(msg.dict()) - # mark this entry as having send an order request + + # mark this entry as having sent an order + # request. the entry will be replaced once the + # target broker replies back with + # a ``BrokerdOrderAck`` msg including the + # allocated unique ``BrokerdOrderAck.reqid`` key + # generated by the broker's own systems. book._ems_entries[oid] = msg + # our internal status value for client-side + # triggered "dark orders" resp = 'dark_triggered' msg = Status( - oid=oid, # piker order id + oid=oid, # ems order id resp=resp, time_ns=time.time_ns(), @@ -340,11 +343,11 @@ async def translate_and_relay_brokerd_events( # initial response to brokerd order request if name == 'ack': - # register the brokerd request id (that was likely - # generated internally) with our locall ems order id for - # reverse lookup later. a BrokerdOrderAck **must** be - # sent after an order request in order to establish this - # id mapping. + # register the brokerd request id (that was generated + # / created internally by the broker backend) with our + # local ems order id for reverse lookup later. + # a ``BrokerdOrderAck`` **must** be sent after an order + # request in order to establish this id mapping. book._ems2brokerd_ids[oid] = reqid # new order which has not yet be registered into the @@ -455,16 +458,17 @@ async def translate_and_relay_brokerd_events( else: raise ValueError(f'Brokerd message {brokerd_msg} is invalid') - # Create and relay EMS response status message - resp = Status( - oid=oid, - resp=resp, - time_ns=time.time_ns(), - broker_reqid=reqid, - brokerd_msg=broker_details, + # Create and relay response status message + # to requesting EMS client + await ems_client_order_stream.send( + Status( + oid=oid, + resp=resp, + time_ns=time.time_ns(), + broker_reqid=reqid, + brokerd_msg=broker_details, + ).dict() ) - # relay response to requesting EMS client - await ems_client_order_stream.send(resp.dict()) async def process_client_order_cmds( @@ -509,9 +513,9 @@ async def process_client_order_cmds( await brokerd_order_stream.send(msg.dict()) else: - # might be a cancel for order that hasn't been acked yet - # by brokerd so register a cancel for then the order - # does show up later + # this might be a cancel for an order that hasn't been + # acked yet by a brokerd, so register a cancel for when + # the order ack does show up later dark_book._ems_entries[oid] = msg # check for EMS active exec @@ -552,10 +556,9 @@ async def process_client_order_cmds( # if we already had a broker order id then # this is likely an order update commmand. - log.info(f"Modifying order: {live_entry.reqid}") + log.info( + f"Modifying live {broker} order: {live_entry.reqid}") - # TODO: port to BrokerdOrder message sending - # register broker id for ems id msg = BrokerdOrder( oid=oid, # no ib support for oids... time_ns=time.time_ns(), @@ -575,7 +578,7 @@ async def process_client_order_cmds( # (``translate_and_relay_brokerd_events()`` above) will # handle relaying the ems side responses back to # the client/cmd sender from this request - print(f'sending live order {msg}') + log.info(f'Sending live order to {broker}:\n{pformat(msg)}') await brokerd_order_stream.send(msg.dict()) # an immediate response should be brokerd ack with order @@ -653,14 +656,13 @@ async def process_client_order_cmds( async def _emsd_main( ctx: tractor.Context, - # client_actor_name: str, broker: str, symbol: str, _exec_mode: str = 'dark', # ('paper', 'dark', 'live') loglevel: str = 'info', ) -> None: - """EMS (sub)actor entrypoint providing the + '''EMS (sub)actor entrypoint providing the execution management (micro)service which conducts broker order control on behalf of clients. @@ -693,12 +695,13 @@ async def _emsd_main( accepts order cmds from requesting piker clients, registers execs with exec loop - """ - # from ._client import send_order_cmds - + ''' global _router dark_book = _router.get_dark_book(broker) + # TODO: would be nice if in tractor we can require either a ctx arg, + # or a named arg with ctx in it and a type annotation of + # tractor.Context instead of strictly requiring a ctx arg. ems_ctx = ctx cached_feed = _router.feeds.get((broker, symbol)) @@ -742,9 +745,9 @@ async def _emsd_main( _exec_mode = 'paper' log.warning(f'Entering paper trading mode for {broker}') - # load the paper trading engine inside the brokerd - # actor to simulate the real load it'll likely be under - # when also pulling data from feeds + # load the paper trading engine as a subactor of this emsd + # actor to simulate the real IPC load it'll have when also + # pulling data from feeds open_trades_endpoint = paper.open_paperboi( broker=broker, symbol=symbol, @@ -814,7 +817,7 @@ class _Router(BaseModel): ''' nursery: trio.Nursery - feeds: dict[str, tuple[trio.CancelScope, float]] = {} + feeds: dict[tuple[str, str], data.feed.Feed] = {} books: dict[str, _DarkBook] = {} class Config: @@ -842,8 +845,9 @@ async def _setup_persistent_emsd( global _router - # spawn one task per broker feed + # open a root "service nursery" for the ``emsd`` actor async with trio.open_nursery() as service_nursery: + _router = _Router(nursery=service_nursery) # TODO: send back the full set of persistent orders/execs persistent diff --git a/piker/clearing/_paper_engine.py b/piker/clearing/_paper_engine.py index 788e7674..b71be312 100644 --- a/piker/clearing/_paper_engine.py +++ b/piker/clearing/_paper_engine.py @@ -216,7 +216,6 @@ class PaperBoi: time_ns=time.time_ns(), status='filled', - # broker=self.broker, filled=size, remaining=0 if order_complete else remaining, @@ -224,7 +223,6 @@ class PaperBoi: size=size, price=price, - # broker=self.broker, broker_details={ 'paper_info': { 'oid': oid, @@ -321,12 +319,6 @@ async def simulate_fills( break -# class MockBrokerdMsgStream: - - -# async def MockContext(*args, **kwargs): - - async def handle_order_requests( client: PaperBoi, diff --git a/piker/ui/order_mode.py b/piker/ui/order_mode.py index c3c4016a..da775821 100644 --- a/piker/ui/order_mode.py +++ b/piker/ui/order_mode.py @@ -127,10 +127,6 @@ class OrderMode: """ line = self.lines.commit_line(uuid) - # req_msg = self.book._sent_orders.get(uuid) - # if req_msg: - # req_msg.ack_time_ns = time.time_ns() - return line def on_fill( From a9cbacd8aa571779956fafb2b5d9ad04fd01aee8 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 08:24:51 -0400 Subject: [PATCH 17/18] Move details assignements to static declaration --- piker/brokers/ib.py | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/piker/brokers/ib.py b/piker/brokers/ib.py index e43b5a38..2e0b92cb 100644 --- a/piker/brokers/ib.py +++ b/piker/brokers/ib.py @@ -1352,12 +1352,10 @@ async def trades_dialogue( 'contract': asdict(fill.contract), 'execution': asdict(fill.execution), 'commissions': asdict(fill.commissionReport), + 'broker_time': execu.time, # supposedly IB server fill time + 'name': 'ib', } - # supposedly IB server fill time - details['broker_time'] = execu.time - details['name'] = 'ib' - msg = BrokerdFill( # should match the value returned from `.submit_limit()` reqid=execu.orderId, @@ -1369,7 +1367,7 @@ async def trades_dialogue( broker_details=details, # XXX: required by order mode currently - broker_time=details['execution']['time'], + broker_time=details['broker_time'], ) @@ -1391,9 +1389,7 @@ async def trades_dialogue( elif event_name == 'position': msg = pack_position(item) - # msg = BrokerdPosition(**item) - # if msg.get('reqid', 0) < -1: if getattr(msg, 'reqid', 0) < -1: # it's a trade event generated by TWS usage. From a9cc3210d8ee03d6dc55242c1913c799ee860cff Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 10 Jun 2021 08:37:21 -0400 Subject: [PATCH 18/18] Grr pydantic being a weirdo --- piker/clearing/_ems.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/clearing/_ems.py b/piker/clearing/_ems.py index 0fe63db2..2378465d 100644 --- a/piker/clearing/_ems.py +++ b/piker/clearing/_ems.py @@ -21,7 +21,7 @@ In da suit parlances: "Execution management systems" from pprint import pformat import time from dataclasses import dataclass, field -from typing import AsyncIterator, Callable +from typing import AsyncIterator, Callable, Any from bidict import bidict from pydantic import BaseModel @@ -817,7 +817,7 @@ class _Router(BaseModel): ''' nursery: trio.Nursery - feeds: dict[tuple[str, str], data.feed.Feed] = {} + feeds: dict[tuple[str, str], Any] = {} books: dict[str, _DarkBook] = {} class Config: