from pprint import pformat from functools import partial from decimal import Decimal from typing import Callable import tractor import trio from uuid import uuid4 from piker.accounting import dec_digits from piker.clearing import ( open_ems, OrderClient, ) # TODO: we should probably expose these top level in this subsys? from piker.clearing._messages import ( Order, Status, BrokerdPosition, ) from piker.data import ( Flume, open_feed, Feed, ShmArray, ) async def wait_for_order_status( trades_stream: tractor.MsgStream, oid: str, expect_status: str, ) -> tuple[ list[Status], list[BrokerdPosition], ]: ''' Wait for a specific order status for a given dialog, return msg flow up to that msg and any position update msgs in a tuple. ''' # Wait for position message before moving on to verify flow(s) # for the multi-order position entry/exit. status_msgs: list[Status] = [] pp_msgs: list[BrokerdPosition] = [] async for msg in trades_stream: match msg: case {'name': 'position'}: ppmsg = BrokerdPosition(**msg) pp_msgs.append(ppmsg) case { 'name': 'status', }: msg = Status(**msg) status_msgs.append(msg) # if we get the status we expect then return all # collected msgs from the brokerd dialog up to the # exected msg B) if ( msg.resp == expect_status and msg.oid == oid ): return status_msgs, pp_msgs async def bot_main(): ''' Boot the piker runtime, open an ems connection, submit and process orders statuses in real-time. ''' from piker.service import maybe_open_pikerd ll: str = 'info' # open an order ctl client, live data feed, trio nursery for # spawning an order trailer task client: OrderClient trades_stream: tractor.MsgStream feed: Feed accounts: list[str] fqme: str = 'btcusdt.usdtm.perp.binance' async with ( # TODO: do this implicitly inside `open_ems()` ep below? # init and sync actor-service runtime maybe_open_pikerd( loglevel=ll, debug_mode=True, ), open_ems( fqme, mode='paper', # {'live', 'paper'} # mode='live', # for real-brokerd submissions loglevel=ll, ) as ( client, # OrderClient trades_stream, # tractor.MsgStream startup_pps, _, # positions accounts, _, # dialogs ), open_feed( fqmes=[fqme], loglevel=ll, # TODO: if you want to throttle via downsampling # how many tick updates your feed received on # quote streams B) # tick_throttle=10, ) as feed, trio.open_nursery() as tn, ): print(f'Loaded binance accounts: {accounts}') flume: Flume = feed.flumes[fqme] min_tick = Decimal(flume.mkt.price_tick) min_tick_digits: int = dec_digits(min_tick) price_round: Callable = partial( round, ndigits=min_tick_digits, ) quote_stream: trio.abc.ReceiveChannel = feed.streams['binance'] clear_margin: float = 0.9995 async def trailer( order: Order, ): # ref shm OHLCV array s_shm: ShmArray = flume.rt_shm m_shm: ShmArray = flume.hist_shm # NOTE: if you wanted to frame ticks by type like the # the quote throttler does. # from piker.data._sampling import frame_ticks async for quotes in quote_stream: for fqme, quote in quotes.items(): for tick in quote.get('ticks', ()): print( f'{fqme} ticks:\n{pformat(tick)}\n\n' f'last 1s OHLC:\n{s_shm.array[-1]}\n' f'last 1m OHLC:\n{m_shm.array[-1]}\n' ) # always keep live limit 2% below last # clearing price if tick['type'] == 'trade': await client.update( uuid=order.oid, price=price_round( clear_margin * tick['price'] ), ) msgs, pps = await wait_for_order_status( trades_stream, oid, 'open' ) # setup order dialog via first msg size: float = 0.01 price: float = price_round( clear_margin * flume.first_quote['last'] ) oid: str = str(uuid4()) order = Order( exec_mode='live', # {'dark', 'live', 'alert'} action='buy', # TODO: remove this from our schema? oid=oid, account='paper', # use binance.usdtm for binance futes # account='binance.usdtm', # for live binance futes size=size, symbol=fqme, price=price, brokers=['binance'], ) await client.send(order) msgs, pps = await wait_for_order_status( trades_stream, oid, 'open' ) assert not pps assert msgs[-1].oid == oid # start "trailer task" which tracks rt quote stream tn.start_soon(trailer, order) try: # wait for ctl-c from user.. await trio.sleep_forever() except KeyboardInterrupt: # cancel the open order await client.cancel(oid) msgs, pps = await wait_for_order_status( trades_stream, oid, 'canceled' ) raise if __name__ == '__main__': trio.run(bot_main)