From f55f56a29f9b87c4fe463bff9ceb4bf08a28f6fc Mon Sep 17 00:00:00 2001 From: Guillermo Rodriguez Date: Sat, 16 Jul 2022 12:44:26 -0300 Subject: [PATCH] Refactored deribit backend into new multi file format --- piker/brokers/deribit/__init__.py | 64 +++++ piker/brokers/deribit/api.py | 266 +++++++++++++++++ piker/brokers/{deribit.py => deribit/feed.py} | 271 ++---------------- 3 files changed, 352 insertions(+), 249 deletions(-) create mode 100644 piker/brokers/deribit/__init__.py create mode 100644 piker/brokers/deribit/api.py rename piker/brokers/{deribit.py => deribit/feed.py} (59%) diff --git a/piker/brokers/deribit/__init__.py b/piker/brokers/deribit/__init__.py new file mode 100644 index 00000000..6794482e --- /dev/null +++ b/piker/brokers/deribit/__init__.py @@ -0,0 +1,64 @@ +# piker: trading gear for hackers +# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) + +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Deribit backend. + +''' + +from piker.log import get_logger + +log = get_logger(__name__) + +from .api import ( + get_client, +) +from .feed import ( + open_history_client, + open_symbol_search, + stream_quotes, +) +# from .broker import ( +# trades_dialogue, +# norm_trade_records, +# ) + +__all__ = [ + 'get_client', +# 'trades_dialogue', + 'open_history_client', + 'open_symbol_search', + 'stream_quotes', +# 'norm_trade_records', +] + + +# tractor RPC enable arg +__enable_modules__: list[str] = [ + 'api', + 'feed', +# 'broker', +] + +# passed to ``tractor.ActorNursery.start_actor()`` +_spawn_kwargs = { + 'infect_asyncio': True, +} + +# annotation to let backend agnostic code +# know if ``brokerd`` should be spawned with +# ``tractor``'s aio mode. +_infect_asyncio: bool = True diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py new file mode 100644 index 00000000..e1550ec5 --- /dev/null +++ b/piker/brokers/deribit/api.py @@ -0,0 +1,266 @@ +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as published by +# the Free Software Foundation, either version 3 of the License, or +# (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. + +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see . + +''' +Deribit backend. + +''' + +from contextlib import asynccontextmanager as acm +from datetime import datetime +from typing import Any, Optional, List + +import pendulum +import asks +from fuzzywuzzy import process as fuzzy +import numpy as np +from pydantic import BaseModel + +from .._util import resproc + +from piker import config +from piker.log import get_logger + +from cryptofeed.symbols import Symbol + +_spawn_kwargs = { + 'infect_asyncio': True, +} + +log = get_logger(__name__) + + +_url = 'https://www.deribit.com' + + +# Broker specific ohlc schema (rest) +_ohlc_dtype = [ + ('index', int), + ('time', int), + ('open', float), + ('high', float), + ('low', float), + ('close', float), + ('volume', float), + ('bar_wap', float), # will be zeroed by sampler if not filled +] + + +class JSONRPCResult(BaseModel): + jsonrpc: str = '2.0' + result: dict + usIn: int + usOut: int + usDiff: int + testnet: bool + + +class KLinesResult(BaseModel): + close: List[float] + cost: List[float] + high: List[float] + low: List[float] + open: List[float] + status: str + ticks: List[int] + volume: List[float] + + +class KLines(JSONRPCResult): + result: KLinesResult + + +class Trade(BaseModel): + trade_seq: int + trade_id: str + timestamp: int + tick_direction: int + price: float + mark_price: float + iv: float + instrument_name: str + index_price: float + direction: str + amount: float + +class LastTradesResult(BaseModel): + trades: List[Trade] + has_more: bool + +class LastTrades(JSONRPCResult): + result: LastTradesResult + + +# convert datetime obj timestamp to unixtime in milliseconds +def deribit_timestamp(when): + return int((when.timestamp() * 1000) + (when.microsecond / 1000)) + + +class Client: + + def __init__(self) -> None: + self._sesh = asks.Session(connections=4) + self._sesh.base_location = _url + self._pairs: dict[str, Any] = {} + + async def _api( + self, + method: str, + params: dict, + ) -> dict[str, Any]: + resp = await self._sesh.get( + path=f'/api/v2/public/{method}', + params=params, + timeout=float('inf') + ) + return resproc(resp, log) + + async def symbol_info( + self, + instrument: Optional[str] = None, + currency: str = 'btc', # BTC, ETH, SOL, USDC + kind: str = 'option', + expired: bool = False + ) -> dict[str, Any]: + '''Get symbol info for the exchange. + + ''' + # TODO: we can load from our self._pairs cache + # on repeat calls... + + # will retrieve all symbols by default + params = { + 'currency': currency.upper(), + 'kind': kind, + 'expired': str(expired).lower() + } + + resp = await self._api( + 'get_instruments', params=params) + + results = resp['result'] + + instruments = { + item['instrument_name']: item for item in results} + + if instrument is not None: + return instruments[instrument] + else: + return instruments + + async def cache_symbols( + self, + ) -> dict: + if not self._pairs: + self._pairs = await self.symbol_info() + + return self._pairs + + async def search_symbols( + self, + pattern: str, + limit: int = None, + ) -> dict[str, Any]: + if self._pairs is not None: + data = self._pairs + else: + data = await self.symbol_info() + + matches = fuzzy.extractBests( + pattern, + data, + score_cutoff=50, + ) + # repack in dict form + return {item[0]['instrument_name']: item[0] + for item in matches} + + async def bars( + self, + symbol: str, + start_dt: Optional[datetime] = None, + end_dt: Optional[datetime] = None, + limit: int = 1000, + as_np: bool = True, + ) -> dict: + instrument = symbol + + if end_dt is None: + end_dt = pendulum.now('UTC') + + if start_dt is None: + start_dt = end_dt.start_of( + 'minute').subtract(minutes=limit) + + start_time = deribit_timestamp(start_dt) + end_time = deribit_timestamp(end_dt) + + # https://docs.deribit.com/#public-get_tradingview_chart_data + response = await self._api( + 'get_tradingview_chart_data', + params={ + 'instrument_name': instrument.upper(), + 'start_timestamp': start_time, + 'end_timestamp': end_time, + 'resolution': '1' + } + ) + + klines = KLines(**response) + + result = klines.result + new_bars = [] + for i in range(len(result.close)): + + _open = result.open[i] + high = result.high[i] + low = result.low[i] + close = result.close[i] + volume = result.volume[i] + + row = [ + (start_time + (i * (60 * 1000))) / 1000.0, # time + result.open[i], + result.high[i], + result.low[i], + result.close[i], + result.volume[i], + 0 + ] + + new_bars.append((i,) + tuple(row)) + + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines + return array + + async def last_trades( + self, + instrument: str, + count: int = 10 + ): + response = await self._api( + 'get_last_trades_by_instrument', + params={ + 'instrument_name': instrument, + 'count': count + } + ) + + return LastTrades(**response) + + +@acm +async def get_client() -> Client: + client = Client() + await client.cache_symbols() + yield client diff --git a/piker/brokers/deribit.py b/piker/brokers/deribit/feed.py similarity index 59% rename from piker/brokers/deribit.py rename to piker/brokers/deribit/feed.py index b787d2b5..47c742b3 100644 --- a/piker/brokers/deribit.py +++ b/piker/brokers/deribit/feed.py @@ -1,6 +1,3 @@ -# piker: trading gear for hackers -# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) - # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by # the Free Software Foundation, either version 3 of the License, or @@ -14,51 +11,44 @@ # You should have received a copy of the GNU Affero General Public License # along with this program. If not, see . -""" -Deribit backend +''' +Deribit backend. + +''' -""" import asyncio from async_generator import aclosing from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import ( - Any, Union, Optional, List, - AsyncGenerator, Callable, -) +from typing import Any, Optional, List, Callable import time import trio from trio_typing import TaskStatus import pendulum -import asks from fuzzywuzzy import process as fuzzy import numpy as np import tractor from tractor import to_asyncio -from pydantic.dataclasses import dataclass -from pydantic import BaseModel -import wsproto - -from .. import config -from .._cacheables import open_cached_client -from ._util import resproc, SymbolNotFound -from ..log import get_logger, get_console_log -from ..data import ShmArray -from ..data._web_bs import open_autorecon_ws, NoBsWs +from piker import config +from piker._cacheables import open_cached_client +from piker.log import get_logger, get_console_log +from piker.data import ShmArray +from piker.brokers._util import ( + BrokerError, + DataUnavailable, +) from cryptofeed import FeedHandler -from cryptofeed.callback import ( - L1BookCallback, - TradeCallback -) from cryptofeed.defines import ( DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT ) from cryptofeed.symbols import Symbol +from .api import Client + _spawn_kwargs = { 'infect_asyncio': True, } @@ -87,69 +77,6 @@ log = get_logger(__name__) _url = 'https://www.deribit.com' -# Broker specific ohlc schema (rest) -_ohlc_dtype = [ - ('index', int), - ('time', int), - ('open', float), - ('high', float), - ('low', float), - ('close', float), - ('volume', float), - # ('bar_wap', float), # will be zeroed by sampler if not filled -] - - -class JSONRPCResult(BaseModel): - jsonrpc: str = '2.0' - result: dict - usIn: int - usOut: int - usDiff: int - testnet: bool - - -class KLinesResult(BaseModel): - close: List[float] - cost: List[float] - high: List[float] - low: List[float] - open: List[float] - status: str - ticks: List[int] - volume: List[float] - - -class KLines(JSONRPCResult): - result: KLinesResult - - -class Trade(BaseModel): - trade_seq: int - trade_id: str - timestamp: int - tick_direction: int - price: float - mark_price: float - iv: float - instrument_name: str - index_price: float - direction: str - amount: float - -class LastTradesResult(BaseModel): - trades: List[Trade] - has_more: bool - -class LastTrades(JSONRPCResult): - result: LastTradesResult - - -# convert datetime obj timestamp to unixtime in milliseconds -def deribit_timestamp(when): - return int((when.timestamp() * 1000) + (when.microsecond / 1000)) - - def str_to_cb_sym(name: str) -> Symbol: base, strike_price, expiry_date, option_type = name.split('-') @@ -212,165 +139,6 @@ def cb_sym_to_deribit_inst(sym: Symbol): return f'{sym.base}-{day}{month}{year}-{sym.strike_price}-{otype}' -class Client: - - def __init__(self) -> None: - self._sesh = asks.Session(connections=4) - self._sesh.base_location = _url - self._pairs: dict[str, Any] = {} - - async def _api( - self, - method: str, - params: dict, - ) -> dict[str, Any]: - resp = await self._sesh.get( - path=f'/api/v2/public/{method}', - params=params, - timeout=float('inf') - ) - return resproc(resp, log) - - async def symbol_info( - self, - instrument: Optional[str] = None, - currency: str = 'btc', # BTC, ETH, SOL, USDC - kind: str = 'option', - expired: bool = False - ) -> dict[str, Any]: - '''Get symbol info for the exchange. - - ''' - # TODO: we can load from our self._pairs cache - # on repeat calls... - - # will retrieve all symbols by default - params = { - 'currency': currency.upper(), - 'kind': kind, - 'expired': str(expired).lower() - } - - resp = await self._api( - 'get_instruments', params=params) - - results = resp['result'] - - instruments = { - item['instrument_name']: item for item in results} - - if instrument is not None: - return instruments[instrument] - else: - return instruments - - async def cache_symbols( - self, - ) -> dict: - if not self._pairs: - self._pairs = await self.symbol_info() - - return self._pairs - - async def search_symbols( - self, - pattern: str, - limit: int = None, - ) -> dict[str, Any]: - if self._pairs is not None: - data = self._pairs - else: - data = await self.symbol_info() - - matches = fuzzy.extractBests( - pattern, - data, - score_cutoff=50, - ) - # repack in dict form - return {item[0]['instrument_name']: item[0] - for item in matches} - - async def bars( - self, - symbol: str, - start_dt: Optional[datetime] = None, - end_dt: Optional[datetime] = None, - limit: int = 1000, - as_np: bool = True, - ) -> dict: - instrument = symbol - - if end_dt is None: - end_dt = pendulum.now('UTC') - - if start_dt is None: - start_dt = end_dt.start_of( - 'minute').subtract(minutes=limit) - - start_time = deribit_timestamp(start_dt) - end_time = deribit_timestamp(end_dt) - - # https://docs.deribit.com/#public-get_tradingview_chart_data - response = await self._api( - 'get_tradingview_chart_data', - params={ - 'instrument_name': instrument.upper(), - 'start_timestamp': start_time, - 'end_timestamp': end_time, - 'resolution': '1' - } - ) - - klines = KLines(**response) - - result = klines.result - new_bars = [] - for i in range(len(result.close)): - - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] - - row = [ - (start_time + (i * (60 * 1000))) / 1000.0, # time - result.open[i], - result.high[i], - result.low[i], - result.close[i], - result.volume[i] - ] - - new_bars.append((i,) + tuple(row)) - - array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines - return array - - async def last_trades( - self, - instrument: str, - count: int = 10 - ): - response = await self._api( - 'get_last_trades_by_instrument', - params={ - 'instrument_name': instrument, - 'count': count - } - ) - - return LastTrades(**response) - - -@acm -async def get_client() -> Client: - client = Client() - await client.cache_symbols() - yield client - - # inside here we are in an asyncio context async def open_aio_cryptofeed_relay( from_trio: asyncio.Queue, @@ -464,8 +232,12 @@ async def open_history_client( start_dt=start_dt, end_dt=end_dt, ) + if len(array) == 0: + raise DataUnavailable + start_dt = pendulum.from_timestamp(array[0]['time']) end_dt = pendulum.from_timestamp(array[-1]['time']) + return array, start_dt, end_dt yield get_ohlc, {'erlangs': 3, 'rate': 3} @@ -514,7 +286,8 @@ async def stream_quotes( # and that history has been written sym: { 'symbol_info': { - 'asset_type': 'option' + 'asset_type': 'option', + 'price_tick_size': 0.0005 }, 'shm_write_opts': {'sum_tick_vml': False}, 'fqsn': sym, @@ -568,7 +341,7 @@ async def open_symbol_search( matches = fuzzy.extractBests( pattern, cache, - score_cutoff=50, + score_cutoff=30, ) # repack in dict form await stream.send(