diff --git a/piker/data/_web_bs.py b/piker/data/_web_bs.py new file mode 100644 index 00000000..543ac19f --- /dev/null +++ b/piker/data/_web_bs.py @@ -0,0 +1,142 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +ToOlS fOr CoPInG wITh "tHE wEB" protocols. + +""" +from contextlib import asynccontextmanager, AsyncExitStack +from types import ModuleType +from typing import Any, Callable +import json + +import trio +import trio_websocket +from trio_websocket._impl import ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, +) + +from ..log import get_logger + +log = get_logger(__name__) + + +class NoBsWs: + """Make ``trio_websocket`` sockets stay up no matter the bs. + + """ + recon_errors = ( + ConnectionClosed, + DisconnectionTimeout, + ConnectionRejected, + HandshakeError, + ConnectionTimeout, + ) + + def __init__( + self, + url: str, + stack: AsyncExitStack, + fixture: Callable, + serializer: ModuleType = json, + ): + self.url = url + self.fixture = fixture + self._stack = stack + self._ws: 'WebSocketConnection' = None # noqa + + async def _connect( + self, + tries: int = 10000, + ) -> None: + while True: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + else: + break + + last_err = None + for i in range(tries): + try: + self._ws = await self._stack.enter_async_context( + trio_websocket.open_websocket_url(self.url) + ) + # rerun user code fixture + ret = await self._stack.enter_async_context( + self.fixture(self) + ) + assert ret is None + + log.info(f'Connection success: {self.url}') + return self._ws + + except self.recon_errors as err: + last_err = err + log.error( + f'{self} connection bail with ' + f'{type(err)}...retry attempt {i}' + ) + await trio.sleep(1) + continue + else: + log.exception('ws connection fail...') + raise last_err + + async def send_msg( + self, + data: Any, + ) -> None: + while True: + try: + return await self._ws.send_message(json.dumps(data)) + except self.recon_errors: + await self._connect() + + async def recv_msg( + self, + ) -> Any: + while True: + try: + return json.loads(await self._ws.get_message()) + except self.recon_errors: + await self._connect() + + +@asynccontextmanager +async def open_autorecon_ws( + url: str, + + # TODO: proper type annot smh + fixture: Callable, +): + """Apparently we can QoS for all sorts of reasons..so catch em. + + """ + async with AsyncExitStack() as stack: + ws = NoBsWs(url, stack, fixture=fixture) + await ws._connect() + + try: + yield ws + + finally: + await stack.aclose()