From 9bf6f557ed15add6ca87157f8fe1a9f08081a8c5 Mon Sep 17 00:00:00 2001 From: jaredgoldman Date: Sun, 19 Mar 2023 14:14:33 -0400 Subject: [PATCH] Label private methods accordingly, remove cryptofeeds module --- piker/brokers/kucoin.py | 14 +- piker/data/cryptofeeds.py | 316 -------------------------------------- 2 files changed, 7 insertions(+), 323 deletions(-) delete mode 100644 piker/data/cryptofeeds.py diff --git a/piker/brokers/kucoin.py b/piker/brokers/kucoin.py index 1f87e701..bfa4fc8f 100644 --- a/piker/brokers/kucoin.py +++ b/piker/brokers/kucoin.py @@ -246,7 +246,7 @@ class Client: f'Error making request for Kucoin ws token -> {res.json()["msg"]}' ) - async def get_pairs( + async def _get_pairs( self, ) -> dict[str, KucoinMktPair]: if self._pairs: @@ -265,12 +265,12 @@ class Client: ''' if not self._pairs: - self._pairs = await self.get_pairs() + self._pairs = await self._get_pairs() if normalize: - self._pairs = self.normalize_pairs(self._pairs) + self._pairs = self._normalize_pairs(self._pairs) return self._pairs - def normalize_pairs( + def _normalize_pairs( self, pairs: dict[str, KucoinMktPair] ) -> dict[str, KucoinMktPair]: """ @@ -290,7 +290,7 @@ class Client: pattern: str, limit: int = 30, ) -> dict[str, KucoinMktPair]: - data = await self.get_pairs() + data = await self._get_pairs() matches = fuzzy.extractBests(pattern, data, score_cutoff=35, limit=limit) # repack in dict form @@ -300,7 +300,7 @@ class Client: trades = await self._request("GET", f"/accounts/ledgers?currency={sym}", "v1") return trades.items - async def get_bars( + async def _get_bars( self, fqsn: str, start_dt: Optional[datetime] = None, @@ -564,7 +564,7 @@ async def open_history_client( if timeframe != 60: raise DataUnavailable("Only 1m bars are supported") - array = await client.get_bars( + array = await client._get_bars( symbol, start_dt=start_dt, end_dt=end_dt, diff --git a/piker/data/cryptofeeds.py b/piker/data/cryptofeeds.py deleted file mode 100644 index 5605993d..00000000 --- a/piker/data/cryptofeeds.py +++ /dev/null @@ -1,316 +0,0 @@ -# piker: trading gear for hackers -# Copyright (C) Jared Goldman (in stewardship for pikers) - -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as published by -# the Free Software Foundation, either version 3 of the License, or -# (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# GNU Affero General Public License for more details. - -# You should have received a copy of the GNU Affero General Public License -# along with this program. If not, see . - -""" -""" -from os import walk -from contextlib import asynccontextmanager as acm -from datetime import datetime -from types import ModuleType -from typing import Any, Literal, Optional, Callable -import time -from functools import partial - -import trio -from trio_typing import TaskStatus -from tractor.trionics import broadcast_receiver, maybe_open_context -import pendulum -from fuzzywuzzy import process as fuzzy -import numpy as np -import tractor -from tractor import to_asyncio -from cryptofeed import FeedHandler -from cryptofeed.defines import TRADES, L2_BOOK -from cryptofeed.symbols import Symbol -from cryptofeed.types import OrderBook -import asyncio - -from piker._cacheables import open_cached_client -from piker.log import get_logger, get_console_log -from piker.data import ShmArray -from piker.brokers._util import ( - BrokerError, - DataUnavailable, -) -from piker.pp import config - -_spawn_kwargs = { - "infect_asyncio": True, -} - -log = get_logger(__name__) - - -def fqsn_to_cb_sym(pair_data: Symbol) -> Symbol: - return Symbol(base=pair_data["baseCurrency"], quote=pair_data["quoteCurrency"]) - - -def fqsn_to_cf_sym(fqsn: str, pairs: dict[str, Symbol]) -> str: - pair_data = pairs[fqsn] - return pair_data["baseCurrency"] + "-" + pair_data["quoteCurrency"] - - -def pair_data_to_cf_sym(sym_data: Symbol): - return sym_data["baseCurrency"] + "-" + sym_data["quoteCurrency"] - - -def cf_sym_to_fqsn(sym: str) -> str: - return sym.lower().replace("-", "") - - -def get_config(exchange: str) -> dict[str, Any]: - conf, path = config.load() - - section = conf.get(exchange.lower()) - - # TODO: document why we send this, basically because logging params for cryptofeed - conf["log"] = {} - conf["log"]["disabled"] = True - - if section is None: - log.warning(f"No config section found for deribit in {exchange}") - - return conf - - -async def mk_stream_quotes( - exchange: str, - channels: list[str], - send_chan: trio.abc.SendChannel, - symbols: list[str], - feed_is_live: trio.Event, - loglevel: str = None, - # startup sync - task_status: TaskStatus[tuple[dict, dict]] = trio.TASK_STATUS_IGNORED, -) -> None: - # XXX: required to propagate ``tractor`` loglevel to piker logging - get_console_log(loglevel or tractor.current_actor().loglevel) - - sym = symbols[0] - - async with ( - open_cached_client(exchange.lower()) as client, - # send_chan as send_chan, - ): - pairs = await client.cache_pairs() - pair_data = pairs[sym] - - async with maybe_open_price_feed( - pair_data, - exchange, - channels, - ) as stream: - - init_msgs = { - sym: { - "symbol_info": {"asset_type": "crypto", "price_tick_size": 0.0005}, - "shm_write_opts": {"sum_tick_vml": False}, - "fqsn": sym, - }, - } - quote_msg = {"symbol": pair_data["name"], "last": 0, "ticks": []} - - task_status.started((init_msgs, quote_msg)) - feed_is_live.set() - - async for typ, quote in stream: - topic = quote["symbol"] - await send_chan.send({topic: quote}) - log.info( - f'sending {typ} quote:\n' - f'{quote}' - ) - # try: - # finally: - # breakpoint() - - # while True: - # with trio.move_on_after(16) as cancel_scope: - - # log.warning(f'WAITING FOR MESSAGE') - # typ, quote = await stream.receive() - - # log.warning(f'RECEIVED MSG: {quote}') - - # topic = quote["symbol"] - # await send_chan.send({topic: quote}) - - # log.warning(f'SENT TO CHAN') - - # if cancel_scope.cancelled_caught: - # await tractor.breakpoint() - -@acm -async def maybe_open_price_feed( - pair_data: Symbol, - exchange: str, - channels, - -) -> trio.abc.ReceiveStream: - # TODO: add a predicate to maybe_open_context - # TODO: ensure we can dynamically pass down args here - async with maybe_open_context( - acm_func=open_price_feed, - kwargs={ - "pair_data": pair_data, - "exchange": exchange, - "channels": channels, - }, - key=pair_data["name"], - ) as (cache_hit, feed): - yield feed - - -@acm -async def open_price_feed( - pair_data: Symbol, exchange, channels -) -> trio.abc.ReceiveStream: - async with maybe_open_feed_handler(exchange) as fh: - async with to_asyncio.open_channel_from( - partial( - aio_price_feed_relay, - pair_data, - exchange, - channels, - fh, - ) - ) as (first, chan): - yield chan - - -@acm -async def maybe_open_feed_handler(exchange: str) -> trio.abc.ReceiveStream: - async with maybe_open_context( - acm_func=open_feed_handler, - kwargs={ - "exchange": exchange, - }, - key="feedhandler", - ) as (cache_hit, fh): - yield fh - - -@acm -async def open_feed_handler(exchange: str): - fh = FeedHandler(config=get_config(exchange)) - yield fh - await to_asyncio.run_task(fh.stop_async) - - -async def aio_price_feed_relay( - pair_data: Symbol, - exchange: str, - channels: list[str], - fh: FeedHandler, - - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, - -) -> None: - - # sync with trio - to_trio.send_nowait(None) - - async def _trade(data: dict, receipt_timestamp): - data = data.to_dict() - message = ( - "trade", - { - "symbol": cf_sym_to_fqsn(data['symbol']), - "last": float(data['price']), - "broker_ts": time.time(), - "ticks": [{ - 'type': 'trade', - 'price': float(data['price']), - 'size': float(data['amount']), - 'broker_ts': receipt_timestamp - }], - }, - ) - try: - to_trio.send_nowait(message) - await asyncio.sleep(0.001) - except trio.WouldBlock as e: - log.exception( - 'l1: OVERRUN ASYNCIO -> TRIO\n' - f'TO_TRIO.stats -> {to_trio.statistics()}' - - ) - await asyncio.sleep(0) - - async def _l1( - data: dict, - receipt_timestamp: str | None, - ) -> None: - log.info(f'RECV L1 {receipt_timestamp}') - - bid = data.book.to_dict()['bid'] - ask = data.book.to_dict()['ask'] - l1_ask_price, l1_ask_size = next(iter(ask.items())) - l1_bid_price, l1_bid_size = next(iter(bid.items())) - message = ( - "l1", - { - "symbol": cf_sym_to_fqsn(data.symbol), - "broker_ts": time.time(), - "ticks": [ - { - "type": "bid", - "price": float(l1_bid_price), - "size": float(l1_bid_size), - }, - { - "type": "bsize", - "price": float(l1_bid_price), - "size": float(l1_bid_size), - }, - { - "type": "ask", - "price": float(l1_ask_price), - "size": float(l1_ask_size), - }, - { - "type": "asize", - "price": float(l1_ask_price), - "size": float(l1_ask_size), - }, - ] - } - ) - try: - to_trio.send_nowait(message) - await asyncio.sleep(0.001) - except trio.WouldBlock as e: - log.exception( - 'l1: OVERRUN ASYNCIO -> TRIO\n' - f'TO_TRIO.stats -> {to_trio.statistics()}' - - ) - await asyncio.sleep(0) - # breakpoint() - # raise - - fh.add_feed( - exchange, - channels=channels, - symbols=[pair_data_to_cf_sym(pair_data)], - callbacks={TRADES: _trade, L2_BOOK: _l1} - ) - - if not fh.running: - fh.run(start_loop=False, install_signal_handlers=False) - - await asyncio.sleep(float("inf"))