Compare commits

..

10 Commits

Author SHA1 Message Date
Tyler Goodlet 2ced05c4d5 `polars.cumsum()` is now `.cum_sum()` 2026-01-06 16:10:36 -05:00
Tyler Goodlet e10f3a16dd Bump to (latest) `polars`, the `0.20.6x` series B)
Since I was trying out the neat lookin `polars-fuzzy-match` (also added
for now as a core dep here) which requires the new plugin sys, plus it's
about time we synced with upstream!

Adjust some column syntax to the new `.name` sub-field-space and the
`uv` lock-file to match.

Other,
- add back `trio-typing` bc i guess something else needs it (debug
  tooling stuff in new `tractor`?)
- flip back to the `tractor` pre-main pin since the new `main`-branch
  requires new `trio` stuff we haven't ported yet..
2026-01-06 16:10:36 -05:00
Tyler Goodlet 44a3385604 Just drop the merge-msg template, more trouble then it's worth XD 2026-01-06 12:35:51 -05:00
Tyler Goodlet 65320a5e0f Gitea template, wow fix it again.. 2026-01-06 12:28:30 -05:00
Gud Boi 272b74d214 Simplify gitea merge template
Mk title line same as PR, drop issues bit, keep `ReviewedOn` (since nothing else will contain the web addr..) and put the reviewers list.
2026-01-06 17:25:49 +00:00
Tyler Goodlet 4baa330e23 Ye, nm it turns out there's no ${URL} !?
Lol like wtf, how can they have this `ReviewedOn` but not just the PR's
web addr.. XD

