From c26f4d9877967f43631be1fa42903feca8585a9c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Thu, 22 Apr 2021 21:23:43 -0400 Subject: [PATCH] Add kraken fuzzy symbol search --- piker/brokers/kraken.py | 73 ++++++++++++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 11 deletions(-) diff --git a/piker/brokers/kraken.py b/piker/brokers/kraken.py index 8f83c960..c0e61a48 100644 --- a/piker/brokers/kraken.py +++ b/piker/brokers/kraken.py @@ -21,7 +21,7 @@ Kraken backend. from contextlib import asynccontextmanager, AsyncExitStack from dataclasses import asdict, field from types import ModuleType -from typing import List, Dict, Any, Tuple +from typing import List, Dict, Any, Tuple, Optional import json import time @@ -37,6 +37,7 @@ from trio_websocket._impl import ( import arrow import asks +from fuzzywuzzy import process as fuzzy import numpy as np import trio import tractor @@ -147,6 +148,17 @@ class Client: 'User-Agent': 'krakenex/2.1.0 (+https://github.com/veox/python3-krakenex)' }) + self._pairs = None + + @property + def pairs(self) -> Dict[str, Any]: + if self._pairs is None: + raise RuntimeError( + "Make sure to run `cache_symbols()` on startup!" + ) + # retreive and cache all symbols + + return self._pairs async def _public( self, @@ -162,14 +174,48 @@ class Client: async def symbol_info( self, - pair: str = 'all', + pair: Optional[str] = None, ): - resp = await self._public('AssetPairs', {'pair': pair}) + if pair is not None: + pairs = {'pair': pair} + else: + pairs = None # get all pairs + + resp = await self._public('AssetPairs', pairs) err = resp['error'] if err: raise BrokerError(err) - true_pair_key, data = next(iter(resp['result'].items())) - return data + + pairs = resp['result'] + + if pair is not None: + _, data = next(iter(pairs.items())) + return data + else: + return pairs + + async def cache_symbols( + self, + ) -> None: + self._pairs = await self.symbol_info() + + async def search_stocks( + self, + pattern: str, + limit: int = None, + ) -> Dict[str, Any]: + if self._pairs is not None: + data = self._pairs + else: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=50, + ) + # repack in dict form + return {item[0]['altname']: item[0] for item in matches} async def bars( self, @@ -232,7 +278,9 @@ class Client: @asynccontextmanager async def get_client() -> Client: - yield Client() + client = Client() + await client.cache_symbols() + yield client async def stream_messages(ws): @@ -249,7 +297,7 @@ async def stream_messages(ws): too_slow_count += 1 - if too_slow_count > 10: + if too_slow_count > 20: log.warning( "Heartbeat is too slow, resetting ws connection") @@ -368,10 +416,13 @@ class AutoReconWs: self, tries: int = 10000, ) -> None: - try: - await self._stack.aclose() - except (DisconnectionTimeout, RuntimeError): - await trio.sleep(1) + while True: + try: + await self._stack.aclose() + except (DisconnectionTimeout, RuntimeError): + await trio.sleep(1) + else: + break last_err = None for i in range(tries):