Compare commits

..

No commits in common. "b6d60d348a6329d361c8cf68cdbbad57fbb9f579" and "0c99e098e61a6c2b03f6e6ebfe728cb9bbf4d0af" have entirely different histories.

23 changed files with 90 additions and 245 deletions

View File

@ -19,11 +19,9 @@
for tendiez.
'''
from piker.log import (
get_console_log,
get_logger,
)
from piker.calc import (
from ..log import get_logger
from .calc import (
iter_by_dt,
)
from ._ledger import (
@ -53,12 +51,6 @@ from ._allocate import (
log = get_logger(__name__)
# ?TODO, enable console on import
# [ ] necessary? or `open_brokerd_dialog()` doing it is sufficient?
#
# get_console_log(
# name=__name__,
# )
__all__ = [
'Account',

View File

@ -60,17 +60,12 @@ from ..clearing._messages import (
BrokerdPosition,
)
from piker.types import Struct
from piker.log import (
get_console_log,
get_logger,
)
from piker.log import get_logger
if TYPE_CHECKING:
from piker.data._symcache import SymbologyCache
log = get_logger(
name=__name__,
)
log = get_logger(__name__)
class Position(Struct):

View File

@ -25,16 +25,15 @@ from types import ModuleType
from tractor.trionics import maybe_open_context
from piker.log import (
get_logger,
)
from ._util import (
log,
BrokerError,
SymbolNotFound,
NoData,
DataUnavailable,
DataThrottle,
resproc,
get_logger,
)
__all__: list[str] = [
@ -44,6 +43,7 @@ __all__: list[str] = [
'DataUnavailable',
'DataThrottle',
'resproc',
'get_logger',
]
__brokers__: list[str] = [
@ -65,10 +65,6 @@ __brokers__: list[str] = [
# bitso
]
log = get_logger(
name=__name__,
)
def get_brokermod(brokername: str) -> ModuleType:
'''

View File

