From 3aea296caac27464e867b76101747c4366c52f09 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Wed, 29 Jan 2025 18:08:00 -0300 Subject: [PATCH 01/13] Venues Moved from api to venues all the msgspecs structs, also added critical imports in api, feed and __init__ mods. --- piker/brokers/deribit/__init__.py | 4 + piker/brokers/deribit/api.py | 77 ++++-------- piker/brokers/deribit/feed.py | 10 +- piker/brokers/deribit/venues.py | 191 ++++++++++++++++++++++++++++++ 4 files changed, 228 insertions(+), 54 deletions(-) create mode 100644 piker/brokers/deribit/venues.py diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index 4c0c1850..7499cd35 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -34,6 +34,9 @@ from .feed import ( # open_trade_dialog, # norm_trade_records, # ) +from .venues import ( + OptionPair, +) log = get_logger(__name__) @@ -43,6 +46,7 @@ __all__ = [ 'open_history_client', 'open_symbol_search', 'stream_quotes', + 'OptionPair', # 'norm_trade_records', ] diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 03cc301e..b9b98ad9 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -19,10 +19,14 @@ Deribit backend. ''' import asyncio +from collections import ChainMap from contextlib import ( asynccontextmanager as acm, ) from datetime import datetime +from decimal import ( + Decimal, +) from functools import partial import time from typing import ( @@ -31,7 +35,7 @@ from typing import ( Callable, ) -import pendulum +from pendulum import now import trio from trio_typing import TaskStatus from rapidfuzz import process as fuzzy @@ -51,7 +55,25 @@ from cryptofeed.defines import ( OPTION, CALL, PUT ) from cryptofeed.symbols import Symbol - +# types for managing the cb callbacks. +# from cryptofeed.types import L1Book +from .venues import ( + _ws_url, + MarketType, + PAIRTYPES, + Pair, + OptionPair, + JSONRPCResult, + JSONRPCChannel, + KLinesResult, + Trade, + LastTradesResult, +) +from piker.accounting import ( + Asset, + digits_to_dec, + MktPair, +) from piker.data import ( def_iohlcv_fields, match_from_pairs, @@ -74,57 +96,6 @@ _spawn_kwargs = { } -_url = 'https://www.deribit.com' -_ws_url = 'wss://www.deribit.com/ws/api/v2' -_testnet_ws_url = 'wss://test.deribit.com/ws/api/v2' - - -class JSONRPCResult(Struct): - jsonrpc: str = '2.0' - id: int - result: Optional[list[dict]] = None - error: Optional[dict] = None - usIn: int - usOut: int - usDiff: int - testnet: bool - -class JSONRPCChannel(Struct): - jsonrpc: str = '2.0' - method: str - params: dict - - -class KLinesResult(Struct): - close: list[float] - cost: list[float] - high: list[float] - low: list[float] - open: list[float] - status: str - ticks: list[int] - volume: list[float] - -class Trade(Struct): - trade_seq: int - trade_id: str - timestamp: int - tick_direction: int - price: float - mark_price: float - iv: float - instrument_name: str - index_price: float - direction: str - combo_trade_id: Optional[int] = 0, - combo_id: Optional[str] = '', - amount: float - -class LastTradesResult(Struct): - trades: list[Trade] - has_more: bool - - # convert datetime obj timestamp to unixtime in milliseconds def deribit_timestamp(when): return int((when.timestamp() * 1000) + (when.microsecond / 1000)) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 821aab87..bd2fbe06 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -21,11 +21,15 @@ Deribit backend. from contextlib import asynccontextmanager as acm from datetime import datetime from typing import Any, Optional, Callable +from pprint import pformat import time import trio from trio_typing import TaskStatus -import pendulum +from pendulum import ( + from_timestamp, + now, +) from rapidfuzz import process as fuzzy import numpy as np import tractor @@ -50,6 +54,10 @@ from .api import ( str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, maybe_open_price_feed ) +from .venues import ( + Pair, + OptionPair, +) _spawn_kwargs = { 'infect_asyncio': True, diff --git a/piker/brokers/deribit/venues.py b/piker/brokers/deribit/venues.py new file mode 100644 index 00000000..91a1583f --- /dev/null +++ b/piker/brokers/deribit/venues.py @@ -0,0 +1,191 @@ +# piker: trading gear for hackers +# Copyright (C) Tyler Goodlet (in stewardship for pikers) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +""" +Per market data-type definitions and schemas types. + +""" +from __future__ import annotations +import pendulum +from typing import ( + Literal, +) +from decimal import Decimal + +from msgspec import field + +from piker.types import Struct + + +# API endpoint paths by venue / sub-API +_domain: str = 'deribit.com' +_url = f'https://www.{_domain}' + +# WEBsocketz +_ws_url: str = f'wss://www.{_domain}/ws/api/v2' + +# test nets +_testnet_ws_url: str = f'wss://test.{_domain}/ws/api/v2' + +MarketType = Literal[ + 'option' +] + + +def get_api_eps(venue: MarketType) -> tuple[str, str]: + ''' + Return API ep root paths per venue. + + ''' + return { + 'option': ( + _ws_url, + ), + }[venue] + + +class Pair(Struct, frozen=True, kw_only=True): + + symbol: str + + # src + quote_currency: str # 'BTC' + + # dst + base_currency: str # "BTC", + + tick_size: float # 0.0001 # [{'above_price': 0.005, 'tick_size': 0.0005}] + tick_size_steps: list[dict[str, float]] + + @property + def price_tick(self) -> Decimal: + return Decimal(str(self.tick_size_steps[0]['above_price'])) + + @property + def size_tick(self) -> Decimal: + return Decimal(str(self.tick_size)) + + @property + def bs_fqme(self) -> str: + return f'{self.symbol}' + + @property + def bs_mktid(self) -> str: + return f'{self.symbol}.{self.venue}' + + +class OptionPair(Pair, frozen=True): + + taker_commission: float # 0.0003 + strike: float # 5000.0 + settlement_period: str # 'day' + settlement_currency: str # "BTC", + rfq: bool # false + price_index: str # 'btc_usd' + option_type: str # 'call' + min_trade_amount: float # 0.1 + maker_commission: float # 0.0003 + kind: str # 'option' + is_active: bool # true + instrument_type: str # 'reversed' + instrument_name: str # 'BTC-1SEP24-55000-C' + instrument_id: int # 364671 + expiration_timestamp: int # 1725177600000 + creation_timestamp: int # 1724918461000 + counter_currency: str # 'USD' + contract_size: float # '1.0' + block_trade_tick_size: float # '0.0001' + block_trade_min_trade_amount: int # '25' + block_trade_commission: float # '0.003' + + + # NOTE: see `.data._symcache.SymbologyCache.load()` for why + ns_path: str = 'piker.brokers.deribit:OptionPair' + + @property + def expiry(self) -> str: + iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat() + return iso_date + + @property + def venue(self) -> str: + return 'option' + + @property + def bs_fqme(self) -> str: + return f'{self.symbol}' + + @property + def bs_src_asset(self) -> str: + return f'{self.quote_currency}' + + @property + def bs_dst_asset(self) -> str: + return f'{self.symbol}' + + +PAIRTYPES: dict[MarketType, Pair] = { + 'option': OptionPair, +} + + +class JSONRPCResult(Struct): + id: int + usIn: int + usOut: int + usDiff: int + testnet: bool + jsonrpc: str = '2.0' + error: Optional[dict] = None + result: Optional[list[dict]] = None + +class JSONRPCChannel(Struct): + method: str + params: dict + jsonrpc: str = '2.0' + + +class KLinesResult(Struct): + low: list[float] + cost: list[float] + high: list[float] + open: list[float] + close: list[float] + ticks: list[int] + status: str + volume: list[float] + +class Trade(Struct): + iv: float + price: float + amount: float + trade_id: str + contracts: float + direction: str + trade_seq: int + timestamp: int + mark_price: float + index_price: float + tick_direction: int + instrument_name: str + combo_id: Optional[str] = '', + combo_trade_id: Optional[int] = 0, + block_trade_id: Optional[str] = '', + block_trade_leg_count: Optional[int] = 0, + +class LastTradesResult(Struct): + trades: list[Trade] + has_more: bool -- 2.34.1 From 1061103f764928c67f461cea8e6f9d84ec822314 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Wed, 29 Jan 2025 18:44:41 -0300 Subject: [PATCH 02/13] Deribit's api fix key changes: - Resolved the issue with the expiration dates from deribits, now we int instead of the crazy custom deribits format. - The client now has a new `_json_rpc_auth_wrapper` that adquires a first access token and then will refresh the access token when this expires. - `get_assets` fixed, now we use the public endpoint to check the availables assets, in the future probably this will change, but for now is working just fine. - `get_mkt_pairs` added. - `exch_info` added. - `cache_symbols` fixed. - Also a lot of reformat made in api. --- piker/brokers/deribit/api.py | 425 +++++++++++++++++++++-------------- 1 file changed, 257 insertions(+), 168 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index b9b98ad9..3b19ff59 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -113,13 +113,15 @@ def str_to_cb_sym(name: str) -> Symbol: else: raise Exception("Couldn\'t parse option type") + new_expiry_date = get_values_from_cb_normalized_date(expiry_date) + return Symbol( - base, quote, + base=base, + quote=quote, type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date, - expiry_normalize=False) + expiry_date=new_expiry_date) def piker_sym_to_cb_sym(name: str) -> Symbol: @@ -130,83 +132,138 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: if option_type == 'P': option_type = PUT - elif option_type == 'C': + elif option_type == 'C': option_type = CALL else: raise Exception("Couldn\'t parse option type") return Symbol( - base, quote, + base=base, + quote=quote, type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date.upper()) + expiry_date=expiry_date) def cb_sym_to_deribit_inst(sym: Symbol): - # cryptofeed normalized - cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] - - exp = sym.expiry_date - - # YYMDD - # 01234 - year, month, day = ( - exp[:2], months[cb_norm.index(exp[2:3])], exp[3:]) - + new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) otype = 'C' if sym.option_type == CALL else 'P' - return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' + return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' + + +def get_values_from_cb_normalized_date(expiry_date: str) -> str: + # deribit specific + cb_norm = [ + 'F', 'G', 'H', 'J', + 'K', 'M', 'N', 'Q', + 'U', 'V', 'X', 'Z' + ] + months = [ + 'JAN', 'FEB', 'MAR', 'APR', + 'MAY', 'JUN', 'JUL', 'AUG', + 'SEP', 'OCT', 'NOV', 'DEC' + ] + # YYMDD + # 01234 + day, month, year = ( + expiry_date[3:], + months[cb_norm.index(expiry_date[2:3])], + expiry_date[:2] + ) + return f'{day}{month}{year}' def get_config() -> dict[str, Any]: - conf, path = config.load() + conf: dict + path: Path + conf, path = config.load( + conf_name='brokers', + touch_if_dne=True, + ) + section: dict = {} section = conf.get('deribit') - - # TODO: document why we send this, basically because logging params for cryptofeed - conf['log'] = {} - conf['log']['disabled'] = True - if section is None: log.warning(f'No config section found for deribit in {path}') + return {} - return conf + conf_option = section.get('option', {}) + section.clear # clear the dict to reuse it + section['deribit'] = {} + section['deribit']['key_id'] = conf_option.get('api_key') + section['deribit']['key_secret'] = conf_option.get('api_secret') + + section['log'] = {} + section['log']['filename'] = 'feedhandler.log' + section['log']['level'] = 'DEBUG' + + return section class Client: - def __init__(self, json_rpc: Callable) -> None: - self._pairs: dict[str, Any] = None + def __init__( + self, + + json_rpc: Callable + + ) -> None: + self._pairs: ChainMap[str, Pair] = ChainMap() config = get_config().get('deribit', {}) - if ('key_id' in config) and ('key_secret' in config): - self._key_id = config['key_id'] - self._key_secret = config['key_secret'] - - else: - self._key_id = None - self._key_secret = None + self._key_id = config.get('key_id') + self._key_secret = config.get('key_secret') self.json_rpc = json_rpc - @property - def currencies(self): - return ['btc', 'eth', 'sol', 'usd'] + self._auth_ts = None + self._auth_renew_ts = 5 # seconds to renew auth - async def get_balances(self, kind: str = 'option') -> dict[str, float]: + async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: + + """Background task that adquires a first access token and then will + refresh the access token. + + https://docs.deribit.com/?python#authentication-2 + """ + access_scope = 'trade:read_write' + current_ts = time.time() + + if not self._auth_ts or current_ts - self._auth_ts < self._auth_renew_ts: + # if we are close to token expiry time + + params = { + 'grant_type': 'client_credentials', + 'client_id': self._key_id, + 'client_secret': self._key_secret, + 'scope': access_scope + } + + resp = await self.json_rpc('public/auth', params) + result = resp.result + + self._auth_ts = time.time() + result['expires_in'] + + return await self.json_rpc(*args, **kwargs) + + + + + async def get_balances( + self, + kind: str = 'option' + ) -> dict[str, float]: """Return the set of positions for this account by symbol. """ balances = {} for currency in self.currencies: - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'private/get_positions', params={ 'currency': currency.upper(), 'kind': kind}) @@ -215,20 +272,46 @@ class Client: return balances - async def get_assets(self) -> dict[str, float]: + async def get_assets( + self, + venue: str | None = None, + + ) -> dict[str, Asset]: """Return the set of asset balances for this account by symbol. """ - balances = {} + assets = {} + resp = await self._json_rpc_auth_wrapper( + 'public/get_currencies', + params={} + ) + currencies = resp.result + for currency in currencies: + name = currency['currency'] + tx_tick = digits_to_dec(currency['fee_precision']) + atype='crypto_currency' + assets[name] = Asset( + name=name, + atype=atype, + tx_tick=tx_tick) - for currency in self.currencies: - resp = await self.json_rpc( - 'private/get_account_summary', params={ - 'currency': currency.upper()}) + instruments = await self.symbol_info(currency=name) + for instrument in instruments: + pair = instruments[instrument] + assets[pair.symbol] = Asset( + name=pair.symbol, + atype=pair.venue, + tx_tick=pair.size_tick) - balances[currency] = resp.result['balance'] + return assets - return balances + async def get_mkt_pairs(self) -> dict[str, Pair]: + flat: dict[str, Pair] = {} + for key in self._pairs: + item = self._pairs.get(key) + flat[item.bs_fqme] = item + + return flat async def submit_limit( self, @@ -245,7 +328,7 @@ class Client: 'type': 'limit', 'price': price, } - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( f'private/{action}', params) return resp.result @@ -253,10 +336,32 @@ class Client: async def submit_cancel(self, oid: str): """Send cancel request for order id """ - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'private/cancel', {'order_id': oid}) return resp.result + async def exch_info( + self, + sym: str | None = None, + + venue: MarketType = 'option', + expiry: str | None = None, + + ) -> dict[str, Pair] | Pair: + + pair_table: dict[str, Pair] = self._pairs + + if ( + sym + and (cached_pair := pair_table.get(sym)) + ): + return cached_pair + + if sym: + return pair_table[sym] + else: + return self._pairs + async def symbol_info( self, instrument: Optional[str] = None, @@ -264,7 +369,7 @@ class Client: kind: str = 'option', expired: bool = False - ) -> dict[str, dict]: + ) -> dict[str, Pair] | Pair: ''' Get symbol infos. @@ -279,28 +384,65 @@ class Client: 'expired': str(expired).lower() } - resp: JSONRPCResult = await self.json_rpc( + resp: JSONRPCResult = await self._json_rpc_auth_wrapper( 'public/get_instruments', params, ) # convert to symbol-keyed table + pair_type: Type = PAIRTYPES[kind] results: list[dict] | None = resp.result - instruments: dict[str, dict] = { - item['instrument_name'].lower(): item - for item in results - } + + instruments: dict[str, Pair] = {} + for item in results: + symbol=item['instrument_name'].lower() + try: + pair: Pair = pair_type( + symbol=symbol, + **item + ) + except Exception as e: + e.add_note( + "\nDon't panic, prolly stupid deribit changed their symbology schema again..\n" + 'Check out their API docs here:\n\n' + 'https://docs.deribit.com/?python#deribit-api-v2-1-1' + ) + raise + + instruments[symbol] = pair if instrument is not None: - return instruments[instrument] + return instruments[instrument.lower()] else: return instruments async def cache_symbols( self, - ) -> dict: + venue: MarketType = 'option', - if not self._pairs: - self._pairs = await self.symbol_info() + ) -> None: + # lookup internal mkt-specific pair table to update + pair_table: dict[str, Pair] = self._pairs + + # make API request(s) + mkt_pairs = await self.symbol_info() + + if not mkt_pairs: + raise SymbolNotFound(f'No market pairs found!?:\n{resp}') + + pairs_view_subtable: dict[str, Pair] = {} + + for instrument in mkt_pairs: + pair_type: Type = PAIRTYPES[venue] + + pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) + + pair_table[pair.symbol.upper()] = pair + + # update an additional top-level-cross-venue-table + # `._pairs: ChainMap` for search B0 + pairs_view_subtable[pair.bs_fqme] = pair + + self._pairs.maps.append(pairs_view_subtable) return self._pairs @@ -308,37 +450,35 @@ class Client: self, pattern: str, limit: int = 30, - ) -> dict[str, Any]: + ) -> dict[str, Pair]: ''' Fuzzy search symbology set for pairs matching `pattern`. ''' - pairs: dict[str, Any] = await self.symbol_info() - matches: dict[str, Pair] = match_from_pairs( + pairs: dict[str, Pair] = await self.exch_info() + + return match_from_pairs( pairs=pairs, query=pattern.upper(), score_cutoff=35, limit=limit ) - # repack in name-keyed table - return { - pair['instrument_name'].lower(): pair - for pair in matches.values() - } - async def bars( self, - symbol: str, + mkt: MktPair, + start_dt: Optional[datetime] = None, end_dt: Optional[datetime] = None, + limit: int = 1000, as_np: bool = True, - ) -> dict: - instrument = symbol + + ) -> list[tuple] | np.ndarray: + instrument: str = mkt.bs_fqme.split('.')[0] if end_dt is None: - end_dt = pendulum.now('UTC') + end_dt = now('UTC') if start_dt is None: start_dt = end_dt.start_of( @@ -348,7 +488,7 @@ class Client: end_time = deribit_timestamp(end_dt) # https://docs.deribit.com/#public-get_tradingview_chart_data - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'public/get_tradingview_chart_data', params={ 'instrument_name': instrument.upper(), @@ -358,36 +498,34 @@ class Client: }) result = KLinesResult(**resp.result) - new_bars = [] + new_bars: list[tuple] = [] for i in range(len(result.close)): - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] - - row = [ + row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], result.high[i], result.low[i], result.close[i], - result.volume[i], - 0 + result.volume[i] ] new_bars.append((i,) + tuple(row)) - array = np.array(new_bars, dtype=def_iohlcv_fields) if as_np else klines - return array + if not as_np: + return result + + return np.array( + new_bars, + dtype=def_iohlcv_fields + ) async def last_trades( self, instrument: str, count: int = 10 ): - resp = await self.json_rpc( + resp = await self._json_rpc_auth_wrapper( 'public/get_last_trades_by_instrument', params={ 'instrument_name': instrument, @@ -399,78 +537,17 @@ class Client: @acm async def get_client( - is_brokercheck: bool = False + is_brokercheck: bool = False, + venue: MarketType = 'option', ) -> Client: async with ( trio.open_nursery() as n, open_jsonrpc_session( - _testnet_ws_url, dtype=JSONRPCResult) as json_rpc + _ws_url, response_type=JSONRPCResult + ) as json_rpc ): client = Client(json_rpc) - - _refresh_token: Optional[str] = None - _access_token: Optional[str] = None - - async def _auth_loop( - task_status: TaskStatus = trio.TASK_STATUS_IGNORED - ): - """Background task that adquires a first access token and then will - refresh the access token while the nursery isn't cancelled. - - https://docs.deribit.com/?python#authentication-2 - """ - renew_time = 10 - access_scope = 'trade:read_write' - _expiry_time = time.time() - got_access = False - nonlocal _refresh_token - nonlocal _access_token - - while True: - if time.time() - _expiry_time < renew_time: - # if we are close to token expiry time - - if _refresh_token != None: - # if we have a refresh token already dont need to send - # secret - params = { - 'grant_type': 'refresh_token', - 'refresh_token': _refresh_token, - 'scope': access_scope - } - - else: - # we don't have refresh token, send secret to initialize - params = { - 'grant_type': 'client_credentials', - 'client_id': client._key_id, - 'client_secret': client._key_secret, - 'scope': access_scope - } - - resp = await json_rpc('public/auth', params) - result = resp.result - - _expiry_time = time.time() + result['expires_in'] - _refresh_token = result['refresh_token'] - - if 'access_token' in result: - _access_token = result['access_token'] - - if not got_access: - # first time this loop runs we must indicate task is - # started, we have auth - got_access = True - task_status.started() - - else: - await trio.sleep(renew_time / 2) - - # if we have client creds launch auth loop - if client._key_id is not None: - await n.start(_auth_loop) - await client.cache_symbols() yield client n.cancel_scope.cancel() @@ -494,7 +571,7 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: async def aio_price_feed_relay( fh: FeedHandler, - instrument: Symbol, + instrument: str, from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: @@ -513,21 +590,33 @@ async def aio_price_feed_relay( 'symbol': cb_sym_to_deribit_inst( str_to_cb_sym(data.symbol)).lower(), 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} + { + 'type': 'bid', + 'price': float(data.bid_price), + 'size': float(data.bid_size) + }, + { + 'type': 'bsize', + 'price': float(data.bid_price), + 'size': float(data.bid_size) + }, + { + 'type': 'ask', + 'price': float(data.ask_price), + 'size': float(data.ask_size) + }, + { + 'type': 'asize', + 'price': float(data.ask_price), + 'size': float(data.ask_size) + } ] })) - + sym: Symbol = piker_sym_to_cb_sym(instrument) fh.add_feed( DERIBIT, channels=[TRADES, L1_BOOK], - symbols=[piker_sym_to_cb_sym(instrument)], + symbols=[sym], callbacks={ TRADES: _trade, L1_BOOK: _l1 @@ -568,9 +657,9 @@ async def maybe_open_price_feed( async with maybe_open_context( acm_func=open_price_feed, kwargs={ - 'instrument': instrument + 'instrument': instrument.split('.')[0] }, - key=f'{instrument}-price', + key=f'{instrument.split('.')[0]}-price', ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10) @@ -635,10 +724,10 @@ async def maybe_open_order_feed( async with maybe_open_context( acm_func=open_order_feed, kwargs={ - 'instrument': instrument, + 'instrument': instrument.split('.')[0], 'fh': fh }, - key=f'{instrument}-order', + key=f'{instrument.split('.')[0]}-order', ) as (cache_hit, feed): if cache_hit: yield broadcast_receiver(feed, 10) -- 2.34.1 From f1436c93dbfefedad2ac57d604d6a8fc8ed28a60 Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Wed, 29 Jan 2025 19:01:37 -0300 Subject: [PATCH 03/13] Deribit's feed fix - `FeedInit` for init_msgs in `stream_quotes`. - new cache is `client_pairs` so is replacing the old `client.cache_symbols`. - `get_mkt_info` added - `get_ohlc` fixed to comply the new ways of the feed. --- piker/brokers/__init__.py | 2 +- piker/brokers/deribit/__init__.py | 2 + piker/brokers/deribit/feed.py | 142 ++++++++++++++++++++++++------ 3 files changed, 117 insertions(+), 29 deletions(-) diff --git a/piker/brokers/__init__.py b/piker/brokers/__init__.py index 0c328d9f..e096af16 100644 --- a/piker/brokers/__init__.py +++ b/piker/brokers/__init__.py @@ -51,6 +51,7 @@ __brokers__: list[str] = [ 'ib', 'kraken', 'kucoin', + 'deribit', # broken but used to work # 'questrade', @@ -61,7 +62,6 @@ __brokers__: list[str] = [ # wstrade # iex - # deribit # bitso ] diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py index 7499cd35..5e87a708 100644 --- a/piker/brokers/deribit/__init__.py +++ b/piker/brokers/deribit/__init__.py @@ -25,6 +25,7 @@ from .api import ( get_client, ) from .feed import ( + get_mkt_info, open_history_client, open_symbol_search, stream_quotes, @@ -43,6 +44,7 @@ log = get_logger(__name__) __all__ = [ 'get_client', # 'trades_dialogue', + 'get_mkt_info', 'open_history_client', 'open_symbol_search', 'stream_quotes', diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index bd2fbe06..e32e31d2 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -34,9 +34,20 @@ from rapidfuzz import process as fuzzy import numpy as np import tractor -from piker.brokers import open_cached_client +from piker.accounting import ( + MktPair, + unpack_fqme, +) +from piker.brokers import ( + open_cached_client, + NoData, +) +from piker._cacheables import ( + async_lifo_cache, +) from piker.log import get_logger, get_console_log from piker.data import ShmArray +from piker.data.validate import FeedInit from piker.brokers._util import ( BrokerError, DataUnavailable, @@ -51,7 +62,7 @@ from cryptofeed.symbols import Symbol from .api import ( Client, Trade, get_config, - str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + piker_sym_to_cb_sym, cb_sym_to_deribit_inst, maybe_open_price_feed ) from .venues import ( @@ -72,36 +83,107 @@ async def open_history_client( mkt: MktPair, ) -> tuple[Callable, int]: - fnstrument: str = mkt.bs_fqme # TODO implement history getter for the new storage layer. async with open_cached_client('deribit') as client: async def get_ohlc( - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + timeframe: float, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, datetime, # start datetime, # end ]: + if timeframe != 60: + raise DataUnavailable('Only 1m bars are supported') - array = await client.bars( - instrument, + array: np.ndarray = await client.bars( + mkt, start_dt=start_dt, end_dt=end_dt, ) if len(array) == 0: - raise DataUnavailable + raise NoData( + f'No frame for {start_dt} -> {end_dt}\n' + ) - start_dt = pendulum.from_timestamp(array[0]['time']) - end_dt = pendulum.from_timestamp(array[-1]['time']) + start_dt = from_timestamp(array[0]['time']) + end_dt = from_timestamp(array[-1]['time']) + + times = array['time'] + if not times.any(): + raise ValueError( + 'Bad frame with null-times?\n\n' + f'{times}' + ) + + if end_dt is None: + inow: int = round(time.time()) + if (inow - times[-1]) > 60: + await tractor.pause() return array, start_dt, end_dt yield get_ohlc, {'erlangs': 3, 'rate': 3} +@async_lifo_cache() +async def get_mkt_info( + fqme: str, + +) -> tuple[MktPair, Pair] | None: + + # uppercase since kraken bs_mktid is always upper + if 'deribit' not in fqme.lower(): + fqme += '.deribit' + + mkt_mode: str = '' + broker, mkt_ep, venue, expiry = unpack_fqme(fqme) + + # NOTE: we always upper case all tokens to be consistent with + # binance's symbology style for pairs, like `BTCUSDT`, but in + # theory we could also just keep things lower case; as long as + # we're consistent and the symcache matches whatever this func + # returns, always! + expiry: str = expiry.upper() + venue: str = venue.upper() + venue_lower: str = venue.lower() + + mkt_mode: str = 'option' + + async with open_cached_client( + 'deribit', + ) as client: + + assets: dict[str, Asset] = await client.get_assets() + pair_str: str = mkt_ep.lower() + + pair: Pair = await client.exch_info( + sym=pair_str, + ) + mkt_mode = pair.venue + client.mkt_mode = mkt_mode + + dst: Asset | None = assets.get(pair.bs_dst_asset) + src: Asset | None = assets.get(pair.bs_src_asset) + + mkt = MktPair( + dst=dst, + src=src, + price_tick=pair.price_tick, + size_tick=pair.size_tick, + bs_mktid=pair.symbol, + expiry=pair.expiry, + venue=mkt_mode, + broker='deribit', + _atype=mkt_mode, + _fqme_without_src=True, + ) + return mkt, pair + + async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -116,31 +198,26 @@ async def stream_quotes( # XXX: required to propagate ``tractor`` loglevel to piker logging get_console_log(loglevel or tractor.current_actor().loglevel) - sym = symbols[0] + sym = symbols[0].split('.')[0] + + init_msgs: list[FeedInit] = [] async with ( open_cached_client('deribit') as client, send_chan as send_chan ): - init_msgs = { - # pass back token, and bool, signalling if we're the writer - # and that history has been written - sym: { - 'symbol_info': { - 'asset_type': 'option', - 'price_tick_size': 0.0005 - }, - 'shm_write_opts': {'sum_tick_vml': False}, - 'fqsn': sym, - }, - } + mkt, pair = await get_mkt_info(sym) + # build out init msgs according to latest spec + init_msgs.append( + FeedInit(mkt_info=mkt) + ) nsym = piker_sym_to_cb_sym(sym) async with maybe_open_price_feed(sym) as stream: - cache = await client.cache_symbols() + cache = client._pairs last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades @@ -182,12 +259,21 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = await client.cache_symbols() + cache = client._pairs await ctx.started() async with ctx.open_stream() as stream: + pattern: str async for pattern in stream: - # repack in dict form - await stream.send( - await client.search_symbols(pattern)) + # NOTE: pattern fuzzy-matching is done within + # the methd impl. + pairs: dict[str, Pair] = await client.search_symbols( + pattern, + ) + # repack in fqme-keyed table + byfqme: dict[str, Pair] = {} + for pair in pairs.values(): + byfqme[pair.bs_fqme] = pair + + await stream.send(byfqme) -- 2.34.1 From ec6dd7cafc4683cef014ad12db94f6ce7d1f85b4 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 17:05:13 -0500 Subject: [PATCH 04/13] 'Fix `Optional` and use `'linear/reverse'` in `OptionPair.venue`' --- piker/brokers/deribit/venues.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/piker/brokers/deribit/venues.py b/piker/brokers/deribit/venues.py index 91a1583f..f74c970e 100644 --- a/piker/brokers/deribit/venues.py +++ b/piker/brokers/deribit/venues.py @@ -22,6 +22,7 @@ from __future__ import annotations import pendulum from typing import ( Literal, + Optional, ) from decimal import Decimal @@ -111,7 +112,6 @@ class OptionPair(Pair, frozen=True): block_trade_min_trade_amount: int # '25' block_trade_commission: float # '0.003' - # NOTE: see `.data._symcache.SymbologyCache.load()` for why ns_path: str = 'piker.brokers.deribit:OptionPair' @@ -122,7 +122,7 @@ class OptionPair(Pair, frozen=True): @property def venue(self) -> str: - return 'option' + return f'{self.instrument_type}_option' @property def bs_fqme(self) -> str: -- 2.34.1 From 438e69e42c2cae6184956ebf545919326640020f Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 17:09:16 -0500 Subject: [PATCH 05/13] `.deribit.api` bit of tidying/typing There were some imports missing or unused as well as a variety of spots that had grokability issues due to missing type hints. Other tweaks as part some more thorough manual testing: - always raise when not `brokers.toml` section since the API can never work (no free data without keys). - inline the `Asset.atype='crypto_currency` field despite it maybe not being the best value for `OptionPair` instruments.. - tossed in a now-masked pause block for debugging history queries in `Client.bars()`. - commented out all the live order ctl (internal) endpoints for now since they're unused. --- piker/brokers/deribit/api.py | 205 +++++++++++++++++++---------------- 1 file changed, 113 insertions(+), 92 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 3b19ff59..68f76d4d 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -29,6 +29,7 @@ from decimal import ( ) from functools import partial import time +from pathlib import Path from typing import ( Any, Optional, @@ -37,8 +38,6 @@ from typing import ( from pendulum import now import trio -from trio_typing import TaskStatus -from rapidfuzz import process as fuzzy import numpy as np from tractor.trionics import ( broadcast_receiver, @@ -55,8 +54,10 @@ from cryptofeed.defines import ( OPTION, CALL, PUT ) from cryptofeed.symbols import Symbol + # types for managing the cb callbacks. # from cryptofeed.types import L1Book +from piker.brokers import SymbolNotFound from .venues import ( _ws_url, MarketType, @@ -64,9 +65,9 @@ from .venues import ( Pair, OptionPair, JSONRPCResult, - JSONRPCChannel, + # JSONRPCChannel, KLinesResult, - Trade, + # Trade, LastTradesResult, ) from piker.accounting import ( @@ -77,7 +78,7 @@ from piker.accounting import ( from piker.data import ( def_iohlcv_fields, match_from_pairs, - Struct, + # Struct, ) from piker.data._web_bs import ( open_jsonrpc_session @@ -97,7 +98,7 @@ _spawn_kwargs = { # convert datetime obj timestamp to unixtime in milliseconds -def deribit_timestamp(when): +def deribit_timestamp(when) -> int: return int((when.timestamp() * 1000) + (when.microsecond / 1000)) @@ -179,16 +180,18 @@ def get_config() -> dict[str, Any]: conf: dict path: Path - conf, path = config.load( conf_name='brokers', touch_if_dne=True, ) - section: dict = {} - section = conf.get('deribit') + section: dict|None = conf.get('deribit') if section is None: - log.warning(f'No config section found for deribit in {path}') - return {} + raise ValueError( + f'No `[deribit]` section found in\n' + f'{path!r}\n\n' + f'See the template config from the core repo for samples..\n' + # f'' + ) conf_option = section.get('option', {}) section.clear # clear the dict to reuse it @@ -223,8 +226,12 @@ class Client: self._auth_ts = None self._auth_renew_ts = 5 # seconds to renew auth - async def _json_rpc_auth_wrapper(self, *args, **kwargs) -> JSONRPCResult: - + async def _json_rpc_auth_wrapper( + self, + *args, + **kwargs, + ) -> JSONRPCResult: + """Background task that adquires a first access token and then will refresh the access token. @@ -250,9 +257,6 @@ class Client: return await self.json_rpc(*args, **kwargs) - - - async def get_balances( self, kind: str = 'option' @@ -277,23 +281,29 @@ class Client: venue: str | None = None, ) -> dict[str, Asset]: - """Return the set of asset balances for this account - by symbol. - """ + ''' + Return the set of asset balances for this account + by (deribit's) symbol. + + + ''' assets = {} resp = await self._json_rpc_auth_wrapper( 'public/get_currencies', params={} ) - currencies = resp.result + currencies: list[dict] = resp.result for currency in currencies: - name = currency['currency'] - tx_tick = digits_to_dec(currency['fee_precision']) - atype='crypto_currency' + name: str = currency['currency'] + tx_tick: Decimal = digits_to_dec(currency['fee_precision']) + + # TODO, handling of options, futures, perps etc. more + # specifically with diff `.atype`s? assets[name] = Asset( name=name, - atype=atype, - tx_tick=tx_tick) + atype='crypto_currency', + tx_tick=tx_tick, + ) instruments = await self.symbol_info(currency=name) for instrument in instruments: @@ -301,9 +311,10 @@ class Client: assets[pair.symbol] = Asset( name=pair.symbol, atype=pair.venue, - tx_tick=pair.size_tick) + tx_tick=pair.size_tick, + ) - return assets + return assets async def get_mkt_pairs(self) -> dict[str, Pair]: flat: dict[str, Pair] = {} @@ -381,7 +392,7 @@ class Client: params: dict[str, str] = { 'currency': currency.upper(), 'kind': kind, - 'expired': str(expired).lower() + 'expired': expired, } resp: JSONRPCResult = await self._json_rpc_auth_wrapper( @@ -389,9 +400,9 @@ class Client: params, ) # convert to symbol-keyed table - pair_type: Type = PAIRTYPES[kind] + pair_type: Pair = PAIRTYPES[kind] results: list[dict] | None = resp.result - + instruments: dict[str, Pair] = {} for item in results: symbol=item['instrument_name'].lower() @@ -427,12 +438,15 @@ class Client: mkt_pairs = await self.symbol_info() if not mkt_pairs: - raise SymbolNotFound(f'No market pairs found!?:\n{resp}') + raise SymbolNotFound( + f'No market pairs found!?:\n' + f'{mkt_pairs}' + ) pairs_view_subtable: dict[str, Pair] = {} for instrument in mkt_pairs: - pair_type: Type = PAIRTYPES[venue] + pair_type: Pair|OptionPair = PAIRTYPES[venue] pair: Pair = pair_type(**mkt_pairs[instrument].to_dict()) @@ -480,12 +494,14 @@ class Client: if end_dt is None: end_dt = now('UTC') + _orig_start_dt = start_dt if start_dt is None: start_dt = end_dt.start_of( - 'minute').subtract(minutes=limit) + 'minute' + ).subtract(minutes=limit) - start_time = deribit_timestamp(start_dt) - end_time = deribit_timestamp(end_dt) + start_time: int = deribit_timestamp(start_dt) + end_time: int = deribit_timestamp(end_dt) # https://docs.deribit.com/#public-get_tradingview_chart_data resp = await self._json_rpc_auth_wrapper( @@ -499,9 +515,13 @@ class Client: result = KLinesResult(**resp.result) new_bars: list[tuple] = [] - for i in range(len(result.close)): + # if _orig_start_dt is None: + # if not new_bars: + # import tractor + # await tractor.pause() - row = [ + for i in range(len(result.close)): + row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], result.high[i], @@ -668,68 +688,69 @@ async def maybe_open_price_feed( -async def aio_order_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() +# TODO, move all to `.broker` submod! +# async def aio_order_feed_relay( +# fh: FeedHandler, +# instrument: Symbol, +# from_trio: asyncio.Queue, +# to_trio: trio.abc.SendChannel, +# ) -> None: +# async def _fill(data: dict, receipt_timestamp): +# breakpoint() - async def _order_info(data: dict, receipt_timestamp): - breakpoint() +# async def _order_info(data: dict, receipt_timestamp): +# breakpoint() - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument.upper()], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, - }) +# fh.add_feed( +# DERIBIT, +# channels=[FILLS, ORDER_INFO], +# symbols=[instrument.upper()], +# callbacks={ +# FILLS: _fill, +# ORDER_INFO: _order_info, +# }) - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) +# if not fh.running: +# fh.run( +# start_loop=False, +# install_signal_handlers=False) - # sync with trio - to_trio.send_nowait(None) +# # sync with trio +# to_trio.send_nowait(None) - await asyncio.sleep(float('inf')) +# await asyncio.sleep(float('inf')) -@acm -async def open_order_feed( - instrument: list[str] -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan +# @acm +# async def open_order_feed( +# instrument: list[str] +# ) -> trio.abc.ReceiveStream: +# async with maybe_open_feed_handler() as fh: +# async with to_asyncio.open_channel_from( +# partial( +# aio_order_feed_relay, +# fh, +# instrument +# ) +# ) as (first, chan): +# yield chan -@acm -async def maybe_open_order_feed( - instrument: str -) -> trio.abc.ReceiveStream: +# @acm +# async def maybe_open_order_feed( +# instrument: str +# ) -> trio.abc.ReceiveStream: - # TODO: add a predicate to maybe_open_context - async with maybe_open_context( - acm_func=open_order_feed, - kwargs={ - 'instrument': instrument.split('.')[0], - 'fh': fh - }, - key=f'{instrument.split('.')[0]}-order', - ) as (cache_hit, feed): - if cache_hit: - yield broadcast_receiver(feed, 10) - else: - yield feed +# # TODO: add a predicate to maybe_open_context +# async with maybe_open_context( +# acm_func=open_order_feed, +# kwargs={ +# 'instrument': instrument.split('.')[0], +# 'fh': fh +# }, +# key=f'{instrument.split('.')[0]}-order', +# ) as (cache_hit, feed): +# if cache_hit: +# yield broadcast_receiver(feed, 10) +# else: +# yield feed -- 2.34.1 From 5b87b3c2a64f0c9407949bb736fc594b1b860cd7 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 17:45:39 -0500 Subject: [PATCH 06/13] Signal hist start using `OptionPair.creation_timestamp` Such that the `get_hist()` query func raises `DataUnavailable` with an explicit message regarding the start of the (option) contract's lifetime. Other, - mask some unused imports (for now?) - drop a duplicate `tractor.get_console_log()` call which was causing duplicate console emits (it's already setup by brokerd init now). - comment various unused code bits i found. - add a info log around live quotes so we can see for the moment when they actually occur.. XD --- piker/brokers/deribit/feed.py | 105 +++++++++++++++++++++++++--------- 1 file changed, 78 insertions(+), 27 deletions(-) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index e32e31d2..b3a202ba 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -18,56 +18,65 @@ Deribit backend. ''' +from __future__ import annotations from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, Callable -from pprint import pformat +from typing import ( + # Any, + # Optional, + Callable, +) +# from pprint import pformat import time import trio from trio_typing import TaskStatus from pendulum import ( from_timestamp, - now, ) -from rapidfuzz import process as fuzzy import numpy as np import tractor from piker.accounting import ( + Asset, MktPair, unpack_fqme, ) from piker.brokers import ( open_cached_client, NoData, + DataUnavailable, ) from piker._cacheables import ( async_lifo_cache, ) -from piker.log import get_logger, get_console_log -from piker.data import ShmArray +from piker.log import ( + get_logger, +) from piker.data.validate import FeedInit -from piker.brokers._util import ( - BrokerError, - DataUnavailable, -) -from cryptofeed import FeedHandler -from cryptofeed.defines import ( - DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol +# from cryptofeed import FeedHandler +# from cryptofeed.defines import ( +# DERIBIT, +# L1_BOOK, +# TRADES, +# OPTION, +# CALL, +# PUT, +# ) +# from cryptofeed.symbols import Symbol from .api import ( - Client, Trade, - get_config, - piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + Client, + # get_config, + piker_sym_to_cb_sym, + cb_sym_to_deribit_inst, maybe_open_price_feed ) from .venues import ( Pair, OptionPair, + Trade, ) _spawn_kwargs = { @@ -86,6 +95,10 @@ async def open_history_client( # TODO implement history getter for the new storage layer. async with open_cached_client('deribit') as client: + pair: OptionPair = client._pairs[mkt.dst.name] + # XXX NOTE, the cuckers use ms !!! + creation_time_s: int = pair.creation_timestamp/1000 + async def get_ohlc( timeframe: float, end_dt: datetime | None = None, @@ -105,6 +118,31 @@ async def open_history_client( end_dt=end_dt, ) if len(array) == 0: + if ( + end_dt is None + ): + raise DataUnavailable( + 'No history seems to exist yet?\n\n' + f'{mkt}' + ) + elif ( + end_dt + and + end_dt.timestamp() < creation_time_s + ): + # the contract can't have history + # before it was created. + pair_type_str: str = type(pair).__name__ + create_dt: datetime = from_timestamp(creation_time_s) + raise DataUnavailable( + f'No history prior to\n' + f'`{pair_type_str}.creation_timestamp: int = ' + f'{pair.creation_timestamp}\n\n' + f'------ deribit sux ------\n' + f'WHICH IN "NORMAL PEOPLE WHO USE EPOCH TIME" form is,\n' + f'creation_time_s: {creation_time_s}\n' + f'create_dt: {create_dt}\n' + ) raise NoData( f'No frame for {start_dt} -> {end_dt}\n' ) @@ -126,14 +164,20 @@ async def open_history_client( return array, start_dt, end_dt - yield get_ohlc, {'erlangs': 3, 'rate': 3} + yield ( + get_ohlc, + { # backfill config + 'erlangs': 3, + 'rate': 3, + } + ) @async_lifo_cache() async def get_mkt_info( fqme: str, -) -> tuple[MktPair, Pair] | None: +) -> tuple[MktPair, Pair|OptionPair] | None: # uppercase since kraken bs_mktid is always upper if 'deribit' not in fqme.lower(): @@ -149,7 +193,7 @@ async def get_mkt_info( # returns, always! expiry: str = expiry.upper() venue: str = venue.upper() - venue_lower: str = venue.lower() + # venue_lower: str = venue.lower() mkt_mode: str = 'option' @@ -195,8 +239,6 @@ async def stream_quotes( task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) sym = symbols[0].split('.')[0] @@ -217,7 +259,8 @@ async def stream_quotes( async with maybe_open_price_feed(sym) as stream: - cache = client._pairs + # TODO, uhh use it ?? XD + # cache = client._pairs last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades @@ -247,9 +290,16 @@ async def stream_quotes( feed_is_live.set() + # deliver until cancelled async for typ, quote in stream: - topic = quote['symbol'] - await send_chan.send({topic: quote}) + topic: str = quote['symbol'] + log.info( + f'deribit {typ!r} quote\n\n' + f'{quote}\n' + ) + await send_chan.send({ + topic: quote, + }) @tractor.context @@ -259,13 +309,14 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = client._pairs + # cache = client._pairs await ctx.started() async with ctx.open_stream() as stream: pattern: str async for pattern in stream: + # NOTE: pattern fuzzy-matching is done within # the methd impl. pairs: dict[str, Pair] = await client.search_symbols( -- 2.34.1 From 1e0c3da32d893ee543ef80c2577ba235ee44d7f0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 17:50:26 -0500 Subject: [PATCH 07/13] Report the closest (via fuzzy match) pairs on unmatched input --- piker/brokers/deribit/api.py | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 68f76d4d..0450b9eb 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -28,8 +28,9 @@ from decimal import ( Decimal, ) from functools import partial -import time from pathlib import Path +from pprint import pformat +import time from typing import ( Any, Optional, @@ -369,6 +370,19 @@ class Client: return cached_pair if sym: + opt: OptionPair|None = pair_table.get(sym) + if not opt: + closest_matches: dict[str, Pair] = match_from_pairs( + pairs=pair_table, + query=sym, + score_cutoff=40, + ) + closest_syms: list[str] = list(closest_matches.keys()) + raise ValueError( + f'No contract found for {sym!r}\n\n' + f'Closest {len(closest_syms)} available contracts:\n\n' + f'{pformat(closest_syms)}\n' + ) return pair_table[sym] else: return self._pairs -- 2.34.1 From 04421e5ad262dae688640373935bec6e8cd9b757 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 21:13:21 -0500 Subject: [PATCH 08/13] .deribit.venues: add todo for an ideal `OptionPair.expiry` fmt/value --- piker/brokers/deribit/venues.py | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/piker/brokers/deribit/venues.py b/piker/brokers/deribit/venues.py index f74c970e..0179c5f0 100644 --- a/piker/brokers/deribit/venues.py +++ b/piker/brokers/deribit/venues.py @@ -26,8 +26,6 @@ from typing import ( ) from decimal import Decimal -from msgspec import field - from piker.types import Struct @@ -115,9 +113,13 @@ class OptionPair(Pair, frozen=True): # NOTE: see `.data._symcache.SymbologyCache.load()` for why ns_path: str = 'piker.brokers.deribit:OptionPair' + # TODO, impl this without the MM:SS part of + # the `'THH:MM:SS..'` etc.. @property def expiry(self) -> str: - iso_date = pendulum.from_timestamp(self.expiration_timestamp / 1000).isoformat() + iso_date = pendulum.from_timestamp( + self.expiration_timestamp / 1000 + ).isoformat() return iso_date @property -- 2.34.1 From 8bd0a182cfd6f1dc6c28b450af918923b61131cc Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 21:14:11 -0500 Subject: [PATCH 09/13] Bit more `cryptofeed` adapter formatting and typing for clarity.. --- piker/brokers/deribit/api.py | 106 ++++++++++++++++++++++------------- 1 file changed, 66 insertions(+), 40 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 0450b9eb..5945e634 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -123,14 +123,20 @@ def str_to_cb_sym(name: str) -> Symbol: type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=new_expiry_date) + expiry_date=new_expiry_date + ) def piker_sym_to_cb_sym(name: str) -> Symbol: - base, expiry_date, strike_price, option_type = tuple( + ( + base, + expiry_date, + strike_price, + option_type, + )= tuple( name.upper().split('-')) - quote = base + quote: str = base if option_type == 'P': option_type = PUT @@ -145,7 +151,8 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date) + expiry_date=expiry_date + ) def cb_sym_to_deribit_inst(sym: Symbol): @@ -208,7 +215,10 @@ def get_config() -> dict[str, Any]: class Client: + ''' + Hi-level interface for the jsron-RPC over websocket API. + ''' def __init__( self, @@ -609,43 +619,59 @@ async def aio_price_feed_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: - async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp - })) - async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - { - 'type': 'bid', - 'price': float(data.bid_price), - 'size': float(data.bid_size) - }, - { - 'type': 'bsize', - 'price': float(data.bid_price), - 'size': float(data.bid_size) - }, - { - 'type': 'ask', - 'price': float(data.ask_price), - 'size': float(data.ask_size) - }, - { - 'type': 'asize', - 'price': float(data.ask_price), - 'size': float(data.ask_size) - } - ] - })) + async def _trade( + data: dict, + receipt_timestamp: int, + ) -> None: + ''' + Send `cryptofeed.FeedHandler` quotes to `piker`-side + `trio.Task`. + + ''' + to_trio.send_nowait(( + 'trade', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'last': data, + 'broker_ts': time.time(), + 'data': data.to_dict(), + 'receipt': receipt_timestamp, + }, + )) + + async def _l1( + data: dict, + receipt_timestamp: int, + ) -> None: + to_trio.send_nowait(( + 'l1', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'ticks': [ + { + 'type': 'bid', + 'price': float(data.bid_price), + 'size': float(data.bid_size) + }, + { + 'type': 'bsize', + 'price': float(data.bid_price), + 'size': float(data.bid_size) + }, + { + 'type': 'ask', + 'price': float(data.ask_price), + 'size': float(data.ask_size) + }, + { + 'type': 'asize', + 'price': float(data.ask_price), + 'size': float(data.ask_size) + } + ] + }, + )) sym: Symbol = piker_sym_to_cb_sym(instrument) fh.add_feed( DERIBIT, -- 2.34.1 From dae17bb04397a6256dd33e68c29f47fbb226c2b0 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 21:14:33 -0500 Subject: [PATCH 10/13] `.deribit.feed`: get live quotes workin (again) The quote-msg `'topic'` field was being set and sent as the `OptionPair.symbol: str` value instead of as the `MktPair.bs_fqme: str` as is required for matching on the `piker.data.feed` side. So change to that and simplify the actual `.bs_fqme: str` value to NOT include the ISO-format time (for now) since it's a big ugly and longer term we need a `piker`-fqme friendly-on-ze-eyes format/style anyway.. --- piker/brokers/deribit/feed.py | 68 +++++++++++++++++++++-------------- 1 file changed, 41 insertions(+), 27 deletions(-) diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index b3a202ba..28c1cbed 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -29,6 +29,7 @@ from typing import ( # from pprint import pformat import time +import cryptofeed import trio from trio_typing import TaskStatus from pendulum import ( @@ -52,19 +53,10 @@ from piker._cacheables import ( ) from piker.log import ( get_logger, + mk_repr, ) from piker.data.validate import FeedInit -# from cryptofeed import FeedHandler -# from cryptofeed.defines import ( -# DERIBIT, -# L1_BOOK, -# TRADES, -# OPTION, -# CALL, -# PUT, -# ) -# from cryptofeed.symbols import Symbol from .api import ( Client, @@ -219,51 +211,64 @@ async def get_mkt_info( price_tick=pair.price_tick, size_tick=pair.size_tick, bs_mktid=pair.symbol, - expiry=pair.expiry, venue=mkt_mode, broker='deribit', _atype=mkt_mode, _fqme_without_src=True, + + # expiry=pair.expiry, + # XXX TODO, currently we don't use it since it's + # already "described" in the `OptionPair.symbol: str` + # and if we slap in the ISO repr it's kinda hideous.. + # -[ ] figure out the best either std ) return mkt, pair async def stream_quotes( - send_chan: trio.abc.SendChannel, symbols: list[str], feed_is_live: trio.Event, - loglevel: str = None, # startup sync task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, ) -> None: + ''' + Open a live quote stream for the market set defined by `symbols`. + ''' sym = symbols[0].split('.')[0] - init_msgs: list[FeedInit] = [] + # multiline nested `dict` formatter (since rn quote-msgs are + # just that). + pfmt: Callable[[str], str] = mk_repr() + async with ( open_cached_client('deribit') as client, send_chan as send_chan ): - + mkt: MktPair + pair: Pair mkt, pair = await get_mkt_info(sym) # build out init msgs according to latest spec init_msgs.append( - FeedInit(mkt_info=mkt) + FeedInit( + mkt_info=mkt, + ) ) - nsym = piker_sym_to_cb_sym(sym) + # build `cryptofeed` feed-handle + cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) async with maybe_open_price_feed(sym) as stream: - - # TODO, uhh use it ?? XD - # cache = client._pairs - - last_trades = (await client.last_trades( - cb_sym_to_deribit_inst(nsym), count=1)).trades + last_trades = ( + await client.last_trades( + cb_sym_to_deribit_inst(cf_sym), + count=1, + ) + ).trades if len(last_trades) == 0: last_trade = None @@ -286,16 +291,25 @@ async def stream_quotes( 'broker_ts': last_trade.timestamp }] } - task_status.started((init_msgs, first_quote)) + task_status.started(( + init_msgs, + first_quote, + )) feed_is_live.set() + # NOTE XXX, static for now! + # => since this only handles ONE mkt feed at a time we + # don't need a lookup table to map interleaved quotes + # from multiple possible mkt-pairs + topic: str = mkt.bs_fqme + # deliver until cancelled async for typ, quote in stream: - topic: str = quote['symbol'] + sym: str = quote['symbol'] log.info( - f'deribit {typ!r} quote\n\n' - f'{quote}\n' + f'deribit {typ!r} quote for {sym!r}\n\n' + f'{pfmt(quote)}\n' ) await send_chan.send({ topic: quote, -- 2.34.1 From 75ddba09f7da2581747e23a06c1f4da752e87f39 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 22 Nov 2024 14:58:30 -0500 Subject: [PATCH 11/13] `deribit.feed`: fix "trade" event streaming The main change needed to make `piker.data.feed._FeedsBus` work was to correctly format the `'trade'` msgs with the (new schema) expected `'ticks': list[dict]` field which, - we compute the `piker` quote-msg-`dict` from the (now directly proxied through) `cryptofeed.types.Trade`'s fields inside the body of `stream_quotes()`. - similarly, move the `'l1'` msg processing, **out of** the `asyncio`-side `_l1()` callback (defined as a closure in `.api.aio_price_feed_relay()` and passed to the `cryptofeed.FeedHandler`) and instead mod the callback to simply pass through the `.types.L1Book` ref directly to the `piker`/`trio` side task for conversion. In support of all that, - mask-to-drop the alt-branch to wait on a first rt event when the `cryptofeed.LastTradesResult.trades: list[Trade]` is empty; doesn't seem like this ever even happens? - add a buncha typing, comments and doc-strs to the routines in `.deribit.api` including notes on where we can choose to mod the `.bs_fqme` for our eventually preferred `piker` style format. - simplify some nested `@acm` enters to the new single `async with )` style. - be particularly pedantic about typing `tractor.to_asyncio.LinkedTaskChannel` - bit of pep8 line-spacing fixes in `.venues`. --- piker/brokers/deribit/api.py | 133 +++++++++++++++++--------------- piker/brokers/deribit/feed.py | 117 ++++++++++++++++++++++------ piker/brokers/deribit/venues.py | 3 + 3 files changed, 167 insertions(+), 86 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index 5945e634..f846a5c0 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -55,9 +55,10 @@ from cryptofeed.defines import ( OPTION, CALL, PUT ) from cryptofeed.symbols import Symbol - -# types for managing the cb callbacks. -# from cryptofeed.types import L1Book +from cryptofeed.types import ( + L1Book, + Trade, +) from piker.brokers import SymbolNotFound from .venues import ( _ws_url, @@ -66,9 +67,7 @@ from .venues import ( Pair, OptionPair, JSONRPCResult, - # JSONRPCChannel, KLinesResult, - # Trade, LastTradesResult, ) from piker.accounting import ( @@ -98,9 +97,17 @@ _spawn_kwargs = { } -# convert datetime obj timestamp to unixtime in milliseconds -def deribit_timestamp(when) -> int: - return int((when.timestamp() * 1000) + (when.microsecond / 1000)) +def deribit_timestamp(when: datetime) -> int: + ''' + Convert conventional epoch timestamp, in secs, to unixtime in + milliseconds. + + ''' + return int( + (when.timestamp() * 1000) + + + (when.microsecond / 1000) + ) def str_to_cb_sym(name: str) -> Symbol: @@ -155,11 +162,28 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: ) -def cb_sym_to_deribit_inst(sym: Symbol): - new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) - otype = 'C' if sym.option_type == CALL else 'P' +# TODO, instead can't we just lookup the `MktPair` directly +# and pass it upward to `stream_quotes()`?? +def cb_sym_to_deribit_inst(sym: Symbol) -> str: + ''' + Generate our own internal `str`-repr for a `cryptofeed.Symbol` + uniquely from its fields. - return f'{sym.base}-{new_expiry_date}-{sym.strike_price}-{otype}' + This is the equiv of generating a `Pair.fmqe` from `cryptofeed` + for now i suppose..? + + ''' + new_expiry_date = get_values_from_cb_normalized_date(sym.expiry_date) + otype = ( + 'C' if sym.option_type == CALL + else 'P' + ) + return ( + f'{sym.base}-' + f'{new_expiry_date}-' + f'{sym.strike_price}-' + f'{otype}' + ) def get_values_from_cb_normalized_date(expiry_date: str) -> str: @@ -598,7 +622,7 @@ async def get_client( @acm -async def open_feed_handler(): +async def open_feed_handler() -> FeedHandler: fh = FeedHandler(config=get_config()) yield fh await to_asyncio.run_task(fh.stop_async) @@ -619,59 +643,37 @@ async def aio_price_feed_relay( from_trio: asyncio.Queue, to_trio: trio.abc.SendChannel, ) -> None: + ''' + Relay price feed quotes from the `cryptofeed.FeedHandler` to + the `piker`-side `trio.task` consumers for delivery to consumer + sub-actors for various subsystems. + ''' async def _trade( - data: dict, + trade: Trade, # cryptofeed, NOT ours from `.venues`! receipt_timestamp: int, ) -> None: ''' - Send `cryptofeed.FeedHandler` quotes to `piker`-side - `trio.Task`. + Proxy-thru `cryptofeed.FeedHandler` "trades" to `piker`-side. ''' - to_trio.send_nowait(( - 'trade', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'last': data, - 'broker_ts': time.time(), - 'data': data.to_dict(), - 'receipt': receipt_timestamp, - }, - )) + to_trio.send_nowait(('trade', trade)) async def _l1( - data: dict, + book: L1Book, receipt_timestamp: int, ) -> None: - to_trio.send_nowait(( - 'l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - { - 'type': 'bid', - 'price': float(data.bid_price), - 'size': float(data.bid_size) - }, - { - 'type': 'bsize', - 'price': float(data.bid_price), - 'size': float(data.bid_size) - }, - { - 'type': 'ask', - 'price': float(data.ask_price), - 'size': float(data.ask_size) - }, - { - 'type': 'asize', - 'price': float(data.ask_price), - 'size': float(data.ask_size) - } - ] - }, - )) + ''' + Relay-thru "l1 book" updates. + + ''' + + to_trio.send_nowait(('l1', book)) + + # TODO, make this work! + # -[ ] why isn't this working in `tractor.pause_from_sync()`?? + # breakpoint() + sym: Symbol = piker_sym_to_cb_sym(instrument) fh.add_feed( DERIBIT, @@ -685,27 +687,35 @@ async def aio_price_feed_relay( if not fh.running: fh.run( start_loop=False, - install_signal_handlers=False) + install_signal_handlers=False + ) # sync with trio to_trio.send_nowait(None) + # run until cancelled await asyncio.sleep(float('inf')) @acm async def open_price_feed( instrument: str -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( +) -> to_asyncio.LinkedTaskChannel: + + fh: FeedHandler + first: None + chan: to_asyncio.LinkedTaskChannel + async with ( + maybe_open_feed_handler() as fh, + to_asyncio.open_channel_from( partial( aio_price_feed_relay, fh, instrument ) - ) as (first, chan): - yield chan + ) as (first, chan) + ): + yield chan @acm @@ -714,6 +724,7 @@ async def maybe_open_price_feed( ) -> trio.abc.ReceiveStream: # TODO: add a predicate to maybe_open_context + feed: to_asyncio.LinkedTaskChannel async with maybe_open_context( acm_func=open_price_feed, kwargs={ diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index 28c1cbed..efd43ea5 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -63,6 +63,7 @@ from .api import ( # get_config, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, + str_to_cb_sym, maybe_open_price_feed ) from .venues import ( @@ -237,13 +238,19 @@ async def stream_quotes( ''' Open a live quote stream for the market set defined by `symbols`. + Internally this starts a `cryptofeed.FeedHandler` inside an `asyncio`-side + task and relays through L1 and `Trade` msgs here to our `trio.Task`. + ''' sym = symbols[0].split('.')[0] init_msgs: list[FeedInit] = [] # multiline nested `dict` formatter (since rn quote-msgs are # just that). - pfmt: Callable[[str], str] = mk_repr() + pfmt: Callable[[str], str] = mk_repr( + # so we can see `deribit`'s delightfully mega-long bs fields.. + maxstring=100, + ) async with ( open_cached_client('deribit') as client, @@ -262,25 +269,31 @@ async def stream_quotes( # build `cryptofeed` feed-handle cf_sym: cryptofeed.Symbol = piker_sym_to_cb_sym(sym) - async with maybe_open_price_feed(sym) as stream: - last_trades = ( - await client.last_trades( - cb_sym_to_deribit_inst(cf_sym), - count=1, - ) - ).trades + from_cf: tractor.to_asyncio.LinkedTaskChannel + async with maybe_open_price_feed(sym) as from_cf: - if len(last_trades) == 0: - last_trade = None - async for typ, quote in stream: - if typ == 'trade': - last_trade = Trade(**(quote['data'])) - break + # load the "last trades" summary + last_trades_res: cryptofeed.LastTradesResult = await client.last_trades( + cb_sym_to_deribit_inst(cf_sym), + count=1, + ) + last_trades: list[Trade] = last_trades_res.trades - else: - last_trade = Trade(**(last_trades[0])) + # TODO, do we even need this or will the above always + # work? + # if not last_trades: + # await tractor.pause() + # async for typ, quote in from_cf: + # if typ == 'trade': + # last_trade = Trade(**(quote['data'])) + # break - first_quote = { + # else: + last_trade = Trade( + **(last_trades[0]) + ) + + first_quote: dict = { 'symbol': sym, 'last': last_trade.price, 'brokerd_ts': last_trade.timestamp, @@ -305,14 +318,69 @@ async def stream_quotes( topic: str = mkt.bs_fqme # deliver until cancelled - async for typ, quote in stream: - sym: str = quote['symbol'] - log.info( - f'deribit {typ!r} quote for {sym!r}\n\n' - f'{pfmt(quote)}\n' - ) + async for typ, ref in from_cf: + match typ: + case 'trade': + trade: cryptofeed.types.Trade = ref + + # TODO, re-impl this according to teh ideal + # fqme for opts that we choose!! + bs_fqme: str = cb_sym_to_deribit_inst( + str_to_cb_sym(trade.symbol) + ).lower() + + piker_quote: dict = { + 'symbol': bs_fqme, + 'last': trade.price, + 'broker_ts': time.time(), + # ^TODO, name this `brokerd/datad_ts` and + # use `time.time_ns()` ?? + 'ticks': [{ + 'type': 'trade', + 'price': float(trade.price), + 'size': float(trade.amount), + 'broker_ts': trade.timestamp, + }], + } + log.info( + f'deribit {typ!r} quote for {sym!r}\n\n' + f'{trade}\n\n' + f'{pfmt(piker_quote)}\n' + ) + + case 'l1': + book: cryptofeed.types.L1Book = ref + + # TODO, so this is where we can possibly change things + # and instead lever the `MktPair.bs_fqme: str` output? + bs_fqme: str = cb_sym_to_deribit_inst( + str_to_cb_sym(book.symbol) + ).lower() + + piker_quote: dict = { + 'symbol': bs_fqme, + 'ticks': [ + + {'type': 'bid', + 'price': float(book.bid_price), + 'size': float(book.bid_size)}, + + {'type': 'bsize', + 'price': float(book.bid_price), + 'size': float(book.bid_size),}, + + {'type': 'ask', + 'price': float(book.ask_price), + 'size': float(book.ask_size),}, + + {'type': 'asize', + 'price': float(book.ask_price), + 'size': float(book.ask_size),} + ] + } + await send_chan.send({ - topic: quote, + topic: piker_quote, }) @@ -327,7 +395,6 @@ async def open_symbol_search( await ctx.started() async with ctx.open_stream() as stream: - pattern: str async for pattern in stream: diff --git a/piker/brokers/deribit/venues.py b/piker/brokers/deribit/venues.py index 0179c5f0..0dda913e 100644 --- a/piker/brokers/deribit/venues.py +++ b/piker/brokers/deribit/venues.py @@ -154,6 +154,7 @@ class JSONRPCResult(Struct): error: Optional[dict] = None result: Optional[list[dict]] = None + class JSONRPCChannel(Struct): method: str params: dict @@ -170,6 +171,7 @@ class KLinesResult(Struct): status: str volume: list[float] + class Trade(Struct): iv: float price: float @@ -188,6 +190,7 @@ class Trade(Struct): block_trade_id: Optional[str] = '', block_trade_leg_count: Optional[int] = 0, + class LastTradesResult(Struct): trades: list[Trade] has_more: bool -- 2.34.1 From 8a9d21468abc442cc2df59b4161d150363af064d Mon Sep 17 00:00:00 2001 From: Nelson Torres Date: Thu, 30 Jan 2025 01:38:37 -0300 Subject: [PATCH 12/13] Deribit api key changes introduce: - `get_timestamp_int`: added this is the hack, so we can aboid use the custom deribit date format. - `get_currencies`: added so we could get all deribit's available currencies. - Also a couple of format fixes. --- piker/brokers/deribit/api.py | 68 +++++++++++++++++++++++------------- 1 file changed, 44 insertions(+), 24 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index f846a5c0..ee9c7033 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -52,12 +52,14 @@ from cryptofeed import FeedHandler from cryptofeed.defines import ( DERIBIT, L1_BOOK, TRADES, - OPTION, CALL, PUT + OPTION, CALL, PUT, + OPEN_INTEREST, ) from cryptofeed.symbols import Symbol from cryptofeed.types import ( L1Book, Trade, + OpenInterest, ) from piker.brokers import SymbolNotFound from .venues import ( @@ -110,20 +112,25 @@ def deribit_timestamp(when: datetime) -> int: ) +def get_timestamp_int(expiry_date: str) -> int: + return int(time.mktime(time.strptime(expiry_date, '%d%b%y'))) + + def str_to_cb_sym(name: str) -> Symbol: base, strike_price, expiry_date, option_type = name.split('-') quote = base if option_type == 'put': - option_type = PUT - elif option_type == 'call': + option_type = PUT + elif option_type == 'call': option_type = CALL else: raise Exception("Couldn\'t parse option type") - new_expiry_date = get_values_from_cb_normalized_date(expiry_date) - + new_expiry_date: int = get_timestamp_int( + get_values_from_cb_normalized_date(expiry_date) + ) return Symbol( base=base, quote=quote, @@ -143,11 +150,12 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: )= tuple( name.upper().split('-')) + new_expiry_date = get_timestamp_int(expiry_date) quote: str = base - if option_type == 'P': - option_type = PUT - elif option_type == 'C': + if option_type == 'P' or option_type == 'PUT': + option_type = PUT + elif option_type == 'C' or option_type == 'CALL': option_type = CALL else: raise Exception("Couldn\'t parse option type") @@ -158,7 +166,7 @@ def piker_sym_to_cb_sym(name: str) -> Symbol: type=OPTION, strike_price=strike_price, option_type=option_type, - expiry_date=expiry_date + expiry_date=new_expiry_date ) @@ -226,16 +234,18 @@ def get_config() -> dict[str, Any]: ) conf_option = section.get('option', {}) - section.clear # clear the dict to reuse it - section['deribit'] = {} - section['deribit']['key_id'] = conf_option.get('api_key') - section['deribit']['key_secret'] = conf_option.get('api_secret') - - section['log'] = {} - section['log']['filename'] = 'feedhandler.log' - section['log']['level'] = 'DEBUG' - - return section + conf_log = conf_option.get('log', {}) + return { + 'deribit': { + 'key_id': conf_option['key_id'], + 'key_secret': conf_option['key_secret'], + }, + 'log': { + 'filename': conf_log['filename'], + 'level': conf_log['level'], + 'disabled': conf_log['disabled'], + } + } class Client: @@ -311,6 +321,20 @@ class Client: return balances + async def get_currencies( + self, + + ) -> list[dict]: + ''' + Return the set of currencies for deribit. + ''' + assets = {} + resp = await self._json_rpc_auth_wrapper( + 'public/get_currencies', + params={} + ) + return resp.result + async def get_assets( self, venue: str | None = None, @@ -323,11 +347,7 @@ class Client: ''' assets = {} - resp = await self._json_rpc_auth_wrapper( - 'public/get_currencies', - params={} - ) - currencies: list[dict] = resp.result + currencies = await self.get_currencies() for currency in currencies: name: str = currency['currency'] tx_tick: Decimal = digits_to_dec(currency['fee_precision']) -- 2.34.1 From b209512eb638d7c8a32297a47d8ab4d173bec1d1 Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Tue, 19 Nov 2024 21:05:16 -0500 Subject: [PATCH 13/13] Add `.log.mk_repr()` to create `reprlib.Repr`s --- piker/log.py | 28 ++++++++++++++++++++++++++++ 1 file changed, 28 insertions(+) diff --git a/piker/log.py b/piker/log.py index 56776e1e..7f554f16 100644 --- a/piker/log.py +++ b/piker/log.py @@ -18,7 +18,11 @@ Log like a forester! """ import logging +import reprlib import json +from typing import ( + Callable, +) import tractor from pygments import ( @@ -84,3 +88,27 @@ def colorize_json( # likeable styles: algol_nu, tango, monokai formatters.TerminalTrueColorFormatter(style=style) ) + + +def mk_repr( + **repr_kws, +) -> Callable[[str], str]: + ''' + Allocate and deliver a `repr.Repr` instance with provided input + settings using the std-lib's `reprlib` mod, + * https://docs.python.org/3/library/reprlib.html + + ------ Ex. ------ + An up to 6-layer-nested `dict` as multi-line: + - https://stackoverflow.com/a/79102479 + - https://docs.python.org/3/library/reprlib.html#reprlib.Repr.maxlevel + + ''' + def_kws: dict[str, int] = dict( + indent=2, + maxlevel=6, # recursion levels + maxstring=66, # match editor line-len limit + ) + def_kws |= repr_kws + reprr = reprlib.Repr(**def_kws) + return reprr.repr -- 2.34.1