Swap `open_channel_from()` yield-pair order

Port deribit and IB `asyncio` bridge callables to the new
`to_asyncio.open_channel_from()` signature where the `LinkedTaskChannel`
is the first param and `started_nowait()` replaces the old
`to_trio.send_nowait()` sync handshake.

Deats,
- deribit `api.py`: update `aio_price_feed_relay()` and
  `aio_order_feed_relay()` signatures to take `chan: LinkedTaskChannel`
  as first arg; drop `from_trio`/`to_trio` params; replace
  `to_trio.send_nowait()` with `chan.send_nowait()` and
  `chan.started_nowait()`.
- drop `functools.partial()` wrapping in both `open_price_feed()` and
  `open_order_feed()`; pass `fh=`/`instrument=` as kwargs directly.
- IB `broker.py`: same `chan` + `started_nowait()` port for
  `recv_trade_updates()`.

Other styling,
- rewrap `recv_trade_updates()` docstring to 67 chars.

(this patch was generated in some part by [`claude-code`][claude-code-gh])
[claude-code-gh]: https://github.com/anthropics/claude-code
Gud Boi 2026-03-24 00:10:55 -04:00
parent 799c9f45b4
commit 5e44ad05e8
2 changed files with 20 additions and 26 deletions

View File

@ -23,7 +23,6 @@ from contextlib import (
asynccontextmanager as acm,
)
from datetime import datetime
from functools import partial
import time
from typing import (
Any,
@ -524,13 +523,12 @@ async def maybe_open_feed_handler() -> trio.abc.ReceiveStream:
async def aio_price_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _trade(data: dict, receipt_timestamp):
to_trio.send_nowait(('trade', {
chan.send_nowait(('trade', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'last': data,
@ -540,7 +538,7 @@ async def aio_price_feed_relay(
}))
async def _l1(data: dict, receipt_timestamp):
to_trio.send_nowait(('l1', {
chan.send_nowait(('l1', {
'symbol': cb_sym_to_deribit_inst(
str_to_cb_sym(data.symbol)).lower(),
'ticks': [
@ -570,7 +568,7 @@ async def aio_price_feed_relay(
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
chan.started_nowait(None)
await asyncio.sleep(float('inf'))
@ -581,11 +579,9 @@ async def open_price_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_price_feed_relay,
fh,
instrument
)
fh=fh,
instrument=instrument,
) as (chan, first):
yield chan
@ -611,10 +607,9 @@ async def maybe_open_price_feed(
async def aio_order_feed_relay(
chan: to_asyncio.LinkedTaskChannel,
fh: FeedHandler,
instrument: Symbol,
from_trio: asyncio.Queue,
to_trio: trio.abc.SendChannel,
) -> None:
async def _fill(data: dict, receipt_timestamp):
breakpoint()
@ -637,7 +632,7 @@ async def aio_order_feed_relay(
install_signal_handlers=False)
# sync with trio
to_trio.send_nowait(None)
chan.started_nowait(None)
await asyncio.sleep(float('inf'))
@ -648,11 +643,9 @@ async def open_order_feed(
) -> trio.abc.ReceiveStream:
async with maybe_open_feed_handler() as fh:
async with to_asyncio.open_channel_from(
partial(
aio_order_feed_relay,
fh,
instrument
)
fh=fh,
instrument=instrument,
) as (chan, first):
yield chan

View File

@ -231,20 +231,21 @@ async def handle_order_requests(
async def recv_trade_updates(
chan: tractor.to_asyncio.LinkedTaskChannel,
client: Client,
to_trio: trio.abc.SendChannel,
) -> None:
'''
Receive and relay order control and positioning related events
from `ib_async`, pack as tuples and push over mem-chan to our
trio relay task for processing and relay to EMS.
Receive and relay order control and positioning
related events from `ib_async`, pack as tuples and
push over mem-chan to our trio relay task for
processing and relay to EMS.
'''
client.inline_errors(to_trio)
client.inline_errors(chan)
# sync with trio task
to_trio.send_nowait(client.ib)
chan.started_nowait(client.ib)
def push_tradesies(
eventkit_obj,
@ -282,7 +283,7 @@ async def recv_trade_updates(
try:
# emit event name + relevant ibis internal objects
to_trio.send_nowait((event_name, emit))
chan.send_nowait((event_name, emit))
except trio.BrokenResourceError:
log.exception(f'Disconnected from {eventkit_obj} updates')
eventkit_obj.disconnect(push_tradesies)