diff --git a/piker/brokers/deribit/api.py b/piker/brokers/deribit/api.py index df617d75..eb70bfa0 100644 --- a/piker/brokers/deribit/api.py +++ b/piker/brokers/deribit/api.py @@ -23,7 +23,6 @@ from contextlib import ( asynccontextmanager as acm, ) from datetime import datetime -from functools import partial import time from typing import ( Any, @@ -524,13 +523,12 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream: async def aio_price_feed_relay( + chan: to_asyncio.LinkedTaskChannel, fh: FeedHandler, instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, ) -> None: async def _trade(data: dict, receipt_timestamp): - to_trio.send_nowait(('trade', { + chan.send_nowait(('trade', { 'symbol': cb_sym_to_deribit_inst( str_to_cb_sym(data.symbol)).lower(), 'last': data, @@ -540,7 +538,7 @@ async def aio_price_feed_relay( })) async def _l1(data: dict, receipt_timestamp): - to_trio.send_nowait(('l1', { + chan.send_nowait(('l1', { 'symbol': cb_sym_to_deribit_inst( str_to_cb_sym(data.symbol)).lower(), 'ticks': [ @@ -570,7 +568,7 @@ async def aio_price_feed_relay( install_signal_handlers=False) # sync with trio - to_trio.send_nowait(None) + chan.started_nowait(None) await asyncio.sleep(float('inf')) @@ -581,11 +579,9 @@ async def open_price_feed( ) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler() as fh: async with to_asyncio.open_channel_from( - partial( - aio_price_feed_relay, - fh, - instrument - ) + aio_price_feed_relay, + fh=fh, + instrument=instrument, ) as (chan, first): yield chan @@ -611,10 +607,9 @@ async def maybe_open_price_feed( async def aio_order_feed_relay( + chan: to_asyncio.LinkedTaskChannel, fh: FeedHandler, instrument: Symbol, - from_trio: asyncio.Queue, - to_trio: trio.abc.SendChannel, ) -> None: async def _fill(data: dict, receipt_timestamp): breakpoint() @@ -637,7 +632,7 @@ async def aio_order_feed_relay( install_signal_handlers=False) # sync with trio - to_trio.send_nowait(None) + chan.started_nowait(None) await asyncio.sleep(float('inf')) @@ -648,11 +643,9 @@ async def open_order_feed( ) -> trio.abc.ReceiveStream: async with maybe_open_feed_handler() as fh: async with to_asyncio.open_channel_from( - partial( - aio_order_feed_relay, - fh, - instrument - ) + aio_order_feed_relay, + fh=fh, + instrument=instrument, ) as (chan, first): yield chan diff --git a/piker/brokers/ib/broker.py b/piker/brokers/ib/broker.py index fcead4cf..00141c4f 100644 --- a/piker/brokers/ib/broker.py +++ b/piker/brokers/ib/broker.py @@ -231,20 +231,21 @@ async def handle_order_requests( async def recv_trade_updates( + chan: tractor.to_asyncio.LinkedTaskChannel, client: Client, - to_trio: trio.abc.SendChannel, ) -> None: ''' - Receive and relay order control and positioning related events - from `ib_async`, pack as tuples and push over mem-chan to our - trio relay task for processing and relay to EMS. + Receive and relay order control and positioning + related events from `ib_async`, pack as tuples and + push over mem-chan to our trio relay task for + processing and relay to EMS. ''' - client.inline_errors(to_trio) + client.inline_errors(chan) # sync with trio task - to_trio.send_nowait(client.ib) + chan.started_nowait(client.ib) def push_tradesies( eventkit_obj, @@ -282,7 +283,7 @@ async def recv_trade_updates( try: # emit event name + relevant ibis internal objects - to_trio.send_nowait((event_name, emit)) + chan.send_nowait((event_name, emit)) except trio.BrokenResourceError: log.exception(f'Disconnected from {eventkit_obj} updates') eventkit_obj.disconnect(push_tradesies)