@ -33,18 +33,12 @@ import exceptiongroup as eg
import tractor
import trio
from piker.log import (
get_logger,
get_console_log,
)
from . import _util
from . import get_brokermod
if TYPE_CHECKING:
from ..data import _FeedsBus
log = get_logger(name=__name__)
# `brokerd` enabled modules
# TODO: move this def to the `.data` subpkg..
# NOTE: keeping this list as small as possible is part of our caps-sec
@ -80,13 +74,16 @@ async def _setup_persistent_brokerd(
# any further (level) configuration on their own B)
actor: tractor.Actor = tractor.current_actor()
tll: str = actor.loglevel
log = get_console_log(
log = _util.get_console_log(
level=loglevel or tll,
name=f'{_util.subsys}.{brokername}',
with_tractor_log=bool(tll),
)
assert log.name == _util.subsys
# set global for this actor to this new process-wide instance B)
_util.log = log
# further, set the log level on any broker broker specific
# logger instance.
@ -256,7 +253,7 @@ async def spawn_brokerd(
async def maybe_spawn_brokerd(
brokername: str,
loglevel: str|None = None,
loglevel: str | None = None,
**pikerd_kwargs,
@ -271,11 +268,6 @@ async def maybe_spawn_brokerd(
'''
from piker.service import maybe_spawn_daemon
# if (
# loglevel != 'info'
# ):
# await tractor.pause()
async with maybe_spawn_daemon(
f'brokerd.{brokername}',

View File

@ -19,13 +19,15 @@ Handy cross-broker utils.
"""
from __future__ import annotations
# from functools import partial
from functools import partial
import json
import httpx
import logging
from piker.log import (
from ..log import (
get_logger,
get_console_log,
colorize_json,
)
subsys: str = 'piker.brokers'
@ -33,22 +35,12 @@ subsys: str = 'piker.brokers'
# NOTE: level should be reset by any actor that is spawned
# as well as given a (more) explicit name/key such
# as `piker.brokers.binance` matching the subpkg.
# log = get_logger(subsys)
log = get_logger(subsys)
# ?TODO?? we could use this approach, but we need to be able
# to pass multiple `name=` values so for example we can include the
# emissions in `.accounting._pos` and others!
# [ ] maybe we could do the `log = get_logger()` above,
# then cycle through the list of subsys mods we depend on
# and then get all their loggers and pass them to
# `get_console_log(logger=)`??
# [ ] OR just write THIS `get_console_log()` as a hook which does
# that based on who calls it?.. i dunno
#
# get_console_log = partial(
# get_console_log,
# name=subsys,
# )
get_console_log = partial(
get_console_log,
name=subsys,
)
class BrokerError(Exception):

View File

@ -37,9 +37,8 @@ import trio
from piker.accounting import (
Asset,
)
from piker.log import (
from piker.brokers._util import (
get_logger,
get_console_log,
)
from piker.data._web_bs import (
open_autorecon_ws,
@ -70,9 +69,7 @@ from .venues import (
)
from .api import Client
log = get_logger(
name=__name__,
)
log = get_logger('piker.brokers.binance')
# Fee schedule template, mostly for paper engine fees modelling.
@ -248,16 +245,9 @@ async def handle_order_requests(
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
loglevel: str = 'warning',
) -> AsyncIterator[dict[str, Any]]:
# enable piker.clearing console log for *this* `brokerd` subactor
get_console_log(
level=loglevel,
name=__name__,
)
# TODO: how do we set this from the EMS such that
# positions are loaded from the correct venue on the user
# stream at startup? (that is in an attempt to support both

View File

@ -64,9 +64,9 @@ from piker.data._web_bs import (
open_autorecon_ws,
NoBsWs,
)
from piker.log import get_logger
from piker.brokers._util import (
DataUnavailable,
get_logger,
)
from .api import (
@ -78,7 +78,7 @@ from .venues import (
get_api_eps,
)
log = get_logger(name=__name__)
log = get_logger('piker.brokers.binance')
class L1(Struct):

View File

@ -27,12 +27,14 @@ import click
import trio
import tractor
from piker.cli import cli
from piker import watchlists as wl
from piker.log import (
from ..cli import cli
from .. import watchlists as wl
from ..log import (
colorize_json,
)
from ._util import (
log,
get_console_log,
get_logger,
)
from ..service import (
maybe_spawn_brokerd,
@ -43,15 +45,12 @@ from ..brokers import (
get_brokermod,
data,
)
log = get_logger(
name=__name__,
)
DEFAULT_BROKER = 'binance'
_config_dir = click.get_app_dir('piker')
_watchlists_data_path = os.path.join(_config_dir, 'watchlists.json')
OK = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
@ -346,10 +345,7 @@ def contracts(ctx, loglevel, broker, symbol, ids):
'''
brokermod = get_brokermod(broker)
get_console_log(
level=loglevel,
name=__name__,
)
get_console_log(loglevel)
contracts = trio.run(partial(core.contracts, brokermod, symbol))
if not ids:
@ -481,12 +477,11 @@ def search(
# the `piker --pdb` XD ..
# -[ ] pull from the parent click ctx's values..dumdum
# assert pdb
loglevel: str = config['loglevel']
# define tractor entrypoint
async def main(func):
async with maybe_open_pikerd(
loglevel=loglevel,
loglevel=config['loglevel'],
debug_mode=pdb,
):
return await func()
@ -499,7 +494,6 @@ def search(
core.symbol_search,
brokermods,
pattern,
loglevel=loglevel,
),
)

View File

@ -28,14 +28,12 @@ from typing import (
import trio
from piker.log import get_logger
from ._util import log
from . import get_brokermod
from ..service import maybe_spawn_brokerd
from . import open_cached_client
from ..accounting import MktPair
log = get_logger(name=__name__)
async def api(brokername: str, methname: str, **kwargs) -> dict:
'''
@ -149,7 +147,6 @@ async def search_w_brokerd(
async def symbol_search(
brokermods: list[ModuleType],
pattern: str,
loglevel: str = 'warning',
**kwargs,
) -> dict[str, dict[str, dict[str, Any]]]:
@ -179,7 +176,6 @@ async def symbol_search(
'_infect_asyncio',
False,
),
loglevel=loglevel
) as portal:
results.append((

View File

@ -41,15 +41,12 @@ import tractor
from tractor.experimental import msgpub
from async_generator import asynccontextmanager
from piker.log import(
get_logger,
from ._util import (
log,
get_console_log,
)
from . import get_brokermod
log = get_logger(
name='piker.brokers.binance',
)
async def wait_for_network(
net_func: Callable,
@ -246,10 +243,7 @@ async def start_quote_stream(
'''
# XXX: why do we need this again?
get_console_log(
level=tractor.current_actor().loglevel,
name=__name__,
)
get_console_log(tractor.current_actor().loglevel)
# pull global vars from local actor
symbols = list(symbols)

View File

@ -34,7 +34,7 @@ import subprocess
import tractor
from piker.log import get_logger
from piker.brokers._util import get_logger
if TYPE_CHECKING:
from .api import Client

View File

@ -50,10 +50,7 @@ from ib_insync.objects import (
)
from piker import config
from piker.log import (
get_logger,
get_console_log,
)
from piker.log import get_logger
from piker.types import Struct
from piker.accounting import (
Position,
@ -98,9 +95,7 @@ from .ledger import (
update_ledger_from_api_trades,
)
log = get_logger(
name=__name__,
)
log = get_logger(name=__name__)
def pack_position(
@ -543,15 +538,9 @@ class IbAcnt(Struct):
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
loglevel: str = 'warning',
) -> AsyncIterator[dict[str, Any]]:
get_console_log(
level=loglevel,
name=__name__,
)
# task local msg dialog tracking
flows = OrderDialogs()
accounts_def = config.load_accounts(['ib'])

View File

@ -62,12 +62,9 @@ from piker.clearing._messages import (
from piker.brokers import (
open_cached_client,
)
from piker.log import (
get_console_log,
get_logger,
)
from piker.data import open_symcache
from .api import (
log,
Client,
BrokerError,
)
@ -81,8 +78,6 @@ from .ledger import (
verify_balances,
)
log = get_logger(name=__name__)
MsgUnion = Union[
BrokerdCancel,
BrokerdError,
@ -436,15 +431,9 @@ def trades2pps(
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
loglevel: str = 'warning',
) -> AsyncIterator[dict[str, Any]]:
get_console_log(
level=loglevel,
name=__name__,
)
async with (
# TODO: maybe bind these together and deliver
# a tuple from `.open_cached_client()`?

View File

@ -50,19 +50,13 @@ from . import open_cached_client
from piker._cacheables import async_lifo_cache
from .. import config
from ._util import resproc, BrokerError, SymbolNotFound
from piker.log import (
from ..log import (
colorize_json,
)
from ._util import (
log,
get_console_log,
)
from piker.log import (
get_logger,
)
log = get_logger(
name=__name__,
)
_use_practice_account = False
_refresh_token_ep = 'https://{}login.questrade.com/oauth2/'
@ -1211,10 +1205,7 @@ async def stream_quotes(
# feed_type: str = 'stock',
) -> AsyncGenerator[str, Dict[str, Any]]:
# XXX: required to propagate ``tractor`` loglevel to piker logging
get_console_log(
level=loglevel,
name=__name__,
)
get_console_log(loglevel)
async with open_cached_client('questrade') as client:
if feed_type == 'stock':

View File

@ -30,16 +30,9 @@ import asks
from ._util import (
resproc,
BrokerError,
log,
)
from piker.calc import percent_change
from piker.log import (
get_logger,
)
log = get_logger(
name=__name__,
)
from ..calc import percent_change
_service_ep = 'https://api.robinhood.com'

View File

@ -215,7 +215,7 @@ async def relay_orders_from_sync_code(
async def open_ems(
fqme: str,
mode: str = 'live',
loglevel: str = 'warning',
loglevel: str = 'error',
) -> tuple[
OrderClient, # client

View File

@ -352,21 +352,9 @@ async def open_brokerd_dialog(
broker backend, configuration, or client code usage.
'''
get_console_log(
level=loglevel,
name='clearing',
)
# enable `.accounting` console since normally used by
# each `brokerd`.
get_console_log(
level=loglevel,
name='piker.accounting',
)
broker: str = brokermod.name
def mk_paper_ep(
loglevel: str,
):
def mk_paper_ep():
from . import _paper_engine as paper_mod
nonlocal brokermod, exec_mode
@ -418,21 +406,17 @@ async def open_brokerd_dialog(
if (
trades_endpoint is not None
or
exec_mode != 'paper'
or exec_mode != 'paper'
):
# open live brokerd trades endpoint
open_trades_endpoint = portal.open_context(
trades_endpoint,
loglevel=loglevel,
)
@acm
async def maybe_open_paper_ep():
if exec_mode == 'paper':
async with mk_paper_ep(
loglevel=loglevel,
) as msg:
async with mk_paper_ep() as msg:
yield msg
return
@ -443,9 +427,7 @@ async def open_brokerd_dialog(
# runtime indication that the backend can't support live
# order ctrl yet, so boot the paperboi B0
if first == 'paper':
async with mk_paper_ep(
loglevel=loglevel,
) as msg:
async with mk_paper_ep() as msg:
yield msg
return
else:

View File

@ -59,9 +59,9 @@ from piker.data import (
open_symcache,
)
from piker.types import Struct
from piker.log import (
from ._util import (
log, # sub-sys logger
get_console_log,
get_logger,
)
from ._messages import (
BrokerdCancel,
@ -73,8 +73,6 @@ from ._messages import (
BrokerdError,
)
log = get_logger(name=__name__)
class PaperBoi(Struct):
'''
@ -552,18 +550,16 @@ _sells: defaultdict[
@tractor.context
async def open_trade_dialog(
ctx: tractor.Context,
broker: str,
fqme: str|None = None, # if empty, we only boot broker mode
fqme: str | None = None, # if empty, we only boot broker mode
loglevel: str = 'warning',
) -> None:
# enable piker.clearing console log for *this* `brokerd` subactor
get_console_log(
level=loglevel,
name=__name__,
)
# enable piker.clearing console log for *this* subactor
get_console_log(loglevel)
symcache: SymbologyCache
async with open_symcache(get_brokermod(broker)) as symcache:

View File

@ -335,18 +335,10 @@ async def register_with_sampler(
open_index_stream: bool = True, # open a 2way stream for sample step msgs?
sub_for_broadcasts: bool = True, # sampler side to send step updates?
loglevel: str|None = None,
) -> set[int]:
get_console_log(
level=(
loglevel
or
tractor.current_actor().loglevel
),
name=__name__,
)
get_console_log(tractor.current_actor().loglevel)
incr_was_started: bool = False
try:
@ -483,7 +475,6 @@ async def spawn_samplerd(
register_with_sampler,
period_s=1,
sub_for_broadcasts=False,
loglevel=loglevel,
)
return True
@ -492,6 +483,7 @@ async def spawn_samplerd(
@acm
async def maybe_open_samplerd(
loglevel: str|None = None,
**pikerd_kwargs,
@ -520,10 +512,10 @@ async def open_sample_stream(
shms_by_period: dict[float, dict]|None = None,
open_index_stream: bool = True,
sub_for_broadcasts: bool = True,
loglevel: str|None = None,
# cache_key: str|None = None,
# allow_new_sampler: bool = True,
cache_key: str|None = None,
allow_new_sampler: bool = True,
ensure_is_active: bool = False,
) -> AsyncIterator[dict[str, float]]:
@ -558,9 +550,7 @@ async def open_sample_stream(
# XXX: this should be singleton on a host,
# a lone broker-daemon per provider should be
# created for all practical purposes
maybe_open_samplerd(
loglevel=loglevel,
) as portal,
maybe_open_samplerd() as portal,
portal.open_context(
register_with_sampler,
@ -569,7 +559,6 @@ async def open_sample_stream(
'shms_by_period': shms_by_period,
'open_index_stream': open_index_stream,
'sub_for_broadcasts': sub_for_broadcasts,
'loglevel': loglevel,
},
) as (ctx, shm_periods)
):

View File

@ -239,6 +239,7 @@ async def allocate_persistent_feed(
brokername: str,
symstr: str,
loglevel: str,
start_stream: bool = True,
init_timeout: float = 616,
@ -277,7 +278,7 @@ async def allocate_persistent_feed(
# ``stream_quotes()``, a required broker backend endpoint.
init_msgs: (
list[FeedInit] # new
|dict[str, dict[str, str]] # legacy / deprecated
| dict[str, dict[str, str]] # legacy / deprecated
)
# TODO: probably make a struct msg type for this as well
@ -347,14 +348,11 @@ async def allocate_persistent_feed(
izero_rt,
rt_shm,
) = await bus.nursery.start(
partial(
manage_history,
mod=mod,
mkt=mkt,
some_data_ready=some_data_ready,
feed_is_live=feed_is_live,
loglevel=loglevel,
)
manage_history,
mod,
mkt,
some_data_ready,
feed_is_live,
)
# yield back control to starting nursery once we receive either
@ -462,6 +460,7 @@ async def allocate_persistent_feed(
@tractor.context
async def open_feed_bus(
ctx: tractor.Context,
brokername: str,
symbols: list[str], # normally expected to the broker-specific fqme
@ -482,16 +481,13 @@ async def open_feed_bus(
'''
if loglevel is None:
loglevel: str = tractor.current_actor().loglevel
loglevel = tractor.current_actor().loglevel
# XXX: required to propagate ``tractor`` loglevel to piker
# logging
get_console_log(
level=(loglevel
or
tractor.current_actor().loglevel
),
name=__name__,
loglevel
or tractor.current_actor().loglevel
)
# local state sanity checks
@ -803,7 +799,7 @@ async def install_brokerd_search(
@acm
async def maybe_open_feed(
fqmes: list[str],
loglevel: str|None = None,
loglevel: str | None = None,
**kwargs,
@ -819,11 +815,6 @@ async def maybe_open_feed(
'''
fqme = fqmes[0]
# if (
# loglevel != 'info'
# ):
# await tractor.pause()
async with trionics.maybe_open_context(
acm_func=open_feed,
kwargs={
@ -890,12 +881,9 @@ async def open_feed(
providers.setdefault(mod, []).append(bs_fqme)
feed.mods[mod.name] = mod
if (
loglevel != 'info'
):
await tractor.pause()
# one actor per brokerd for now
brokerd_ctxs = []
for brokermod, bfqmes in providers.items():
# if no `brokerd` for this backend exists yet we spawn

View File

@ -30,9 +30,8 @@ from contextlib import (
import tractor
from trio.lowlevel import current_task
from piker.log import (
get_console_log,
get_logger,
from ._util import (
log, # sub-sys logger
)
from ._mngr import (
Services,
@ -40,8 +39,6 @@ from ._mngr import (
from ._actor_runtime import maybe_open_pikerd
from ._registry import find_service
log = get_logger(name=__name__)
@acm
async def maybe_spawn_daemon(
@ -51,7 +48,7 @@ async def maybe_spawn_daemon(
spawn_args: dict[str, Any],
loglevel: str|None = None,
loglevel: str | None = None,
singleton: bool = False,
**pikerd_kwargs,
@ -69,10 +66,6 @@ async def maybe_spawn_daemon(
clients.
'''
get_console_log(
level=loglevel,
name=__name__,
)
# serialize access to this section to avoid
# 2 or more tasks racing to create a daemon
lock = Services.locks[service_name]

View File

@ -54,10 +54,10 @@ from ..log import (
# for "time series processing"
subsys: str = 'piker.tsp'
log = get_logger(name=__name__)
log = get_logger(subsys)
get_console_log = partial(
get_console_log,
name=subsys, # activate for subsys-pkg "downward"
name=subsys,
)
# NOTE: union type-defs to handle generic `numpy` and `polars` types

View File

@ -63,10 +63,8 @@ from ..data._sharedmem import (
maybe_open_shm_array,
ShmArray,
)
from piker.data._source import (
def_iohlcv_fields,
)
from piker.data._sampling import (
from ..data._source import def_iohlcv_fields
from ..data._sampling import (
open_sample_stream,
)
@ -98,9 +96,7 @@ if TYPE_CHECKING:
# from .feed import _FeedsBus
log = get_logger(
name=__name__,
)
log = get_logger(__name__)
# `ShmArray` buffer sizing configuration:
@ -554,7 +550,7 @@ async def start_backfill(
)
# ?TODO, check against venue closure hours
# if/when provided by backend?
# await tractor.pause()
await tractor.pause()
expected_dur: Interval = (
last_start_dt.subtract(
@ -1324,7 +1320,6 @@ async def manage_history(
mkt: MktPair,
some_data_ready: trio.Event,
feed_is_live: trio.Event,
loglevel: str = 'warning',
timeframe: float = 60, # in seconds
wait_for_live_timeout: float = 0.5,
@ -1502,7 +1497,6 @@ async def manage_history(
# data feed layer that needs to consume it).
open_index_stream=True,
sub_for_broadcasts=False,
loglevel=loglevel,
) as sample_stream:
# register 1s and 1m buffers with the global