From 4c838474bebd2949bf73fa71e4109bbb3953c88c Mon Sep 17 00:00:00 2001 From: Tyler Goodlet Date: Fri, 3 Mar 2023 18:32:24 -0500 Subject: [PATCH] `flake8` linter cleanup and comment out order ctl draft code --- piker/brokers/deribit/api.py | 229 ++++++++++++++++++---------------- piker/brokers/deribit/feed.py | 24 ++-- 2 files changed, 128 insertions(+), 125 deletions(-) diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index f93a20e5..62b4b788 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -1,5 +1,5 @@ # piker: trading gear for hackers -# Copyright (C) Guillermo Rodriguez (in stewardship for piker0) +# Copyright (C) Guillermo Rodriguez (in stewardship for pikers) # This program is free software: you can redistribute it and/or modify # it under the terms of the GNU Affero General Public License as published by @@ -18,49 +18,48 @@ Deribit backend. ''' -import json +from __future__ import annotations import time import asyncio -from contextlib import asynccontextmanager as acm, AsyncExitStack +from contextlib import asynccontextmanager as acm from functools import partial from datetime import datetime -from typing import Any, Optional, Iterable, Callable +from typing import ( + Any, + Optional, + Callable, +) +from cryptofeed import FeedHandler +from cryptofeed.defines import ( + DERIBIT, + L1_BOOK, + TRADES, + OPTION, + CALL, + PUT, +) import pendulum -import asks import trio -from trio_typing import Nursery, TaskStatus +from trio_typing import TaskStatus from fuzzywuzzy import process as fuzzy import numpy as np +from tractor.trionics import ( + broadcast_receiver, + maybe_open_context +) +from tractor import to_asyncio +from cryptofeed.symbols import Symbol from piker.data.types import Struct from piker.data._web_bs import ( - NoBsWs, - open_autorecon_ws, open_jsonrpc_session ) -from .._util import resproc - from piker import config from piker.log import get_logger -from tractor.trionics import ( - broadcast_receiver, - BroadcastReceiver, - maybe_open_context -) -from tractor import to_asyncio - -from cryptofeed import FeedHandler - -from cryptofeed.defines import ( - DERIBIT, - L1_BOOK, TRADES, - OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol log = get_logger(__name__) @@ -189,8 +188,13 @@ def cb_sym_to_deribit_inst(sym: Symbol): # cryptofeed normalized cb_norm = ['F', 'G', 'H', 'J', 'K', 'M', 'N', 'Q', 'U', 'V', 'X', 'Z'] - # deribit specific - months = ['JAN', 'FEB', 'MAR', 'APR', 'MAY', 'JUN', 'JUL', 'AUG', 'SEP', 'OCT', 'NOV', 'DEC'] + # deribit specific + months = [ + 'JAN', 'FEB', 'MAR', + 'APR', 'MAY', 'JUN', + 'JUL', 'AUG', 'SEP', + 'OCT', 'NOV', 'DEC', + ] exp = sym.expiry_date @@ -210,14 +214,15 @@ def get_config() -> dict[str, Any]: section = conf.get('deribit') - # TODO: document why we send this, basically because logging params for cryptofeed + # TODO: document why we send this, basically because logging params + # for cryptofeed conf['log'] = {} conf['log']['disabled'] = True if section is None: log.warning(f'No config section found for deribit in {path}') - return conf + return conf class Client: @@ -364,6 +369,7 @@ class Client: end_dt: Optional[datetime] = None, limit: int = 1000, as_np: bool = True, + ) -> dict: instrument = symbol @@ -389,14 +395,8 @@ class Client: result = KLinesResult(**resp.result) new_bars = [] + for i in range(len(result.close)): - - _open = result.open[i] - high = result.high[i] - low = result.low[i] - close = result.close[i] - volume = result.volume[i] - row = [ (start_time + (i * (60 * 1000))) / 1000.0, # time result.open[i], @@ -409,7 +409,7 @@ class Client: new_bars.append((i,) + tuple(row)) - array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else klines + array = np.array(new_bars, dtype=_ohlc_dtype) if as_np else new_bars return array async def last_trades( @@ -463,7 +463,7 @@ async def get_client( if time.time() - _expiry_time < renew_time: # if we are close to token expiry time - if _refresh_token != None: + if _refresh_token is not None: # if we have a refresh token already dont need to send # secret params = { @@ -473,7 +473,8 @@ async def get_client( } else: - # we don't have refresh token, send secret to initialize + # we don't have refresh token, send secret to + # initialize params = { 'grant_type': 'client_credentials', 'client_id': client._key_id, @@ -541,20 +542,30 @@ async def aio_price_feed_relay( })) async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { - 'symbol': cb_sym_to_deribit_inst( - str_to_cb_sym(data.symbol)).lower(), - 'ticks': [ - {'type': 'bid', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'bsize', - 'price': float(data.bid_price), 'size': float(data.bid_size)}, - {'type': 'ask', - 'price': float(data.ask_price), 'size': float(data.ask_size)}, - {'type': 'asize', - 'price': float(data.ask_price), 'size': float(data.ask_size)} - ] - })) + to_trio.send_nowait( + ('l1', { + 'symbol': cb_sym_to_deribit_inst( + str_to_cb_sym(data.symbol)).lower(), + 'ticks': [ + + {'type': 'bid', + 'price': float(data.bid_price), + 'size': float(data.bid_size)}, + + {'type': 'bsize', + 'price': float(data.bid_price), + 'size': float(data.bid_size)}, + + {'type': 'ask', + 'price': float(data.ask_price), + 'size': float(data.ask_size)}, + + {'type': 'asize', + 'price': float(data.ask_price), + 'size': float(data.ask_size)} + ] + }) + ) fh.add_feed( DERIBIT, @@ -610,69 +621,71 @@ async def maybe_open_price_feed( yield feed +# TODO: order broker support: this is all draft code from @guilledk B) -async def aio_order_feed_relay( - fh: FeedHandler, - instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, -) -> None: - async def _fill(data: dict, receipt_timestamp): - breakpoint() +# async def aio_order_feed_relay( +# fh: FeedHandler, +# instrument: Symbol, +# from_trio: asyncio.Queue, +# to_trio: trio.abc.SendChannel, - async def _order_info(data: dict, receipt_timestamp): - breakpoint() +# ) -> None: +# async def _fill(data: dict, receipt_timestamp): +# breakpoint() - fh.add_feed( - DERIBIT, - channels=[FILLS, ORDER_INFO], - symbols=[instrument.upper()], - callbacks={ - FILLS: _fill, - ORDER_INFO: _order_info, - }) +# async def _order_info(data: dict, receipt_timestamp): +# breakpoint() - if not fh.running: - fh.run( - start_loop=False, - install_signal_handlers=False) +# fh.add_feed( +# DERIBIT, +# channels=[FILLS, ORDER_INFO], +# symbols=[instrument.upper()], +# callbacks={ +# FILLS: _fill, +# ORDER_INFO: _order_info, +# }) - # sync with trio - to_trio.send_nowait(None) +# if not fh.running: +# fh.run( +# start_loop=False, +# install_signal_handlers=False) - await asyncio.sleep(float('inf')) +# # sync with trio +# to_trio.send_nowait(None) + +# await asyncio.sleep(float('inf')) -@acm -async def open_order_feed( - instrument: list[str] -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler() as fh: - async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) - ) as (first, chan): - yield chan +# @acm +# async def open_order_feed( +# instrument: list[str] +# ) -> trio.abc.ReceiveStream: +# async with maybe_open_feed_handler() as fh: +# async with to_asyncio.open_channel_from( +# partial( +# aio_order_feed_relay, +# fh, +# instrument +# ) +# ) as (first, chan): +# yield chan -@acm -async def maybe_open_order_feed( - instrument: str -) -> trio.abc.ReceiveStream: +# @acm +# async def maybe_open_order_feed( +# instrument: str +# ) -> trio.abc.ReceiveStream: - # TODO: add a predicate to maybe_open_context - async with maybe_open_context( - acm_func=open_order_feed, - kwargs={ - 'instrument': instrument, - 'fh': fh - }, - key=f'{instrument}-order', - ) as (cache_hit, feed): - if cache_hit: - yield broadcast_receiver(feed, 10) - else: - yield feed +# # TODO: add a predicate to maybe_open_context +# async with maybe_open_context( +# acm_func=open_order_feed, +# kwargs={ +# 'instrument': instrument, +# 'fh': fh +# }, +# key=f'{instrument}-order', +# ) as (cache_hit, feed): +# if cache_hit: +# yield broadcast_receiver(feed, 10) +# else: +# yield feed diff --git a/piker/brokers/deribit/feed.py b/piker/brokers/deribit/feed.py index d77a7e1b..da5211ce 100644 --- a/piker/brokers/deribit/feed.py +++ b/piker/brokers/deribit/feed.py @@ -20,36 +20,26 @@ Deribit backend. ''' from contextlib import asynccontextmanager as acm from datetime import datetime -from typing import Any, Optional, Callable -import time +from typing import ( + Callable, +) import trio from trio_typing import TaskStatus import pendulum -from fuzzywuzzy import process as fuzzy import numpy as np import tractor from piker._cacheables import open_cached_client from piker.log import get_logger, get_console_log -from piker.data import ShmArray from piker.brokers._util import ( - BrokerError, DataUnavailable, ) -from cryptofeed import FeedHandler - -from cryptofeed.defines import ( - DERIBIT, L1_BOOK, TRADES, OPTION, CALL, PUT -) -from cryptofeed.symbols import Symbol from .api import ( Client, Trade, - get_config, - str_to_cb_sym, piker_sym_to_cb_sym, cb_sym_to_deribit_inst, maybe_open_price_feed @@ -73,8 +63,8 @@ async def open_history_client( async def get_ohlc( timeframe: float, - end_dt: Optional[datetime] = None, - start_dt: Optional[datetime] = None, + end_dt: datetime | None = None, + start_dt: datetime | None = None, ) -> tuple[ np.ndarray, @@ -139,7 +129,7 @@ async def stream_quotes( async with maybe_open_price_feed(sym) as stream: - cache = await client.cache_symbols() + await client.cache_symbols() last_trades = (await client.last_trades( cb_sym_to_deribit_inst(nsym), count=1)).trades @@ -181,7 +171,7 @@ async def open_symbol_search( async with open_cached_client('deribit') as client: # load all symbols locally for fast search - cache = await client.cache_symbols() + await client.cache_symbols() await ctx.started() async with ctx.open_stream() as stream: