From d0e3f5a51c43733c3a5d962da8065b10185585dc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 27 May 2021 17:14:04 -0400 Subject: [PATCH] Port binance and kraken to "reliable" ws API --- piker/brokers/binance.py | 131 ++++++++----------------------------- piker/brokers/kraken.py | 136 ++++++++------------------------------- 2 files changed, 54 insertions(+), 213 deletions(-) diff --git a/piker/brokers/binance.py b/piker/brokers/binance.py index c2de7ded..655800dc 100644 --- a/piker/brokers/binance.py +++ b/piker/brokers/binance.py @@ -18,36 +18,26 @@ Binance backend """ -from contextlib import asynccontextmanager, AsyncExitStack -from types import ModuleType +from contextlib import asynccontextmanager from typing import List, Dict, Any, Tuple, Union, Optional -import json import time -import trio_websocket +import trio from trio_typing import TaskStatus -from trio_websocket._impl import ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, -) - import arrow import asks from fuzzywuzzy import process as fuzzy import numpy as np -import trio import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel - +import wsproto from .api import open_cached_client from ._util import resproc, SymbolNotFound from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data._web_bs import open_autorecon_ws log = get_logger(__name__) @@ -378,93 +368,6 @@ def make_sub(pairs: List[str], sub_name: str, uid: int) -> Dict[str, str]: } -class AutoReconWs: - """Make ``trio_websocketw` sockets stay up no matter the bs. - - """ - recon_errors = ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, - ) - - def __init__( - self, - url: str, - stack: AsyncExitStack, - serializer: ModuleType = json, - ): - self.url = url - self._stack = stack - self._ws: 'WebSocketConnection' = None # noqa - - async def _connect( - self, - tries: int = 10000, - ) -> None: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) - - last_err = None - for i in range(tries): - try: - self._ws = await self._stack.enter_async_context( - trio_websocket.open_websocket_url(self.url) - ) - log.info(f'Connection success: {self.url}') - return - except self.recon_errors as err: - last_err = err - log.error( - f'{self} connection bail with ' - f'{type(err)}...retry attempt {i}' - ) - await trio.sleep(1) - continue - else: - log.exception('ws connection fail...') - raise last_err - - async def send_msg( - self, - data: Any, - ) -> None: - while True: - try: - return await self._ws.send_message(json.dumps(data)) - except self.recon_errors: - await self._connect() - - async def recv_msg( - self, - ) -> Any: - while True: - try: - return json.loads(await self._ws.get_message()) - except self.recon_errors: - await self._connect() - - -@asynccontextmanager -async def open_autorecon_ws(url): - """Apparently we can QoS for all sorts of reasons..so catch em. - - """ - async with AsyncExitStack() as stack: - ws = AutoReconWs(url, stack) - - await ws._connect() - try: - yield ws - - finally: - await stack.aclose() - - async def backfill_bars( sym: str, shm: ShmArray, # type: ignore # noqa @@ -527,8 +430,8 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://stream.binance.com/ws') as ws: - + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection): # setup subs # trade data (aka L1) @@ -546,6 +449,28 @@ async def stream_quotes( res = await ws.recv_msg() assert res['id'] == uid + yield + + subs = [] + for sym in symbols: + subs.append("{sym}@aggTrade") + subs.append("{sym}@bookTicker") + + # unsub from all pairs on teardown + await ws.send_msg({ + "method": "UNSUBSCRIBE", + "params": subs, + "id": uid, + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + async with open_autorecon_ws( + 'wss://stream.binance.com/ws', + fixture=subscribe, + ) as ws: + # pull a first quote and deliver msg_gen = stream_messages(ws) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 7e9afd5c..bb543936 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -18,37 +18,27 @@ Kraken backend. """ -from contextlib import asynccontextmanager, AsyncExitStack +from contextlib import asynccontextmanager from dataclasses import asdict, field -from types import ModuleType from typing import List, Dict, Any, Tuple, Optional -import json import time -import trio_websocket from trio_typing import TaskStatus -from trio_websocket._impl import ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, -) - +import trio import arrow import asks from fuzzywuzzy import process as fuzzy import numpy as np -import trio import tractor from pydantic.dataclasses import dataclass from pydantic import BaseModel - +import wsproto from .api import open_cached_client from ._util import resproc, SymbolNotFound, BrokerError from ..log import get_logger, get_console_log from ..data import ShmArray +from ..data._web_bs import open_autorecon_ws log = get_logger(__name__) @@ -399,100 +389,6 @@ def make_sub(pairs: List[str], data: Dict[str, Any]) -> Dict[str, str]: } -class AutoReconWs: - """Make ``trio_websocketw` sockets stay up no matter the bs. - - TODO: - apply any more insights from this: - https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds - - """ - recon_errors = ( - ConnectionClosed, - DisconnectionTimeout, - ConnectionRejected, - HandshakeError, - ConnectionTimeout, - ) - - def __init__( - self, - url: str, - stack: AsyncExitStack, - serializer: ModuleType = json, - ): - self.url = url - self._stack = stack - self._ws: 'WebSocketConnection' = None # noqa - - async def _connect( - self, - tries: int = 10000, - ) -> None: - while True: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) - else: - break - - last_err = None - for i in range(tries): - try: - self._ws = await self._stack.enter_async_context( - trio_websocket.open_websocket_url(self.url) - ) - log.info(f'Connection success: {self.url}') - return - except self.recon_errors as err: - last_err = err - log.error( - f'{self} connection bail with ' - f'{type(err)}...retry attempt {i}' - ) - await trio.sleep(1) - continue - else: - log.exception('ws connection fail...') - raise last_err - - async def send_msg( - self, - data: Any, - ) -> None: - while True: - try: - return await self._ws.send_message(json.dumps(data)) - except self.recon_errors: - await self._connect() - - async def recv_msg( - self, - ) -> Any: - while True: - try: - return json.loads(await self._ws.get_message()) - except self.recon_errors: - await self._connect() - - -@asynccontextmanager -async def open_autorecon_ws(url): - """Apparently we can QoS for all sorts of reasons..so catch em. - - """ - async with AsyncExitStack() as stack: - ws = AutoReconWs(url, stack) - - await ws._connect() - try: - yield ws - - finally: - await stack.aclose() - - async def backfill_bars( sym: str, @@ -561,8 +457,8 @@ async def stream_quotes( }, } - async with open_autorecon_ws('wss://ws.kraken.com/') as ws: - + @asynccontextmanager + async def subscribe(ws: wsproto.WSConnection): # XXX: setup subs # https://docs.kraken.com/websockets/#message-subscribe # specific logic for this in kraken's shitty sync client: @@ -584,8 +480,28 @@ async def stream_quotes( {'name': 'spread'} # 'depth': 10} ) + # pull a first quote and deliver await ws.send_msg(l1_sub) + yield + + # unsub from all pairs on teardown + await ws.send_msg({ + 'pair': ws_pairs.values(), + 'event': 'unsubscribe', + 'subscription': ['ohlc', 'spread'], + }) + + # XXX: do we need to ack the unsub? + # await ws.recv_msg() + + # see the tips on reonnection logic: + # https://support.kraken.com/hc/en-us/articles/360044504011-WebSocket-API-unexpected-disconnections-from-market-data-feeds + async with open_autorecon_ws( + 'wss://ws.kraken.com/', + fixture=subscribe, + ) as ws: + # pull a first quote and deliver msg_gen = stream_messages(ws)