From d60f222bb7c4ba5f20d2c3f7b96cbed3bf512b0a Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Mon, 22 Aug 2022 19:18:31 -0300 Subject: [PATCH] Add get_balances, and get_assets rpc to deribit.api.Client Improve symbol_info search results Expect cancellation on cryptofeeds asyncio task Fix the no trades on instrument bug that we had on startup --- piker/brokers/deribit/api.py | 41 +++++++++++++++++++++++-- piker/brokers/deribit/feed.py | 58 ++++++++++++++++++----------------- 2 files changed, 68 insertions(+), 31 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index a0d67f71..1e783b15 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -148,6 +148,10 @@ class Client: self._access_token: Optional[str] = None self._refresh_token: Optional[str] = None + @property + def currencies(self): + return ['btc', 'eth', 'sol', 'usd'] + def _next_json_body(self, method: str, params: Dict): """get the typical json rpc 2.0 msg body and increment the req id """ @@ -263,6 +267,37 @@ class Client: else: await trio.sleep(renew_time / 2) + async def get_balances(self, kind: str = 'option') -> dict[str, float]: + """Return the set of positions for this account + by symbol. + """ + balances = {} + + for currency in self.currencies: + resp = await self.json_rpc( + 'private/get_positions', params={ + 'currency': currency.upper(), + 'kind': kind}) + + balances[currency] = resp.result + + return balances + + async def get_assets(self) -> dict[str, float]: + """Return the set of asset balances for this account + by symbol. + """ + balances = {} + + for currency in self.currencies: + resp = await self.json_rpc( + 'private/get_account_summary', params={ + 'currency': currency.upper()}) + + balances[currency] = resp.result['balance'] + + return balances + async def symbol_info( self, instrument: Optional[str] = None, @@ -305,7 +340,7 @@ class Client: async def search_symbols( self, pattern: str, - limit: int = None, + limit: int = 30, ) -> dict[str, Any]: if self._pairs is not None: data = self._pairs @@ -315,7 +350,8 @@ class Client: matches = fuzzy.extractBests( pattern, data, - score_cutoff=50, + score_cutoff=35, + limit=limit ) # repack in dict form return {item[0]['instrument_name']: item[0] @@ -397,7 +433,6 @@ async def get_client() -> Client: trio.open_nursery() as n, open_autorecon_ws(_testnet_ws_url) as ws ): - client = Client(n, ws) await client.start_rpc() await client.cache_symbols() diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index ae4d3bfc..a4ff9ae1 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -177,7 +177,11 @@ async def open_aio_cryptofeed_relay( # sync with trio to_trio.send_nowait(None) - await asyncio.sleep(float('inf')) + try: + await asyncio.sleep(float('inf')) + + except asyncio.exceptions.CancelledError: + ... @acm @@ -262,7 +266,6 @@ async def stream_quotes( async with ( open_cached_client('deribit') as client, send_chan as send_chan, - trio.open_nursery() as n, open_cryptofeeds(symbols) as stream ): @@ -284,23 +287,31 @@ async def stream_quotes( # keep client cached for real-time section cache = await client.cache_symbols() - last_trade = Trade(**(await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades[0]) - - first_quote = { - 'symbol': sym, - 'last': last_trade.price, - 'brokerd_ts': last_trade.timestamp, - 'ticks': [{ - 'type': 'trade', - 'price': last_trade.price, - 'size': last_trade.amount, - 'broker_ts': last_trade.timestamp - }] - } - task_status.started((init_msgs, first_quote)) - async with aclosing(stream): + last_trades = (await client.last_trades( + cb_sym_to_deribit_inst(nsym), count=1)).trades + + if len(last_trades) == 0: + async for typ, quote in stream: + if typ == 'trade': + last_trade = Trade(**quote['data']) + + else: + last_trade = Trade(**last_trades[0]) + + first_quote = { + 'symbol': sym, + 'last': last_trade.price, + 'brokerd_ts': last_trade.timestamp, + 'ticks': [{ + 'type': 'trade', + 'price': last_trade.price, + 'size': last_trade.amount, + 'broker_ts': last_trade.timestamp + }] + } + task_status.started((init_msgs, first_quote)) + feed_is_live.set() async for typ, quote in stream: @@ -321,15 +332,6 @@ async def open_symbol_search( async with ctx.open_stream() as stream: async for pattern in stream: - # results = await client.symbol_info(sym=pattern.upper()) - - matches = fuzzy.extractBests( - pattern, - cache, - score_cutoff=30, - ) # repack in dict form await stream.send( - {item[0]['instrument_name']: item[0] - for item in matches} - ) + await client.search_symbols(pattern))