diff --git a/piker/brokers/kraken/__init__.py b/piker/brokers/kraken/__init__.py index 783406a4..1f5bc876 100644 --- a/piker/brokers/kraken/__init__.py +++ b/piker/brokers/kraken/__init__.py @@ -19,23 +19,26 @@ Kraken backend. Sub-modules within break into the core functionalities: -- ``broker.py`` part for orders / trading endpoints -- ``feed.py`` for real-time data feed endpoints -- ``api.py`` for the core API machinery which is ``trio``-ized - wrapping around ``ib_insync``. +- .api: for the core API machinery which generally + a ``asks``/``trio-websocket`` implemented ``Client``. +- .broker: part for orders / trading endpoints. +- .feed: for real-time and historical data query endpoints. +- .ledger: for transaction processing as it pertains to accounting. +- .symbols: for market (name) search and symbology meta-defs. ''' -from .symbols import Pair # for symcache +from .symbols import ( + Pair, # for symcache + open_symbol_search, + # required by `.accounting`, `.data` + get_mkt_info, +) # required by `.brokers` from .api import ( get_client, ) from .feed import ( - # required by `.accounting`, `.data` - get_mkt_info, - # required by `.data` - open_symbol_search, stream_quotes, open_history_client, ) @@ -66,6 +69,7 @@ __all__ = [ # tractor RPC enable arg __enable_modules__: list[str] = [ 'api', - 'feed', 'broker', + 'feed', + 'symbols', ] diff --git a/piker/brokers/kraken/feed.py b/piker/brokers/kraken/feed.py index 1d10ad8c..63072fd0 100644 --- a/piker/brokers/kraken/feed.py +++ b/piker/brokers/kraken/feed.py @@ -30,24 +30,16 @@ from typing import ( ) import time -from fuzzywuzzy import process as fuzzy import numpy as np import pendulum from trio_typing import TaskStatus -import tractor import trio from piker.accounting._mktinfo import ( - Asset, MktPair, - unpack_fqme, ) from piker.brokers import ( open_cached_client, - SymbolNotFound, -) -from piker._cacheables import ( - async_lifo_cache, ) from piker.brokers._util import ( BrokerError, @@ -59,9 +51,8 @@ from piker.data.validate import FeedInit from piker.data._web_bs import open_autorecon_ws, NoBsWs from .api import ( log, - Client, - Pair, ) +from .symbols import get_mkt_info class OHLC(Struct, frozen=True): @@ -267,70 +258,6 @@ async def open_history_client( yield get_ohlc, {'erlangs': 1, 'rate': 1} -@async_lifo_cache() -async def get_mkt_info( - fqme: str, - -) -> tuple[MktPair, Pair]: - ''' - Query for and return a `MktPair` and backend-native `Pair` (or - wtv else) info. - - If more then one fqme is provided return a ``dict`` of native - key-strs to `MktPair`s. - - ''' - venue: str = 'spot' - expiry: str = '' - if '.kraken' not in fqme: - fqme += '.kraken' - - broker, pair, venue, expiry = unpack_fqme(fqme) - venue: str = venue or 'spot' - - if venue.lower() != 'spot': - raise SymbolNotFound( - 'kraken only supports spot markets right now!\n' - f'{fqme}\n' - ) - - async with open_cached_client('kraken') as client: - - # uppercase since kraken bs_mktid is always upper - # bs_fqme, _, broker = fqme.partition('.') - # pair_str: str = bs_fqme.upper() - pair_str: str = f'{pair}.{venue}' - - pair: Pair | None = client._pairs.get(pair_str.upper()) - if not pair: - bs_fqme: str = Client.to_bs_fqme(pair_str) - pair: Pair = client._pairs[bs_fqme] - - if not (assets := client._assets): - assets: dict[str, Asset] = await client.get_assets() - - dst_asset: Asset = assets[pair.bs_dst_asset] - src_asset: Asset = assets[pair.bs_src_asset] - - mkt = MktPair( - dst=dst_asset, - src=src_asset, - - price_tick=pair.price_tick, - size_tick=pair.size_tick, - bs_mktid=pair.bs_mktid, - - expiry=expiry, - venue=venue or 'spot', - - # TODO: futes - # _atype=_atype, - - broker='kraken', - ) - return mkt, pair - - async def stream_quotes( send_chan: trio.abc.SendChannel, @@ -486,30 +413,3 @@ async def stream_quotes( log.warning(f'Unknown WSS message: {typ}, {quote}') await send_chan.send({topic: quote}) - - -@tractor.context -async def open_symbol_search( - ctx: tractor.Context, - -) -> Client: - async with open_cached_client('kraken') as client: - - # load all symbols locally for fast search - cache = await client.get_mkt_pairs() - await ctx.started(cache) - - async with ctx.open_stream() as stream: - - async for pattern in stream: - - matches = fuzzy.extractBests( - pattern, - client._pairs, - score_cutoff=50, - ) - # repack in dict form - await stream.send({ - pair[0].altname: pair[0] - for pair in matches - }) diff --git a/piker/brokers/kraken/symbols.py b/piker/brokers/kraken/symbols.py index 43efcac2..ea2c68f4 100644 --- a/piker/brokers/kraken/symbols.py +++ b/piker/brokers/kraken/symbols.py @@ -15,15 +15,30 @@ # along with this program. If not, see . ''' -Symbology defs and deats! +Symbology defs and search. ''' from decimal import Decimal +import tractor +from fuzzywuzzy import process as fuzzy + +from piker._cacheables import ( + async_lifo_cache, +) from piker.accounting._mktinfo import ( digits_to_dec, ) +from piker.brokers import ( + open_cached_client, + SymbolNotFound, +) from piker.data.types import Struct +from piker.accounting._mktinfo import ( + Asset, + MktPair, + unpack_fqme, +) # https://www.kraken.com/features/api#get-tradable-pairs @@ -112,3 +127,89 @@ class Pair(Struct): return f'{dst}{src}.SPOT' +@tractor.context +async def open_symbol_search(ctx: tractor.Context) -> None: + async with open_cached_client('kraken') as client: + + # load all symbols locally for fast search + cache = await client.get_mkt_pairs() + await ctx.started(cache) + + async with ctx.open_stream() as stream: + + async for pattern in stream: + + matches = fuzzy.extractBests( + pattern, + client._pairs, + score_cutoff=50, + ) + # repack in dict form + await stream.send({ + pair[0].altname: pair[0] + for pair in matches + }) + + +@async_lifo_cache() +async def get_mkt_info( + fqme: str, + +) -> tuple[MktPair, Pair]: + ''' + Query for and return a `MktPair` and backend-native `Pair` (or + wtv else) info. + + If more then one fqme is provided return a ``dict`` of native + key-strs to `MktPair`s. + + ''' + venue: str = 'spot' + expiry: str = '' + if '.kraken' not in fqme: + fqme += '.kraken' + + broker, pair, venue, expiry = unpack_fqme(fqme) + venue: str = venue or 'spot' + + if venue.lower() != 'spot': + raise SymbolNotFound( + 'kraken only supports spot markets right now!\n' + f'{fqme}\n' + ) + + async with open_cached_client('kraken') as client: + + # uppercase since kraken bs_mktid is always upper + # bs_fqme, _, broker = fqme.partition('.') + # pair_str: str = bs_fqme.upper() + pair_str: str = f'{pair}.{venue}' + + pair: Pair | None = client._pairs.get(pair_str.upper()) + if not pair: + bs_fqme: str = client.to_bs_fqme(pair_str) + pair: Pair = client._pairs[bs_fqme] + + if not (assets := client._assets): + assets: dict[str, Asset] = await client.get_assets() + + dst_asset: Asset = assets[pair.bs_dst_asset] + src_asset: Asset = assets[pair.bs_src_asset] + + mkt = MktPair( + dst=dst_asset, + src=src_asset, + + price_tick=pair.price_tick, + size_tick=pair.size_tick, + bs_mktid=pair.bs_mktid, + + expiry=expiry, + venue=venue or 'spot', + + # TODO: futes + # _atype=_atype, + + broker='kraken', + ) + return mkt, pair