I guess i'll just suck back the OCD and try it like this.
2026-01-06 12:22:56 -05:00
Tyler Goodlet f9514582b8 Mk title line same as PR, drop issues bit
Left in docs link for ref, hopefully that doesn't also do something
annoying in the web UI Bp
2026-01-05 14:55:34 -05:00
Gud Boi 8f24a35a5d Merge pull request 'Merge-msg template' (#54) from gitea_merge_template into main
Submitted-in: https://www.pikers.dev/pikers/piker/pulls/54
2026-01-05 18:51:52 +00:00
Tyler Goodlet cccf001aa4 Try out what gemini says will work? 2026-01-05 13:43:10 -05:00
Gud Boi 65a4fafb5d Merge pull request 'no_symcache_no_problem: be more tolerant of not-yet-implemented provider backends' (#39) from no_symcache_no_problem into main
Submitted-in: https://www.pikers.dev/pikers/piker/pulls/39
2026-01-05 16:28:59 +00:00
24 changed files with 212 additions and 425 deletions

View File

@ -465,7 +465,7 @@ def ledger_to_dfs(
df = dfs[key] = ldf.with_columns([ df = dfs[key] = ldf.with_columns([
pl.cumsum('size').alias('cumsize'), pl.cum_sum('size').alias('cumsize'),
# amount of source asset "sent" (via buy txns in # amount of source asset "sent" (via buy txns in
# the market) to acquire the dst asset, PER txn. # the market) to acquire the dst asset, PER txn.
@ -480,7 +480,7 @@ def ledger_to_dfs(
]).with_columns([ ]).with_columns([
# rolling balance in src asset units # rolling balance in src asset units
(pl.col('dst_bot').cumsum() * -1).alias('src_balance'), (pl.col('dst_bot').cum_sum() * -1).alias('src_balance'),
# "position operation type" in terms of increasing the # "position operation type" in terms of increasing the
# amount in the dst asset (entering) or decreasing the # amount in the dst asset (entering) or decreasing the
@ -622,7 +622,7 @@ def ledger_to_dfs(
# cost that was included in the least-recently # cost that was included in the least-recently
# entered txn that is still part of the current CSi # entered txn that is still part of the current CSi
# set. # set.
# => we look up the cost-per-unit cumsum and apply # => we look up the cost-per-unit cum_sum and apply
# if over the current txn size (by multiplication) # if over the current txn size (by multiplication)
# and then reverse that previusly applied cost on # and then reverse that previusly applied cost on
# the txn_cost for this record. # the txn_cost for this record.

View File

@ -98,14 +98,13 @@ async def open_cached_client(
If one has not been setup do it and cache it. If one has not been setup do it and cache it.
''' '''
brokermod: ModuleType = get_brokermod(brokername) brokermod = get_brokermod(brokername)
# TODO: make abstract or `typing.Protocol`
# client: Client
async with maybe_open_context( async with maybe_open_context(
acm_func=brokermod.get_client, acm_func=brokermod.get_client,
kwargs=kwargs, kwargs=kwargs,
) as (cache_hit, client): ) as (cache_hit, client):
if cache_hit: if cache_hit:
log.runtime(f'Reusing existing {client}') log.runtime(f'Reusing existing {client}')

View File

@ -374,14 +374,9 @@ class Client:
pair: Pair = pair_type(**item) pair: Pair = pair_type(**item)
except Exception as e: except Exception as e:
e.add_note( e.add_note(
f'\n' "\nDon't panic, prolly stupid binance changed their symbology schema again..\n"
f'New or removed field we need to codify!\n' 'Check out their API docs here:\n\n'
f'pair-type: {pair_type!r}\n' 'https://binance-docs.github.io/apidocs/spot/en/#exchange-information'
f'\n'
f"Don't panic, prolly stupid binance changed their symbology schema again..\n"
f'Check out their API docs here:\n'
f'\n'
f'https://binance-docs.github.io/apidocs/spot/en/#exchange-information\n'
) )
raise raise
pair_table[pair.symbol.upper()] = pair pair_table[pair.symbol.upper()] = pair

View File

@ -97,13 +97,6 @@ class Pair(Struct, frozen=True, kw_only=True):
baseAsset: str baseAsset: str
baseAssetPrecision: int baseAssetPrecision: int
permissionSets: list[list[str]]
# https://developers.binance.com/docs/binance-spot-api-docs#2025-08-26
# will become non-optional 2025-08-28?
# https://developers.binance.com/docs/binance-spot-api-docs#future-changes
pegInstructionsAllowed: bool|None = None
filters: dict[ filters: dict[
str, str,
str | int | float, str | int | float,
@ -149,11 +142,7 @@ class SpotPair(Pair, frozen=True):
defaultSelfTradePreventionMode: str defaultSelfTradePreventionMode: str
allowedSelfTradePreventionModes: list[str] allowedSelfTradePreventionModes: list[str]
permissions: list[str] permissions: list[str]
permissionSets: list[list[str]]
# can the paint botz creat liq gaps even easier on this asset?
# Bp
# https://developers.binance.com/docs/binance-spot-api-docs/faqs/order_amend_keep_priority
amendAllowed: bool
# NOTE: see `.data._symcache.SymbologyCache.load()` for why # NOTE: see `.data._symcache.SymbologyCache.load()` for why
ns_path: str = 'piker.brokers.binance:SpotPair' ns_path: str = 'piker.brokers.binance:SpotPair'

View File

@ -471,15 +471,11 @@ def search(
''' '''
# global opts # global opts
brokermods: list[ModuleType] = list(config['brokermods'].values()) brokermods = list(config['brokermods'].values())
# TODO: this is coming from the `search --pdb` NOT from
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
# define tractor entrypoint # define tractor entrypoint
async def main(func): async def main(func):
async with maybe_open_pikerd( async with maybe_open_pikerd(
loglevel=config['loglevel'], loglevel=config['loglevel'],
debug_mode=pdb, debug_mode=pdb,

View File

@ -22,9 +22,7 @@ routines should be primitive data types where possible.
""" """
import inspect import inspect
from types import ModuleType from types import ModuleType
from typing import ( from typing import List, Dict, Any, Optional
Any,
)
import trio import trio
@ -36,10 +34,8 @@ from ..accounting import MktPair
async def api(brokername: str, methname: str, **kwargs) -> dict: async def api(brokername: str, methname: str, **kwargs) -> dict:
''' """Make (proxy through) a broker API call by name and return its result.
Make (proxy through) a broker API call by name and return its result. """
'''
brokermod = get_brokermod(brokername) brokermod = get_brokermod(brokername)
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
meth = getattr(client, methname, None) meth = getattr(client, methname, None)
@ -66,14 +62,10 @@ async def api(brokername: str, methname: str, **kwargs) -> dict:
async def stocks_quote( async def stocks_quote(
brokermod: ModuleType, brokermod: ModuleType,
tickers: list[str] tickers: List[str]
) -> Dict[str, Dict[str, Any]]:
) -> dict[str, dict[str, Any]]: """Return quotes dict for ``tickers``.
''' """
Return a `dict` of snapshot quotes for the provided input
`tickers`: a `list` of fqmes.
'''
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
return await client.quote(tickers) return await client.quote(tickers)
@ -82,15 +74,13 @@ async def stocks_quote(
async def option_chain( async def option_chain(
brokermod: ModuleType, brokermod: ModuleType,
symbol: str, symbol: str,
date: str|None = None, date: Optional[str] = None,
) -> dict[str, dict[str, dict[str, Any]]]: ) -> Dict[str, Dict[str, Dict[str, Any]]]:
''' """Return option chain for ``symbol`` for ``date``.
Return option chain for ``symbol`` for ``date``.
By default all expiries are returned. If ``date`` is provided By default all expiries are returned. If ``date`` is provided
then contract quotes for that single expiry are returned. then contract quotes for that single expiry are returned.
"""
'''
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
if date: if date:
id = int((await client.tickers2ids([symbol]))[symbol]) id = int((await client.tickers2ids([symbol]))[symbol])
@ -108,7 +98,7 @@ async def option_chain(
# async def contracts( # async def contracts(
# brokermod: ModuleType, # brokermod: ModuleType,
# symbol: str, # symbol: str,
# ) -> dict[str, dict[str, dict[str, Any]]]: # ) -> Dict[str, Dict[str, Dict[str, Any]]]:
# """Return option contracts (all expiries) for ``symbol``. # """Return option contracts (all expiries) for ``symbol``.
# """ # """
# async with brokermod.get_client() as client: # async with brokermod.get_client() as client:
@ -120,24 +110,15 @@ async def bars(
brokermod: ModuleType, brokermod: ModuleType,
symbol: str, symbol: str,
**kwargs, **kwargs,
) -> dict[str, dict[str, dict[str, Any]]]: ) -> Dict[str, Dict[str, Dict[str, Any]]]:
''' """Return option contracts (all expiries) for ``symbol``.
Return option contracts (all expiries) for ``symbol``. """
'''
async with brokermod.get_client() as client: async with brokermod.get_client() as client:
return await client.bars(symbol, **kwargs) return await client.bars(symbol, **kwargs)
async def search_w_brokerd( async def search_w_brokerd(name: str, pattern: str) -> dict:
name: str,
pattern: str,
) -> dict:
# TODO: WHY NOT WORK!?!
# when we `step` through the next block?
# import tractor
# await tractor.pause()
async with open_cached_client(name) as client: async with open_cached_client(name) as client:
# TODO: support multiple asset type concurrent searches. # TODO: support multiple asset type concurrent searches.
@ -149,12 +130,12 @@ async def symbol_search(
pattern: str, pattern: str,
**kwargs, **kwargs,
) -> dict[str, dict[str, dict[str, Any]]]: ) -> Dict[str, Dict[str, Dict[str, Any]]]:
''' '''
Return symbol info from broker. Return symbol info from broker.
''' '''
results: list[str] = [] results = []
async def search_backend( async def search_backend(
brokermod: ModuleType brokermod: ModuleType
@ -162,13 +143,6 @@ async def symbol_search(
brokername: str = mod.name brokername: str = mod.name
# TODO: figure this the FUCK OUT
# -> ok so obvi in the root actor any async task that's
# spawned outside the main tractor-root-actor task needs to
# call this..
# await tractor.devx._debug.maybe_init_greenback()
# tractor.pause_from_sync()
async with maybe_spawn_brokerd( async with maybe_spawn_brokerd(
mod.name, mod.name,
infect_asyncio=getattr( infect_asyncio=getattr(
@ -188,6 +162,7 @@ async def symbol_search(
)) ))
async with trio.open_nursery() as n: async with trio.open_nursery() as n:
for mod in brokermods: for mod in brokermods:
n.start_soon(search_backend, mod.name) n.start_soon(search_backend, mod.name)
@ -197,13 +172,11 @@ async def symbol_search(
async def mkt_info( async def mkt_info(
brokermod: ModuleType, brokermod: ModuleType,
fqme: str, fqme: str,
**kwargs, **kwargs,
) -> MktPair: ) -> MktPair:
''' '''
Return the `piker.accounting.MktPair` info struct from a given Return MktPair info from broker including src and dst assets.
backend broker tradable src/dst asset pair.
''' '''
async with open_cached_client(brokermod.name) as client: async with open_cached_client(brokermod.name) as client:

View File

@ -34,7 +34,6 @@ from piker.brokers._util import get_logger
if TYPE_CHECKING: if TYPE_CHECKING:
from .api import Client from .api import Client
from ib_insync import IB from ib_insync import IB
import i3ipc
log = get_logger('piker.brokers.ib') log = get_logger('piker.brokers.ib')
@ -49,37 +48,6 @@ _reset_tech: Literal[
] = 'vnc' ] = 'vnc'
no_setup_msg:str = (
'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
def try_xdo_manual(
vnc_sockaddr: str,
):
'''
Do the "manual" `xdo`-based screen switch + click
combo since apparently the `asyncvnc` client ain't workin..
Note this is only meant as a backup method for Xorg users,
ideally you can use a real vnc client and the `vnc_click_hack()`
impl!
'''
global _reset_tech
try:
i3ipc_xdotool_manual_click_hack()
_reset_tech = 'i3ipc_xdotool'
return True
except OSError:
log.exception(
no_setup_msg.format(vnc_sockaddr)
)
return False
async def data_reset_hack( async def data_reset_hack(
# vnc_host: str, # vnc_host: str,
client: Client, client: Client,
@ -122,9 +90,15 @@ async def data_reset_hack(
vnc_port: int vnc_port: int
vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs') vnc_sockaddr: tuple[str] | None = client.conf.get('vnc_addrs')
no_setup_msg:str = (
f'No data reset hack test setup for {vnc_sockaddr}!\n'
'See config setup tips @\n'
'https://github.com/pikers/piker/tree/master/piker/brokers/ib'
)
if not vnc_sockaddr: if not vnc_sockaddr:
log.warning( log.warning(
no_setup_msg.format(vnc_sockaddr) no_setup_msg
+ +
'REQUIRES A `vnc_addrs: array` ENTRY' 'REQUIRES A `vnc_addrs: array` ENTRY'
) )
@ -145,38 +119,27 @@ async def data_reset_hack(
port=vnc_port, port=vnc_port,
) )
) )
except ( except OSError:
OSError, # no VNC server avail.. if vnc_host != 'localhost':
PermissionError, # asyncvnc pw fail.. log.warning(no_setup_msg)
): return False
try: try:
import i3ipc # noqa (since a deps dynamic check) import i3ipc # noqa (since a deps dynamic check)
except ModuleNotFoundError: except ModuleNotFoundError:
log.warning( log.warning(no_setup_msg)
no_setup_msg.format(vnc_sockaddr)
)
return False return False
if vnc_host not in { try:
'localhost', i3ipc_xdotool_manual_click_hack()
'127.0.0.1', _reset_tech = 'i3ipc_xdotool'
}: return True
focussed, matches = i3ipc_fin_wins_titled() except OSError:
if not matches: log.exception(no_setup_msg)
log.warning( return False
no_setup_msg.format(vnc_sockaddr)
)
return False
else:
try_xdo_manual(vnc_sockaddr)
# localhost but no vnc-client or it borked..
else:
try_xdo_manual(vnc_sockaddr)
case 'i3ipc_xdotool': case 'i3ipc_xdotool':
try_xdo_manual(vnc_sockaddr) i3ipc_xdotool_manual_click_hack()
# i3ipc_xdotool_manual_click_hack()
case _ as tech: case _ as tech:
raise RuntimeError(f'{tech} is not supported for reset tech!?') raise RuntimeError(f'{tech} is not supported for reset tech!?')
@ -215,9 +178,9 @@ async def vnc_click_hack(
host, host,
port=port, port=port,
# TODO: doesn't work? # TODO: doesn't work see:
# see, https://github.com/barneygale/asyncvnc/issues/7 # https://github.com/barneygale/asyncvnc/issues/7
password='doggy', # password='ibcansmbz',
) as client: ) as client:
@ -231,103 +194,70 @@ async def vnc_click_hack(
client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked client.keyboard.press('Ctrl', 'Alt', key) # keys are stacked
def i3ipc_fin_wins_titled(
titles: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
# !TODO, remote vnc instance
# -[ ] something in title (or other Con-props) that indicates
# this is explicitly for ibrk sw?
# |_[ ] !can use modden spawn eventually!
'TigerVNC',
# 'vncviewer', # the terminal..
],
) -> tuple[
i3ipc.Con, # orig focussed win
list[tuple[str, i3ipc.Con]], # matching wins by title
]:
'''
Attempt to find a local-DE window titled with an entry in
`titles`.
If found deliver the current focussed window and all matching
`i3ipc.Con`s in a list.
'''
import i3ipc
ipc = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
tree = ipc.get_tree()
focussed: i3ipc.Con = tree.find_focused()
matches: list[i3ipc.Con] = []
for name in titles:
results = tree.find_titled(name)
print(f'results for {name}: {results}')
if results:
con = results[0]
matches.append((
name,
con,
))
return (
focussed,
matches,
)
def i3ipc_xdotool_manual_click_hack() -> None: def i3ipc_xdotool_manual_click_hack() -> None:
''' '''
Do the data reset hack but expecting a local X-window using `xdotool`. Do the data reset hack but expecting a local X-window using `xdotool`.
''' '''
focussed, matches = i3ipc_fin_wins_titled() import i3ipc
orig_win_id = focussed.window i3 = i3ipc.Connection()
# TODO: might be worth offering some kinda api for grabbing
# the window id from the pid?
# https://stackoverflow.com/a/2250879
t = i3.get_tree()
orig_win_id = t.find_focused().window
# for tws
win_names: list[str] = [
'Interactive Brokers', # tws running in i3
'IB Gateway', # gw running in i3
# 'IB', # gw running in i3 (newer version?)
]
try: try:
for name, con in matches: for name in win_names:
print(f'Resetting data feed for {name}') results = t.find_titled(name)
win_id = str(con.window) print(f'results for {name}: {results}')
w, h = con.rect.width, con.rect.height if results:
con = results[0]
print(f'Resetting data feed for {name}')
win_id = str(con.window)
w, h = con.rect.width, con.rect.height
# TODO: seems to be a few libs for python but not sure # TODO: seems to be a few libs for python but not sure
# if they support all the sub commands we need, order of # if they support all the sub commands we need, order of
# most recent commit history: # most recent commit history:
# https://github.com/rr-/pyxdotool # https://github.com/rr-/pyxdotool
# https://github.com/ShaneHutter/pyxdotool # https://github.com/ShaneHutter/pyxdotool
# https://github.com/cphyc/pyxdotool # https://github.com/cphyc/pyxdotool
# TODO: only run the reconnect (2nd) kc on a detected # TODO: only run the reconnect (2nd) kc on a detected
# disconnect? # disconnect?
for key_combo, timeout in [ for key_combo, timeout in [
# only required if we need a connection reset. # only required if we need a connection reset.
# ('ctrl+alt+r', 12), # ('ctrl+alt+r', 12),
# data feed reset. # data feed reset.
('ctrl+alt+f', 6) ('ctrl+alt+f', 6)
]: ]:
subprocess.call([ subprocess.call([
'xdotool', 'xdotool',
'windowactivate', '--sync', win_id, 'windowactivate', '--sync', win_id,
# move mouse to bottom left of window (where # move mouse to bottom left of window (where
# there should be nothing to click). # there should be nothing to click).
'mousemove_relative', '--sync', str(w-4), str(h-4), 'mousemove_relative', '--sync', str(w-4), str(h-4),
# NOTE: we may need to stick a `--retry 3` in here.. # NOTE: we may need to stick a `--retry 3` in here..
'click', '--window', win_id, 'click', '--window', win_id,
'--repeat', '3', '1', '--repeat', '3', '1',
# hackzorzes # hackzorzes
'key', key_combo, 'key', key_combo,
], ],
timeout=timeout, timeout=timeout,
) )
# re-activate and focus original window # re-activate and focus original window
subprocess.call([ subprocess.call([

View File

@ -1241,47 +1241,32 @@ async def deliver_trade_events(
# never relay errors for non-broker related issues # never relay errors for non-broker related issues
# https://interactivebrokers.github.io/tws-api/message_codes.html # https://interactivebrokers.github.io/tws-api/message_codes.html
code: int = err['error_code'] code: int = err['error_code']
reason: str = err['reason'] if code in {
reqid: str = str(err['reqid']) 200, # uhh
# "Warning:" msg codes,
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# - 2109: 'Outside Regular Trading Hours'
if 'Warning:' in reason:
log.warning(
f'Order-API-warning: {code!r}\n'
f'reqid: {reqid!r}\n'
f'\n'
f'{pformat(err)}\n'
# ^TODO? should we just print the `reason`
# not the full `err`-dict?
)
continue
# XXX known special (ignore) cases
elif code in {
200, # uhh.. ni idea
# hist pacing / connectivity # hist pacing / connectivity
162, 162,
165, 165,
# WARNING codes:
# https://interactivebrokers.github.io/tws-api/message_codes.html#warning_codes
# Attribute 'Outside Regular Trading Hours' is
# " 'ignored based on the order type and
# destination. PlaceOrder is now ' 'being
# processed.',
2109,
# XXX: lol this isn't even documented.. # XXX: lol this isn't even documented..
# 'No market data during competing live session' # 'No market data during competing live session'
1669, 1669,
}: }:
log.error(
f'Order-API-error which is non-cancel-causing ?!\n'
f'\n'
f'{pformat(err)}\n'
)
continue continue
reqid: str = str(err['reqid'])
reason: str = err['reason']
if err['reqid'] == -1: if err['reqid'] == -1:
log.error( log.error(f'TWS external order error:\n{pformat(err)}')
f'TWS external order error ??\n'
f'{pformat(err)}\n'
)
flow: dict = dict( flow: dict = dict(
flows.get(reqid) flows.get(reqid)

View File

@ -587,7 +587,7 @@ async def get_bars(
data_cs.cancel() data_cs.cancel()
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await tn.start( data_cs, reset_done = await nurse.start(
partial( partial(
wait_on_data_reset, wait_on_data_reset,
proxy, proxy,
@ -607,11 +607,11 @@ async def get_bars(
# such that simultaneous symbol queries don't try data resettingn # such that simultaneous symbol queries don't try data resettingn
# too fast.. # too fast..
unset_resetter: bool = False unset_resetter: bool = False
async with trio.open_nursery() as tn: async with trio.open_nursery() as nurse:
# start history request that we allow # start history request that we allow
# to run indefinitely until a result is acquired # to run indefinitely until a result is acquired
tn.start_soon(query) nurse.start_soon(query)
# start history reset loop which waits up to the timeout # start history reset loop which waits up to the timeout
# for a result before triggering a data feed reset. # for a result before triggering a data feed reset.
@ -631,7 +631,7 @@ async def get_bars(
unset_resetter: bool = True unset_resetter: bool = True
# spawn new data reset task # spawn new data reset task
data_cs, reset_done = await tn.start( data_cs, reset_done = await nurse.start(
partial( partial(
wait_on_data_reset, wait_on_data_reset,
proxy, proxy,
@ -705,9 +705,7 @@ async def _setup_quote_stream(
# to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore # to_trio, from_aio = trio.open_memory_channel(2**8) # type: ignore
def teardown(): def teardown():
ticker.updateEvent.disconnect(push) ticker.updateEvent.disconnect(push)
log.error( log.error(f"Disconnected stream for `{symbol}`")
f'Disconnected stream for `{symbol}`'
)
client.ib.cancelMktData(contract) client.ib.cancelMktData(contract)
# decouple broadcast mem chan # decouple broadcast mem chan
@ -763,10 +761,7 @@ async def open_aio_quote_stream(
symbol: str, symbol: str,
contract: Contract | None = None, contract: Contract | None = None,
) -> ( ) -> trio.abc.ReceiveStream:
trio.abc.Channel| # iface
tractor.to_asyncio.LinkedTaskChannel # actually
):
from tractor.trionics import broadcast_receiver from tractor.trionics import broadcast_receiver
global _quote_streams global _quote_streams
@ -783,7 +778,6 @@ async def open_aio_quote_stream(
yield from_aio yield from_aio
return return
from_aio: tractor.to_asyncio.LinkedTaskChannel
async with tractor.to_asyncio.open_channel_from( async with tractor.to_asyncio.open_channel_from(
_setup_quote_stream, _setup_quote_stream,
symbol=symbol, symbol=symbol,
@ -989,18 +983,17 @@ async def stream_quotes(
) )
cs: trio.CancelScope | None = None cs: trio.CancelScope | None = None
startup: bool = True startup: bool = True
iter_quotes: trio.abc.Channel
while ( while (
startup startup
or cs.cancel_called or cs.cancel_called
): ):
with trio.CancelScope() as cs: with trio.CancelScope() as cs:
async with ( async with (
trio.open_nursery() as tn, trio.open_nursery() as nurse,
open_aio_quote_stream( open_aio_quote_stream(
symbol=sym, symbol=sym,
contract=con, contract=con,
) as iter_quotes, ) as stream,
): ):
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them
# (ahem, ib_insync is stateful trash) # (ahem, ib_insync is stateful trash)
@ -1028,9 +1021,9 @@ async def stream_quotes(
await rt_ev.wait() await rt_ev.wait()
cs.cancel() # cancel called should now be set cs.cancel() # cancel called should now be set
tn.start_soon(reset_on_feed) nurse.start_soon(reset_on_feed)
async with aclosing(iter_quotes): async with aclosing(stream):
# if syminfo.get('no_vlm', False): # if syminfo.get('no_vlm', False):
if not init_msg.shm_write_opts['has_vlm']: if not init_msg.shm_write_opts['has_vlm']:
@ -1045,21 +1038,19 @@ async def stream_quotes(
# wait for real volume on feed (trading might be # wait for real volume on feed (trading might be
# closed) # closed)
while True: while True:
ticker = await iter_quotes.receive() ticker = await stream.receive()
# for a real volume contract we rait for # for a real volume contract we rait for
# the first "real" trade to take place # the first "real" trade to take place
if ( if (
# not calc_price # not calc_price
# and not ticker.rtTime # and not ticker.rtTime
False not ticker.rtTime
# not ticker.rtTime
): ):
# spin consuming tickers until we # spin consuming tickers until we
# get a real market datum # get a real market datum
log.debug(f"New unsent ticker: {ticker}") log.debug(f"New unsent ticker: {ticker}")
continue continue
else: else:
log.debug("Received first volume tick") log.debug("Received first volume tick")
# ugh, clear ticks since we've # ugh, clear ticks since we've
@ -1075,18 +1066,13 @@ async def stream_quotes(
log.debug(f"First ticker received {quote}") log.debug(f"First ticker received {quote}")
# tell data-layer spawner-caller that live # tell data-layer spawner-caller that live
# quotes are now active desptie not having # quotes are now streaming.
# necessarily received a first vlm/clearing
# tick.
ticker = await iter_quotes.receive()
feed_is_live.set() feed_is_live.set()
fqme: str = quote['fqme']
await send_chan.send({fqme: quote})
# last = time.time() # last = time.time()
async for ticker in iter_quotes: async for ticker in stream:
quote = normalize(ticker) quote = normalize(ticker)
fqme: str = quote['fqme'] fqme = quote['fqme']
await send_chan.send({fqme: quote}) await send_chan.send({fqme: quote})
# ugh, clear ticks since we've consumed them # ugh, clear ticks since we've consumed them

View File

@ -34,7 +34,6 @@ import urllib.parse
import hashlib import hashlib
import hmac import hmac
import base64 import base64
import tractor
import trio import trio
from piker import config from piker import config
@ -373,7 +372,8 @@ class Client:
# 1658347714, 'status': 'Success'}]} # 1658347714, 'status': 'Success'}]}
if xfers: if xfers:
await tractor.pause() import tractor
await tractor.pp()
trans: dict[str, Transaction] = {} trans: dict[str, Transaction] = {}
for entry in xfers: for entry in xfers:
@ -501,8 +501,7 @@ class Client:
for xkey, data in resp['result'].items(): for xkey, data in resp['result'].items():
# NOTE: always cache in pairs tables for faster lookup # NOTE: always cache in pairs tables for faster lookup
with tractor.devx.maybe_open_crash_handler(): # as bxerr: pair = Pair(xname=xkey, **data)
pair = Pair(xname=xkey, **data)
# register the above `Pair` structs for all # register the above `Pair` structs for all
# key-sets/monikers: a set of 4 (frickin) tables # key-sets/monikers: a set of 4 (frickin) tables

View File

@ -175,8 +175,9 @@ async def handle_order_requests(
case { case {
'account': 'kraken.spot' as account, 'account': 'kraken.spot' as account,
'action': 'buy'|'sell', 'action': action,
}: } if action in {'buy', 'sell'}:
# validate # validate
order = BrokerdOrder(**msg) order = BrokerdOrder(**msg)
@ -261,12 +262,6 @@ async def handle_order_requests(
} | extra } | extra
log.info(f'Submitting WS order request:\n{pformat(req)}') log.info(f'Submitting WS order request:\n{pformat(req)}')
# NOTE HOWTO, debug order requests
#
# if 'XRP' in pair:
# await tractor.pause()
await ws.send_msg(req) await ws.send_msg(req)
# placehold for sanity checking in relay loop # placehold for sanity checking in relay loop
@ -549,7 +544,7 @@ async def open_trade_dialog(
# to be reloaded. # to be reloaded.
balances: dict[str, float] = await client.get_balances() balances: dict[str, float] = await client.get_balances()
await verify_balances( verify_balances(
acnt, acnt,
src_fiat, src_fiat,
balances, balances,
@ -1090,8 +1085,6 @@ async def handle_order_updates(
f'Failed to {action} order {reqid}:\n' f'Failed to {action} order {reqid}:\n'
f'{errmsg}' f'{errmsg}'
) )
# if tractor._state.debug_mode():
# await tractor.pause()
symbol: str = 'N/A' symbol: str = 'N/A'
if chain := apiflows.get(reqid): if chain := apiflows.get(reqid):

View File

@ -21,6 +21,7 @@ Symbology defs and search.
from decimal import Decimal from decimal import Decimal
import tractor import tractor
from rapidfuzz import process as fuzzy
from piker._cacheables import ( from piker._cacheables import (
async_lifo_cache, async_lifo_cache,
@ -40,13 +41,8 @@ from piker.accounting._mktinfo import (
) )
# https://www.kraken.com/features/api#get-tradable-pairs
class Pair(Struct): class Pair(Struct):
'''
A tradable asset pair as schema-defined by,
https://docs.kraken.com/api/docs/rest-api/get-tradable-asset-pairs
'''
xname: str # idiotic bs_mktid equiv i guess? xname: str # idiotic bs_mktid equiv i guess?
altname: str # alternate pair name altname: str # alternate pair name
wsname: str # WebSocket pair name (if available) wsname: str # WebSocket pair name (if available)
@ -57,6 +53,7 @@ class Pair(Struct):
lot: str # volume lot size lot: str # volume lot size
cost_decimals: int cost_decimals: int
costmin: float
pair_decimals: int # scaling decimal places for pair pair_decimals: int # scaling decimal places for pair
lot_decimals: int # scaling decimal places for volume lot_decimals: int # scaling decimal places for volume
@ -82,7 +79,6 @@ class Pair(Struct):
tick_size: float # min price step size tick_size: float # min price step size
status: str status: str
costmin: str|None = None # XXX, only some mktpairs?
short_position_limit: float = 0 short_position_limit: float = 0
long_position_limit: float = float('inf') long_position_limit: float = float('inf')

View File

@ -37,12 +37,6 @@ import tractor
from async_generator import asynccontextmanager from async_generator import asynccontextmanager
import numpy as np import numpy as np
import wrapt import wrapt
# TODO, port to `httpx`/`trio-websocket` whenver i get back to
# writing a proper ws-api streamer for this backend (since the data
# feeds are free now) as per GH feat-req:
# https://github.com/pikers/piker/issues/509
#
import asks import asks
from ..calc import humanize, percent_change from ..calc import humanize, percent_change

View File

@ -76,6 +76,7 @@ if TYPE_CHECKING:
# TODO: numba all of this # TODO: numba all of this
def mk_check( def mk_check(
trigger_price: float, trigger_price: float,
known_last: float, known_last: float,
action: str, action: str,
@ -161,7 +162,7 @@ async def clear_dark_triggers(
router: Router, router: Router,
brokerd_orders_stream: tractor.MsgStream, brokerd_orders_stream: tractor.MsgStream,
quote_stream: tractor.MsgStream, quote_stream: tractor.ReceiveMsgStream, # noqa
broker: str, broker: str,
fqme: str, fqme: str,
@ -177,7 +178,6 @@ async def clear_dark_triggers(
''' '''
# XXX: optimize this for speed! # XXX: optimize this for speed!
# TODO: # TODO:
# - port to the new ringbuf stuff in `tractor.ipc`!
# - numba all this! # - numba all this!
# - this stream may eventually contain multiple symbols # - this stream may eventually contain multiple symbols
quote_stream._raise_on_lag = False quote_stream._raise_on_lag = False
@ -1190,16 +1190,12 @@ async def process_client_order_cmds(
submitting live orders immediately if requested by the client. submitting live orders immediately if requested by the client.
''' '''
# TODO, only allow `msgspec.Struct` form! # cmd: dict
cmd: dict
async for cmd in client_order_stream: async for cmd in client_order_stream:
log.info( log.info(f'Received order cmd:\n{pformat(cmd)}')
f'Received order cmd:\n'
f'{pformat(cmd)}\n'
)
# CAWT DAMN we need struct support! # CAWT DAMN we need struct support!
oid: str = str(cmd['oid']) oid = str(cmd['oid'])
# register this stream as an active order dialog (msg flow) for # register this stream as an active order dialog (msg flow) for
# this order id such that translated message from the brokerd # this order id such that translated message from the brokerd
@ -1305,7 +1301,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': price, 'price': trigger_price,
'size': size, 'size': size,
'action': ('buy' | 'sell') as action, 'action': ('buy' | 'sell') as action,
'exec_mode': ('live' | 'paper'), 'exec_mode': ('live' | 'paper'),
@ -1337,7 +1333,7 @@ async def process_client_order_cmds(
symbol=sym, symbol=sym,
action=action, action=action,
price=price, price=trigger_price,
size=size, size=size,
account=req.account, account=req.account,
) )
@ -1359,11 +1355,7 @@ async def process_client_order_cmds(
# (``translate_and_relay_brokerd_events()`` above) will # (``translate_and_relay_brokerd_events()`` above) will
# handle relaying the ems side responses back to # handle relaying the ems side responses back to
# the client/cmd sender from this request # the client/cmd sender from this request
log.info( log.info(f'Sending live order to {broker}:\n{pformat(msg)}')
f'Sending live order to {broker}:\n'
f'{pformat(msg)}'
)
await brokerd_order_stream.send(msg) await brokerd_order_stream.send(msg)
# an immediate response should be ``BrokerdOrderAck`` # an immediate response should be ``BrokerdOrderAck``
@ -1379,7 +1371,7 @@ async def process_client_order_cmds(
case { case {
'oid': oid, 'oid': oid,
'symbol': fqme, 'symbol': fqme,
'price': price, 'price': trigger_price,
'size': size, 'size': size,
'exec_mode': exec_mode, 'exec_mode': exec_mode,
'action': action, 'action': action,
@ -1407,12 +1399,7 @@ async def process_client_order_cmds(
if isnan(last): if isnan(last):
last = flume.rt_shm.array[-1]['close'] last = flume.rt_shm.array[-1]['close']
trigger_price: float = float(price) pred = mk_check(trigger_price, last, action)
pred = mk_check(
trigger_price,
last,
action,
)
# NOTE: for dark orders currently we submit # NOTE: for dark orders currently we submit
# the triggered live order at a price 5 ticks # the triggered live order at a price 5 ticks
@ -1552,7 +1539,7 @@ async def _emsd_main(
ctx: tractor.Context, ctx: tractor.Context,
fqme: str, fqme: str,
exec_mode: str, # ('paper', 'live') exec_mode: str, # ('paper', 'live')
loglevel: str|None = None, loglevel: str | None = None,
) -> tuple[ ) -> tuple[
dict[ dict[

View File

@ -19,7 +19,6 @@ Clearing sub-system message and protocols.
""" """
from __future__ import annotations from __future__ import annotations
from decimal import Decimal
from typing import ( from typing import (
Literal, Literal,
) )
@ -72,15 +71,7 @@ class Order(Struct):
symbol: str # | MktPair symbol: str # | MktPair
account: str # should we set a default as '' ? account: str # should we set a default as '' ?
# https://docs.python.org/3/library/decimal.html#decimal-objects price: float
#
# ?TODO? decimal usage throughout?
# -[ ] possibly leverage the `Encoder(decimal_format='number')`
# bit?
# |_https://jcristharif.com/msgspec/supported-types.html#decimal
# -[ ] should we also use it for .size?
#
price: Decimal
size: float # -ve is "sell", +ve is "buy" size: float # -ve is "sell", +ve is "buy"
brokers: list[str] = [] brokers: list[str] = []
@ -187,7 +178,7 @@ class BrokerdOrder(Struct):
time_ns: int time_ns: int
symbol: str # fqme symbol: str # fqme
price: Decimal price: float
size: float size: float
# TODO: if we instead rely on a +ve/-ve size to determine # TODO: if we instead rely on a +ve/-ve size to determine

View File

@ -510,7 +510,7 @@ async def handle_order_requests(
reqid = await client.submit_limit( reqid = await client.submit_limit(
oid=order.oid, oid=order.oid,
symbol=f'{order.symbol}.{client.broker}', symbol=f'{order.symbol}.{client.broker}',
price=float(order.price), price=order.price,
action=order.action, action=order.action,
size=order.size, size=order.size,
# XXX: by default 0 tells ``ib_insync`` methods that # XXX: by default 0 tells ``ib_insync`` methods that

View File

@ -335,7 +335,7 @@ def services(config, tl, ports):
name='service_query', name='service_query',
loglevel=config['loglevel'] if tl else None, loglevel=config['loglevel'] if tl else None,
), ),
tractor.get_registry( tractor.get_arbiter(
host=host, host=host,
port=ports[0] port=ports[0]
) as portal ) as portal

View File

@ -740,7 +740,7 @@ async def sample_and_broadcast(
log.warning( log.warning(
f'Feed OVERRUN {sub_key}' f'Feed OVERRUN {sub_key}'
f'@{bus.brokername} -> \n' '@{bus.brokername} -> \n'
f'feed @ {chan.uid}\n' f'feed @ {chan.uid}\n'
f'throttle = {throttle} Hz' f'throttle = {throttle} Hz'
) )

View File

@ -786,6 +786,7 @@ async def install_brokerd_search(
@acm @acm
async def maybe_open_feed( async def maybe_open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: str | None = None, loglevel: str | None = None,
@ -839,12 +840,13 @@ async def maybe_open_feed(
@acm @acm
async def open_feed( async def open_feed(
fqmes: list[str], fqmes: list[str],
loglevel: str|None = None, loglevel: str | None = None,
allow_overruns: bool = True, allow_overruns: bool = True,
start_stream: bool = True, start_stream: bool = True,
tick_throttle: float|None = None, # Hz tick_throttle: float | None = None, # Hz
allow_remote_ctl_ui: bool = False, allow_remote_ctl_ui: bool = False,

View File

@ -36,10 +36,10 @@ from ._sharedmem import (
ShmArray, ShmArray,
_Token, _Token,
) )
from piker.accounting import MktPair
if TYPE_CHECKING: if TYPE_CHECKING:
from piker.data.feed import Feed from ..accounting import MktPair
from .feed import Feed
class Flume(Struct): class Flume(Struct):
@ -82,7 +82,7 @@ class Flume(Struct):
# TODO: do we need this really if we can pull the `Portal` from # TODO: do we need this really if we can pull the `Portal` from
# ``tractor``'s internals? # ``tractor``'s internals?
feed: Feed|None = None feed: Feed | None = None
@property @property
def rt_shm(self) -> ShmArray: def rt_shm(self) -> ShmArray:

View File

@ -113,9 +113,9 @@ def validate_backend(
) )
if ep is None: if ep is None:
log.warning( log.warning(
f'Provider backend {mod.name!r} is missing ' f'Provider backend {mod.name} is missing '
f'{daemon_name!r} support?\n' f'{daemon_name} support :(\n'
f'|_module endpoint-func missing: {name!r}\n' f'The following endpoint is missing: {name}'
) )
inits: list[ inits: list[

View File

@ -269,8 +269,6 @@ def hcolor(name: str) -> str:
# default ohlc-bars/curve gray # default ohlc-bars/curve gray
'bracket': '#666666', # like the logo 'bracket': '#666666', # like the logo
'pikers': '#616161', # a trader shade of..
'beast': '#161616', # in the dark alone.
# bluish # bluish
'charcoal': '#36454F', 'charcoal': '#36454F',

View File

@ -21,7 +21,6 @@ Chart trading, the only way to scalp.
from __future__ import annotations from __future__ import annotations
from contextlib import asynccontextmanager from contextlib import asynccontextmanager
from dataclasses import dataclass, field from dataclasses import dataclass, field
from decimal import Decimal
from functools import partial from functools import partial
from pprint import pformat from pprint import pformat
import time import time
@ -42,6 +41,7 @@ from piker.accounting import (
Position, Position,
mk_allocator, mk_allocator,
MktPair, MktPair,
Symbol,
) )
from piker.clearing import ( from piker.clearing import (
open_ems, open_ems,
@ -143,15 +143,6 @@ class OrderMode:
} }
_staged_order: Order | None = None _staged_order: Order | None = None
@property
def curr_mkt(self) -> MktPair:
'''
Deliver the currently selected `MktPair` according
chart state.
'''
return self.chart.linked.mkt
def on_level_change_update_next_order_info( def on_level_change_update_next_order_info(
self, self,
level: float, level: float,
@ -181,11 +172,7 @@ class OrderMode:
line.update_labels(order_info) line.update_labels(order_info)
# update bound-in staged order # update bound-in staged order
mkt: MktPair = self.curr_mkt order.price = level
order.price: Decimal = mkt.quantize(
size=level,
quantity_type='price',
)
order.size = order_info['size'] order.size = order_info['size']
# when an order is changed we flip the settings side-pane to # when an order is changed we flip the settings side-pane to
@ -200,9 +187,7 @@ class OrderMode:
) -> LevelLine: ) -> LevelLine:
# TODO, if we instead just always decimalize at the ems layer level = order.price
# we can avoid this back-n-forth casting?
level = float(order.price)
line = order_line( line = order_line(
chart or self.chart, chart or self.chart,
@ -239,11 +224,7 @@ class OrderMode:
# the order mode allocator but we still need to update the # the order mode allocator but we still need to update the
# "staged" order message we'll send to the ems # "staged" order message we'll send to the ems
def update_order_price(y: float) -> None: def update_order_price(y: float) -> None:
mkt: MktPair = self.curr_mkt order.price = y
order.price: Decimal = mkt.quantize(
size=y,
quantity_type='price',
)
line._on_level_change = update_order_price line._on_level_change = update_order_price
@ -294,31 +275,34 @@ class OrderMode:
chart = cursor.linked.chart chart = cursor.linked.chart
if ( if (
not chart not chart
and and cursor
cursor and cursor.active_plot
and
cursor.active_plot
): ):
return return
chart = cursor.active_plot chart = cursor.active_plot
price: float = cursor._datum_xy[1] price = cursor._datum_xy[1]
if not price: if not price:
# zero prices are not supported by any means # zero prices are not supported by any means
# since that's illogical / a no-op. # since that's illogical / a no-op.
return return
mkt: MktPair = self.chart.linked.mkt
# NOTE : we could also use instead,
# mkt.quantize(price, quantity_type='price')
# but it returns a Decimal and it's probably gonna
# be slower?
# TODO: should we be enforcing this precision # TODO: should we be enforcing this precision
# at a different layer in the stack? # at a different layer in the stack? right now
# |_ might require `MktPair` tracking in the EMS? # any precision error will literally be relayed
# |_ right now any precision error will be relayed # all the way back from the backend.
# all the way back from the backend and vice-versa..
# price = round(
mkt: MktPair = self.curr_mkt price,
price: Decimal = mkt.quantize( ndigits=mkt.price_tick_digits,
size=price,
quantity_type='price',
) )
order = self._staged_order = Order( order = self._staged_order = Order(
action=action, action=action,
price=price, price=price,
@ -394,7 +378,7 @@ class OrderMode:
'oid': oid, 'oid': oid,
}) })
if float(order.price) <= 0: if order.price <= 0:
log.error( log.error(
'*!? Invalid `Order.price <= 0` ?!*\n' '*!? Invalid `Order.price <= 0` ?!*\n'
# TODO: make this present multi-line in object form # TODO: make this present multi-line in object form
@ -531,15 +515,14 @@ class OrderMode:
# if an order msg is provided update the line # if an order msg is provided update the line
# **from** that msg. # **from** that msg.
if order: if order:
price: float = float(order.price) if order.price <= 0:
if price <= 0:
log.error(f'Order has 0 price, cancelling..\n{order}') log.error(f'Order has 0 price, cancelling..\n{order}')
self.cancel_orders([order.oid]) self.cancel_orders([order.oid])
return None return None
line.set_level(price) line.set_level(order.price)
self.on_level_change_update_next_order_info( self.on_level_change_update_next_order_info(
level=price, level=order.price,
line=line, line=line,
order=order, order=order,
# use the corresponding position tracker for the # use the corresponding position tracker for the
@ -698,9 +681,9 @@ class OrderMode:
) -> Dialog | None: ) -> Dialog | None:
# NOTE: the `.order` attr **must** be set with the # NOTE: the `.order` attr **must** be set with the
# equivalent order msg in order to be loaded. # equivalent order msg in order to be loaded.
order: Order = msg.req order = msg.req
oid = str(msg.oid) oid = str(msg.oid)
symbol: str = order.symbol symbol = order.symbol
# TODO: MEGA UGGG ZONEEEE! # TODO: MEGA UGGG ZONEEEE!
src = msg.src src = msg.src
@ -719,22 +702,13 @@ class OrderMode:
order.oid = str(order.oid) order.oid = str(order.oid)
order.brokers = [brokername] order.brokers = [brokername]
# ?TODO? change this over to `MktPair`, but it's gonna be # TODO: change this over to `MktPair`, but it's
# tough since we don't have any such data really in our # gonna be tough since we don't have any such data
# clearing msg schema.. # really in our clearing msg schema..
# BUT WAIT! WHY do we even want/need this!? order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# order.symbol = self.curr_mkt info={},
# )
# XXX, the old approach.. which i don't quire member why..
# -[ ] verify we for sure don't require this any more!
# |_https://github.com/pikers/piker/issues/517
#
# order.symbol = Symbol.from_fqme(
# fqsn=fqme,
# info={},
# )
maybe_dialog: Dialog | None = self.submit_order( maybe_dialog: Dialog | None = self.submit_order(
send_msg=False, send_msg=False,
order=order, order=order,
@ -1127,7 +1101,7 @@ async def process_trade_msg(
) )
) )
): ):
msg.req: Order = order msg.req = order
dialog: ( dialog: (
Dialog Dialog
# NOTE: on an invalid order submission (eg. # NOTE: on an invalid order submission (eg.
@ -1192,7 +1166,7 @@ async def process_trade_msg(
tm = time.time() tm = time.time()
mode.on_fill( mode.on_fill(
oid, oid,
price=float(req.price), price=req.price,
time_s=tm, time_s=tm,
) )
mode.lines.remove_line(uuid=oid) mode.lines.remove_line(uuid=oid)
@ -1247,7 +1221,7 @@ async def process_trade_msg(
tm = details['broker_time'] tm = details['broker_time']
mode.on_fill( mode.on_fill(
oid, oid,
price=float(details['price']), price=details['price'],
time_s=tm, time_s=tm,
pointing='up' if action == 'buy' else 'down', pointing='up' if action == 'buy' else 'down',
) )

View File

@ -179,7 +179,7 @@ def test_ems_err_on_bad_broker(
# NOTE: emsd should error on the actor's enabled modules # NOTE: emsd should error on the actor's enabled modules
# import phase, when looking for a backend named `doggy`. # import phase, when looking for a backend named `doggy`.
except tractor.RemoteActorError as re: except tractor.RemoteActorError as re:
assert re.type is ModuleNotFoundError assert re.type == ModuleNotFoundError
run_and_tollerate_cancels(load_bad_fqme) run_and_tollerate_cancels(load_bad_fqme)