Compare commits
21 Commits
gitea_feat
...
dict_diffe
| Author | SHA1 | Date |
|---|---|---|
|
|
2f6e3ad03f | |
|
|
b75683879a | |
|
|
db8a3dd1b7 | |
|
|
2d92ed2052 | |
|
|
0756cb0289 | |
|
|
66f7dd9020 | |
|
|
9782107153 | |
|
|
1f43f660fe | |
|
|
d3b7d0e247 | |
|
|
700dbf0e2b | |
|
|
b52c4092f3 | |
|
|
7fe3e3f482 | |
|
|
bbbdcad33b | |
|
|
a3812cd169 | |
|
|
5ac5743c66 | |
|
|
aa204228ab | |
|
|
0bd8f2bcd9 | |
|
|
334f512ad3 | |
|
|
71cca4ceda | |
|
|
0d332427e2 | |
|
|
02980282cd |
|
|
@ -36,8 +36,6 @@ from trio_typing import TaskStatus
|
||||||
import tractor
|
import tractor
|
||||||
from ib_insync.contract import (
|
from ib_insync.contract import (
|
||||||
Contract,
|
Contract,
|
||||||
# Option,
|
|
||||||
# Forex,
|
|
||||||
)
|
)
|
||||||
from ib_insync.order import (
|
from ib_insync.order import (
|
||||||
Trade,
|
Trade,
|
||||||
|
|
@ -61,6 +59,8 @@ from piker.pp import (
|
||||||
)
|
)
|
||||||
from piker.log import get_console_log
|
from piker.log import get_console_log
|
||||||
from piker.clearing._messages import (
|
from piker.clearing._messages import (
|
||||||
|
Order,
|
||||||
|
Status,
|
||||||
BrokerdOrder,
|
BrokerdOrder,
|
||||||
BrokerdOrderAck,
|
BrokerdOrderAck,
|
||||||
BrokerdStatus,
|
BrokerdStatus,
|
||||||
|
|
@ -123,11 +123,13 @@ async def handle_order_requests(
|
||||||
f'An IB account number for name {account} is not found?\n'
|
f'An IB account number for name {account} is not found?\n'
|
||||||
'Make sure you have all TWS and GW instances running.'
|
'Make sure you have all TWS and GW instances running.'
|
||||||
)
|
)
|
||||||
await ems_order_stream.send(BrokerdError(
|
await ems_order_stream.send(
|
||||||
oid=request_msg['oid'],
|
BrokerdError(
|
||||||
symbol=request_msg['symbol'],
|
oid=request_msg['oid'],
|
||||||
reason=f'No account found: `{account}` ?',
|
symbol=request_msg['symbol'],
|
||||||
))
|
reason=f'No account found: `{account}` ?',
|
||||||
|
)
|
||||||
|
)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
client = _accounts2clients.get(account)
|
client = _accounts2clients.get(account)
|
||||||
|
|
@ -147,6 +149,14 @@ async def handle_order_requests(
|
||||||
# validate
|
# validate
|
||||||
order = BrokerdOrder(**request_msg)
|
order = BrokerdOrder(**request_msg)
|
||||||
|
|
||||||
|
# XXX: by default 0 tells ``ib_insync`` methods that
|
||||||
|
# there is no existing order so ask the client to create
|
||||||
|
# a new one (which it seems to do by allocating an int
|
||||||
|
# counter - collision prone..)
|
||||||
|
reqid = order.reqid
|
||||||
|
if reqid is not None:
|
||||||
|
reqid = int(reqid)
|
||||||
|
|
||||||
# call our client api to submit the order
|
# call our client api to submit the order
|
||||||
reqid = client.submit_limit(
|
reqid = client.submit_limit(
|
||||||
oid=order.oid,
|
oid=order.oid,
|
||||||
|
|
@ -155,12 +165,7 @@ async def handle_order_requests(
|
||||||
action=order.action,
|
action=order.action,
|
||||||
size=order.size,
|
size=order.size,
|
||||||
account=acct_number,
|
account=acct_number,
|
||||||
|
reqid=reqid,
|
||||||
# XXX: by default 0 tells ``ib_insync`` methods that
|
|
||||||
# there is no existing order so ask the client to create
|
|
||||||
# a new one (which it seems to do by allocating an int
|
|
||||||
# counter - collision prone..)
|
|
||||||
reqid=order.reqid,
|
|
||||||
)
|
)
|
||||||
if reqid is None:
|
if reqid is None:
|
||||||
await ems_order_stream.send(BrokerdError(
|
await ems_order_stream.send(BrokerdError(
|
||||||
|
|
@ -180,9 +185,9 @@ async def handle_order_requests(
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
||||||
elif action == 'cancel':
|
if action == 'cancel':
|
||||||
msg = BrokerdCancel(**request_msg)
|
msg = BrokerdCancel(**request_msg)
|
||||||
client.submit_cancel(reqid=msg.reqid)
|
client.submit_cancel(reqid=int(msg.reqid))
|
||||||
|
|
||||||
else:
|
else:
|
||||||
log.error(f'Unknown order command: {request_msg}')
|
log.error(f'Unknown order command: {request_msg}')
|
||||||
|
|
@ -357,11 +362,24 @@ async def update_and_audit_msgs(
|
||||||
# presume we're at least not more in the shit then we
|
# presume we're at least not more in the shit then we
|
||||||
# thought.
|
# thought.
|
||||||
if diff:
|
if diff:
|
||||||
|
reverse_split_ratio = pikersize / ibsize
|
||||||
|
split_ratio = 1/reverse_split_ratio
|
||||||
|
|
||||||
|
if split_ratio >= reverse_split_ratio:
|
||||||
|
entry = f'split_ratio = {int(split_ratio)}'
|
||||||
|
else:
|
||||||
|
entry = f'split_ratio = 1/{int(reverse_split_ratio)}'
|
||||||
|
|
||||||
raise ValueError(
|
raise ValueError(
|
||||||
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
f'POSITION MISMATCH ib <-> piker ledger:\n'
|
||||||
f'ib: {ibppmsg}\n'
|
f'ib: {ibppmsg}\n'
|
||||||
f'piker: {msg}\n'
|
f'piker: {msg}\n'
|
||||||
'YOU SHOULD FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?'
|
f'reverse_split_ratio: {reverse_split_ratio}\n'
|
||||||
|
f'split_ratio: {split_ratio}\n\n'
|
||||||
|
'FIGURE OUT WHY TF YOUR LEDGER IS OFF!?!?\n\n'
|
||||||
|
'If you are expecting a (reverse) split in this '
|
||||||
|
'instrument you should probably put the following '
|
||||||
|
f'in the `pps.toml` section:\n{entry}'
|
||||||
)
|
)
|
||||||
msg.size = ibsize
|
msg.size = ibsize
|
||||||
|
|
||||||
|
|
@ -439,7 +457,6 @@ async def trades_dialogue(
|
||||||
# we might also want to delegate a specific actor for
|
# we might also want to delegate a specific actor for
|
||||||
# ledger writing / reading for speed?
|
# ledger writing / reading for speed?
|
||||||
async with (
|
async with (
|
||||||
# trio.open_nursery() as nurse,
|
|
||||||
open_client_proxies() as (proxies, aioclients),
|
open_client_proxies() as (proxies, aioclients),
|
||||||
):
|
):
|
||||||
# Open a trade ledgers stack for appending trade records over
|
# Open a trade ledgers stack for appending trade records over
|
||||||
|
|
@ -468,6 +485,52 @@ async def trades_dialogue(
|
||||||
|
|
||||||
client = aioclients[account]
|
client = aioclients[account]
|
||||||
|
|
||||||
|
trades: list[Trade] = client.ib.openTrades()
|
||||||
|
order_msgs = []
|
||||||
|
for trade in trades:
|
||||||
|
|
||||||
|
order = trade.order
|
||||||
|
quant = trade.order.totalQuantity
|
||||||
|
action = order.action.lower()
|
||||||
|
size = {
|
||||||
|
'sell': -1,
|
||||||
|
'buy': 1,
|
||||||
|
}[action] * quant
|
||||||
|
con = trade.contract
|
||||||
|
|
||||||
|
# TODO: in the case of the SMART venue (aka ib's
|
||||||
|
# router-clearing sys) we probably should handle
|
||||||
|
# showing such orders overtop of the fqsn for the
|
||||||
|
# primary exchange, how to map this easily is going
|
||||||
|
# to be a bit tricky though?
|
||||||
|
deats = await proxy.con_deats(contracts=[con])
|
||||||
|
fqsn = list(deats)[0]
|
||||||
|
|
||||||
|
reqid = order.orderId
|
||||||
|
|
||||||
|
# TODO: maybe embed a ``BrokerdOrder`` instead
|
||||||
|
# since then we can directly load it on the client
|
||||||
|
# side in the order mode loop?
|
||||||
|
msg = Status(
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
resp='open',
|
||||||
|
oid=str(reqid),
|
||||||
|
reqid=reqid,
|
||||||
|
|
||||||
|
# embedded order info
|
||||||
|
req=Order(
|
||||||
|
action=action,
|
||||||
|
exec_mode='live',
|
||||||
|
oid=str(reqid),
|
||||||
|
symbol=fqsn,
|
||||||
|
account=accounts_def.inverse[order.account],
|
||||||
|
price=order.lmtPrice,
|
||||||
|
size=size,
|
||||||
|
),
|
||||||
|
src='ib',
|
||||||
|
)
|
||||||
|
order_msgs.append(msg)
|
||||||
|
|
||||||
# process pp value reported from ib's system. we only use these
|
# process pp value reported from ib's system. we only use these
|
||||||
# to cross-check sizing since average pricing on their end uses
|
# to cross-check sizing since average pricing on their end uses
|
||||||
# the so called (bs) "FIFO" style which more or less results in
|
# the so called (bs) "FIFO" style which more or less results in
|
||||||
|
|
@ -480,6 +543,7 @@ async def trades_dialogue(
|
||||||
# sure know which positions to update from the ledger if
|
# sure know which positions to update from the ledger if
|
||||||
# any are missing from the ``pps.toml``
|
# any are missing from the ``pps.toml``
|
||||||
bsuid, msg = pack_position(pos)
|
bsuid, msg = pack_position(pos)
|
||||||
|
|
||||||
acctid = msg.account = accounts_def.inverse[msg.account]
|
acctid = msg.account = accounts_def.inverse[msg.account]
|
||||||
acctid = acctid.strip('ib.')
|
acctid = acctid.strip('ib.')
|
||||||
cids2pps[(acctid, bsuid)] = msg
|
cids2pps[(acctid, bsuid)] = msg
|
||||||
|
|
@ -493,9 +557,7 @@ async def trades_dialogue(
|
||||||
or pp.size != msg.size
|
or pp.size != msg.size
|
||||||
):
|
):
|
||||||
trans = norm_trade_records(ledger)
|
trans = norm_trade_records(ledger)
|
||||||
updated = table.update_from_trans(trans)
|
table.update_from_trans(trans)
|
||||||
pp = updated[bsuid]
|
|
||||||
|
|
||||||
# update trades ledgers for all accounts from connected
|
# update trades ledgers for all accounts from connected
|
||||||
# api clients which report trades for **this session**.
|
# api clients which report trades for **this session**.
|
||||||
trades = await proxy.trades()
|
trades = await proxy.trades()
|
||||||
|
|
@ -521,9 +583,28 @@ async def trades_dialogue(
|
||||||
trans = trans_by_acct.get(acctid)
|
trans = trans_by_acct.get(acctid)
|
||||||
if trans:
|
if trans:
|
||||||
table.update_from_trans(trans)
|
table.update_from_trans(trans)
|
||||||
updated = table.update_from_trans(trans)
|
|
||||||
|
|
||||||
assert msg.size == pp.size, 'WTF'
|
# XXX: not sure exactly why it wouldn't be in
|
||||||
|
# the updated output (maybe this is a bug?) but
|
||||||
|
# if you create a pos from TWS and then load it
|
||||||
|
# from the api trades it seems we get a key
|
||||||
|
# error from ``update[bsuid]`` ?
|
||||||
|
pp = table.pps.get(bsuid)
|
||||||
|
if not pp:
|
||||||
|
log.error(
|
||||||
|
f'The contract id for {msg} may have '
|
||||||
|
f'changed to {bsuid}\nYou may need to '
|
||||||
|
'adjust your ledger for this, skipping '
|
||||||
|
'for now.'
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
|
||||||
|
if msg.size != pp.size:
|
||||||
|
log.error(
|
||||||
|
'Position mismatch {pp.symbol.front_fqsn()}:\n'
|
||||||
|
f'ib: {msg.size}\n'
|
||||||
|
f'piker: {pp.size}\n'
|
||||||
|
)
|
||||||
|
|
||||||
active_pps, closed_pps = table.dump_active()
|
active_pps, closed_pps = table.dump_active()
|
||||||
|
|
||||||
|
|
@ -575,6 +656,10 @@ async def trades_dialogue(
|
||||||
ctx.open_stream() as ems_stream,
|
ctx.open_stream() as ems_stream,
|
||||||
trio.open_nursery() as n,
|
trio.open_nursery() as n,
|
||||||
):
|
):
|
||||||
|
# relay existing open orders to ems
|
||||||
|
for msg in order_msgs:
|
||||||
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
trade_event_stream = await n.start(open_trade_event_stream)
|
trade_event_stream = await n.start(open_trade_event_stream)
|
||||||
clients.append((client, trade_event_stream))
|
clients.append((client, trade_event_stream))
|
||||||
|
|
||||||
|
|
@ -586,6 +671,7 @@ async def trades_dialogue(
|
||||||
for client, stream in clients:
|
for client, stream in clients:
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
deliver_trade_events,
|
deliver_trade_events,
|
||||||
|
n,
|
||||||
stream,
|
stream,
|
||||||
ems_stream,
|
ems_stream,
|
||||||
accounts_def,
|
accounts_def,
|
||||||
|
|
@ -661,8 +747,24 @@ async def emit_pp_update(
|
||||||
await ems_stream.send(msg)
|
await ems_stream.send(msg)
|
||||||
|
|
||||||
|
|
||||||
|
_statuses: dict[str, str] = {
|
||||||
|
'cancelled': 'canceled',
|
||||||
|
'submitted': 'open',
|
||||||
|
# XXX: just pass these through? it duplicates actual fill events other
|
||||||
|
# then the case where you the `.remaining == 0` case which is our
|
||||||
|
# 'closed'` case.
|
||||||
|
# 'filled': 'pending',
|
||||||
|
# 'pendingsubmit': 'pending',
|
||||||
|
|
||||||
|
# TODO: see a current ``ib_insync`` issue around this:
|
||||||
|
# https://github.com/erdewit/ib_insync/issues/363
|
||||||
|
'inactive': 'pending',
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
async def deliver_trade_events(
|
async def deliver_trade_events(
|
||||||
|
|
||||||
|
nurse: trio.Nursery,
|
||||||
trade_event_stream: trio.MemoryReceiveChannel,
|
trade_event_stream: trio.MemoryReceiveChannel,
|
||||||
ems_stream: tractor.MsgStream,
|
ems_stream: tractor.MsgStream,
|
||||||
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
|
accounts_def: dict[str, str], # eg. `'ib.main'` -> `'DU999999'`
|
||||||
|
|
@ -718,6 +820,45 @@ async def deliver_trade_events(
|
||||||
# unwrap needed data from ib_insync internal types
|
# unwrap needed data from ib_insync internal types
|
||||||
trade: Trade = item
|
trade: Trade = item
|
||||||
status: OrderStatus = trade.orderStatus
|
status: OrderStatus = trade.orderStatus
|
||||||
|
ib_status_key = status.status.lower()
|
||||||
|
|
||||||
|
acctid = accounts_def.inverse[trade.order.account]
|
||||||
|
|
||||||
|
# double check there is no error when
|
||||||
|
# cancelling.. gawwwd
|
||||||
|
if ib_status_key == 'cancelled':
|
||||||
|
last_log = trade.log[-1]
|
||||||
|
if (
|
||||||
|
last_log.message
|
||||||
|
and 'Error' not in last_log.message
|
||||||
|
):
|
||||||
|
ib_status_key = trade.log[-2].status
|
||||||
|
print(ib_status_key)
|
||||||
|
|
||||||
|
elif ib_status_key == 'inactive':
|
||||||
|
async def sched_cancel():
|
||||||
|
log.warning(
|
||||||
|
'OH GAWD an inactive order..scheduling a cancel\n'
|
||||||
|
f'{pformat(item)}'
|
||||||
|
)
|
||||||
|
proxy = proxies[acctid]
|
||||||
|
await proxy.submit_cancel(reqid=trade.order.orderId)
|
||||||
|
await trio.sleep(1)
|
||||||
|
nurse.start_soon(sched_cancel)
|
||||||
|
|
||||||
|
nurse.start_soon(sched_cancel)
|
||||||
|
|
||||||
|
status_key = (
|
||||||
|
_statuses.get(ib_status_key)
|
||||||
|
or ib_status_key.lower()
|
||||||
|
)
|
||||||
|
|
||||||
|
remaining = status.remaining
|
||||||
|
if (
|
||||||
|
status_key == 'filled'
|
||||||
|
and remaining == 0
|
||||||
|
):
|
||||||
|
status_key = 'closed'
|
||||||
|
|
||||||
# skip duplicate filled updates - we get the deats
|
# skip duplicate filled updates - we get the deats
|
||||||
# from the execution details event
|
# from the execution details event
|
||||||
|
|
@ -728,14 +869,14 @@ async def deliver_trade_events(
|
||||||
account=accounts_def.inverse[trade.order.account],
|
account=accounts_def.inverse[trade.order.account],
|
||||||
|
|
||||||
# everyone doin camel case..
|
# everyone doin camel case..
|
||||||
status=status.status.lower(), # force lower case
|
status=status_key, # force lower case
|
||||||
|
|
||||||
filled=status.filled,
|
filled=status.filled,
|
||||||
reason=status.whyHeld,
|
reason=status.whyHeld,
|
||||||
|
|
||||||
# this seems to not be necessarily up to date in the
|
# this seems to not be necessarily up to date in the
|
||||||
# execDetails event.. so we have to send it here I guess?
|
# execDetails event.. so we have to send it here I guess?
|
||||||
remaining=status.remaining,
|
remaining=remaining,
|
||||||
|
|
||||||
broker_details={'name': 'ib'},
|
broker_details={'name': 'ib'},
|
||||||
)
|
)
|
||||||
|
|
@ -870,17 +1011,25 @@ async def deliver_trade_events(
|
||||||
if err['reqid'] == -1:
|
if err['reqid'] == -1:
|
||||||
log.error(f'TWS external order error:\n{pformat(err)}')
|
log.error(f'TWS external order error:\n{pformat(err)}')
|
||||||
|
|
||||||
# TODO: what schema for this msg if we're going to make it
|
# TODO: we don't want to relay data feed / lookup errors
|
||||||
# portable across all backends?
|
# so we need some further filtering logic here..
|
||||||
# msg = BrokerdError(**err)
|
# for most cases the 'status' block above should take
|
||||||
|
# care of this.
|
||||||
|
# await ems_stream.send(BrokerdStatus(
|
||||||
|
# status='error',
|
||||||
|
# reqid=err['reqid'],
|
||||||
|
# reason=err['reason'],
|
||||||
|
# time_ns=time.time_ns(),
|
||||||
|
# account=accounts_def.inverse[trade.order.account],
|
||||||
|
# broker_details={'name': 'ib'},
|
||||||
|
# ))
|
||||||
|
|
||||||
case 'position':
|
case 'position':
|
||||||
|
|
||||||
cid, msg = pack_position(item)
|
cid, msg = pack_position(item)
|
||||||
log.info(f'New IB position msg: {msg}')
|
log.info(f'New IB position msg: {msg}')
|
||||||
# acctid = msg.account = accounts_def.inverse[msg.account]
|
|
||||||
# cuck ib and it's shitty fifo sys for pps!
|
# cuck ib and it's shitty fifo sys for pps!
|
||||||
# await ems_stream.send(msg)
|
continue
|
||||||
|
|
||||||
case 'event':
|
case 'event':
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -101,3 +101,30 @@ def percent_change(
|
||||||
new: float,
|
new: float,
|
||||||
) -> float:
|
) -> float:
|
||||||
return pnl(init, new) * 100.
|
return pnl(init, new) * 100.
|
||||||
|
|
||||||
|
|
||||||
|
def diff_dict(
|
||||||
|
d1: dict,
|
||||||
|
d2: dict,
|
||||||
|
|
||||||
|
) -> dict:
|
||||||
|
d1_keys = set(d1.keys())
|
||||||
|
d2_keys = set(d2.keys())
|
||||||
|
shared_keys = d1_keys.intersection(d2_keys)
|
||||||
|
shared_deltas = {o: (d1[o], d2[o]) for o in shared_keys if d1[o] != d2[o]}
|
||||||
|
added_keys = d2_keys - d1_keys
|
||||||
|
added_deltas = {o: (None, d2[o]) for o in added_keys}
|
||||||
|
deltas = {**shared_deltas, **added_deltas}
|
||||||
|
return parse_deltas(deltas)
|
||||||
|
|
||||||
|
|
||||||
|
def parse_deltas(deltas: dict) -> dict:
|
||||||
|
res = {}
|
||||||
|
for k, v in deltas.items():
|
||||||
|
if isinstance(v[0], dict):
|
||||||
|
tmp = diff_dict(v[0], v[1])
|
||||||
|
if tmp:
|
||||||
|
res[k] = tmp
|
||||||
|
else:
|
||||||
|
res[k] = v[1]
|
||||||
|
return res
|
||||||
|
|
|
||||||
|
|
@ -83,7 +83,13 @@ class OrderBook:
|
||||||
"""Cancel an order (or alert) in the EMS.
|
"""Cancel an order (or alert) in the EMS.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
cmd = self._sent_orders[uuid]
|
cmd = self._sent_orders.get(uuid)
|
||||||
|
if not cmd:
|
||||||
|
log.error(
|
||||||
|
f'Unknown order {uuid}!?\n'
|
||||||
|
f'Maybe there is a stale entry or line?\n'
|
||||||
|
f'You should report this as a bug!'
|
||||||
|
)
|
||||||
msg = Cancel(
|
msg = Cancel(
|
||||||
oid=uuid,
|
oid=uuid,
|
||||||
symbol=cmd.symbol,
|
symbol=cmd.symbol,
|
||||||
|
|
@ -149,10 +155,17 @@ async def relay_order_cmds_from_sync_code(
|
||||||
book = get_orders()
|
book = get_orders()
|
||||||
async with book._from_order_book.subscribe() as orders_stream:
|
async with book._from_order_book.subscribe() as orders_stream:
|
||||||
async for cmd in orders_stream:
|
async for cmd in orders_stream:
|
||||||
if cmd.symbol == symbol_key:
|
sym = cmd.symbol
|
||||||
log.info(f'Send order cmd:\n{pformat(cmd)}')
|
msg = pformat(cmd)
|
||||||
|
if sym == symbol_key:
|
||||||
|
log.info(f'Send order cmd:\n{msg}')
|
||||||
# send msg over IPC / wire
|
# send msg over IPC / wire
|
||||||
await to_ems_stream.send(cmd)
|
await to_ems_stream.send(cmd)
|
||||||
|
else:
|
||||||
|
log.warning(
|
||||||
|
f'Ignoring unmatched order cmd for {sym} != {symbol_key}:'
|
||||||
|
f'\n{msg}'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@acm
|
@acm
|
||||||
|
|
@ -220,11 +233,19 @@ async def open_ems(
|
||||||
fqsn=fqsn,
|
fqsn=fqsn,
|
||||||
exec_mode=mode,
|
exec_mode=mode,
|
||||||
|
|
||||||
) as (ctx, (positions, accounts)),
|
) as (
|
||||||
|
ctx,
|
||||||
|
(
|
||||||
|
positions,
|
||||||
|
accounts,
|
||||||
|
dialogs,
|
||||||
|
)
|
||||||
|
),
|
||||||
|
|
||||||
# open 2-way trade command stream
|
# open 2-way trade command stream
|
||||||
ctx.open_stream() as trades_stream,
|
ctx.open_stream() as trades_stream,
|
||||||
):
|
):
|
||||||
|
# start sync code order msg delivery task
|
||||||
async with trio.open_nursery() as n:
|
async with trio.open_nursery() as n:
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
relay_order_cmds_from_sync_code,
|
relay_order_cmds_from_sync_code,
|
||||||
|
|
@ -232,4 +253,10 @@ async def open_ems(
|
||||||
trades_stream
|
trades_stream
|
||||||
)
|
)
|
||||||
|
|
||||||
yield book, trades_stream, positions, accounts
|
yield (
|
||||||
|
book,
|
||||||
|
trades_stream,
|
||||||
|
positions,
|
||||||
|
accounts,
|
||||||
|
dialogs,
|
||||||
|
)
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,8 @@
|
||||||
In da suit parlances: "Execution management systems"
|
In da suit parlances: "Execution management systems"
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
from collections import defaultdict, ChainMap
|
||||||
from contextlib import asynccontextmanager
|
from contextlib import asynccontextmanager
|
||||||
from dataclasses import dataclass, field
|
|
||||||
from math import isnan
|
from math import isnan
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
import time
|
import time
|
||||||
|
|
@ -27,6 +27,7 @@ from typing import (
|
||||||
AsyncIterator,
|
AsyncIterator,
|
||||||
Any,
|
Any,
|
||||||
Callable,
|
Callable,
|
||||||
|
Optional,
|
||||||
)
|
)
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
|
|
@ -41,9 +42,16 @@ from ..data.types import Struct
|
||||||
from .._daemon import maybe_spawn_brokerd
|
from .._daemon import maybe_spawn_brokerd
|
||||||
from . import _paper_engine as paper
|
from . import _paper_engine as paper
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
Status, Order,
|
Order,
|
||||||
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
|
Status,
|
||||||
BrokerdFill, BrokerdError, BrokerdPosition,
|
# Cancel,
|
||||||
|
BrokerdCancel,
|
||||||
|
BrokerdOrder,
|
||||||
|
# BrokerdOrderAck,
|
||||||
|
BrokerdStatus,
|
||||||
|
BrokerdFill,
|
||||||
|
BrokerdError,
|
||||||
|
BrokerdPosition,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -90,8 +98,7 @@ def mk_check(
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
class _DarkBook(Struct):
|
||||||
class _DarkBook:
|
|
||||||
'''
|
'''
|
||||||
EMS-trigger execution book.
|
EMS-trigger execution book.
|
||||||
|
|
||||||
|
|
@ -116,17 +123,24 @@ class _DarkBook:
|
||||||
dict, # cmd / msg type
|
dict, # cmd / msg type
|
||||||
]
|
]
|
||||||
]
|
]
|
||||||
] = field(default_factory=dict)
|
] = {}
|
||||||
|
|
||||||
# tracks most recent values per symbol each from data feed
|
# tracks most recent values per symbol each from data feed
|
||||||
lasts: dict[
|
lasts: dict[
|
||||||
str,
|
str,
|
||||||
float,
|
float,
|
||||||
] = field(default_factory=dict)
|
] = {}
|
||||||
|
|
||||||
# mapping of piker ems order ids to current brokerd order flow message
|
# _ems_entries: dict[str, str] = {}
|
||||||
_ems_entries: dict[str, str] = field(default_factory=dict)
|
_active: dict = {}
|
||||||
_ems2brokerd_ids: dict[str, str] = field(default_factory=bidict)
|
|
||||||
|
# mapping of ems dialog ids to msg flow history
|
||||||
|
_msgflows: defaultdict[
|
||||||
|
int,
|
||||||
|
ChainMap[dict[str, dict]],
|
||||||
|
] = defaultdict(ChainMap)
|
||||||
|
|
||||||
|
_ems2brokerd_ids: dict[str, str] = bidict()
|
||||||
|
|
||||||
|
|
||||||
# XXX: this is in place to prevent accidental positions that are too
|
# XXX: this is in place to prevent accidental positions that are too
|
||||||
|
|
@ -181,6 +195,7 @@ async def clear_dark_triggers(
|
||||||
for oid, (
|
for oid, (
|
||||||
pred,
|
pred,
|
||||||
tf,
|
tf,
|
||||||
|
# TODO: send this msg instead?
|
||||||
cmd,
|
cmd,
|
||||||
percent_away,
|
percent_away,
|
||||||
abs_diff_away
|
abs_diff_away
|
||||||
|
|
@ -188,9 +203,9 @@ async def clear_dark_triggers(
|
||||||
tuple(execs.items())
|
tuple(execs.items())
|
||||||
):
|
):
|
||||||
if (
|
if (
|
||||||
not pred or
|
not pred
|
||||||
ttype not in tf or
|
or ttype not in tf
|
||||||
not pred(price)
|
or not pred(price)
|
||||||
):
|
):
|
||||||
# log.runtime(
|
# log.runtime(
|
||||||
# f'skipping quote for {sym} '
|
# f'skipping quote for {sym} '
|
||||||
|
|
@ -200,30 +215,29 @@ async def clear_dark_triggers(
|
||||||
# majority of iterations will be non-matches
|
# majority of iterations will be non-matches
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
brokerd_msg: Optional[BrokerdOrder] = None
|
||||||
match cmd:
|
match cmd:
|
||||||
# alert: nothing to do but relay a status
|
# alert: nothing to do but relay a status
|
||||||
# back to the requesting ems client
|
# back to the requesting ems client
|
||||||
case {
|
case Order(action='alert'):
|
||||||
'action': 'alert',
|
resp = 'triggered'
|
||||||
}:
|
|
||||||
resp = 'alert_triggered'
|
|
||||||
|
|
||||||
# executable order submission
|
# executable order submission
|
||||||
case {
|
case Order(
|
||||||
'action': action,
|
action=action,
|
||||||
'symbol': symbol,
|
symbol=symbol,
|
||||||
'account': account,
|
account=account,
|
||||||
'size': size,
|
size=size,
|
||||||
}:
|
):
|
||||||
bfqsn: str = symbol.replace(f'.{broker}', '')
|
bfqsn: str = symbol.replace(f'.{broker}', '')
|
||||||
submit_price = price + abs_diff_away
|
submit_price = price + abs_diff_away
|
||||||
resp = 'dark_triggered' # hidden on client-side
|
resp = 'triggered' # hidden on client-side
|
||||||
|
|
||||||
log.info(
|
log.info(
|
||||||
f'Dark order triggered for price {price}\n'
|
f'Dark order triggered for price {price}\n'
|
||||||
f'Submitting order @ price {submit_price}')
|
f'Submitting order @ price {submit_price}')
|
||||||
|
|
||||||
live_req = BrokerdOrder(
|
brokerd_msg = BrokerdOrder(
|
||||||
action=action,
|
action=action,
|
||||||
oid=oid,
|
oid=oid,
|
||||||
account=account,
|
account=account,
|
||||||
|
|
@ -232,7 +246,8 @@ async def clear_dark_triggers(
|
||||||
price=submit_price,
|
price=submit_price,
|
||||||
size=size,
|
size=size,
|
||||||
)
|
)
|
||||||
await brokerd_orders_stream.send(live_req)
|
|
||||||
|
await brokerd_orders_stream.send(brokerd_msg)
|
||||||
|
|
||||||
# mark this entry as having sent an order
|
# mark this entry as having sent an order
|
||||||
# request. the entry will be replaced once the
|
# request. the entry will be replaced once the
|
||||||
|
|
@ -240,18 +255,19 @@ async def clear_dark_triggers(
|
||||||
# a ``BrokerdOrderAck`` msg including the
|
# a ``BrokerdOrderAck`` msg including the
|
||||||
# allocated unique ``BrokerdOrderAck.reqid`` key
|
# allocated unique ``BrokerdOrderAck.reqid`` key
|
||||||
# generated by the broker's own systems.
|
# generated by the broker's own systems.
|
||||||
book._ems_entries[oid] = live_req
|
# book._ems_entries[oid] = live_req
|
||||||
|
# book._msgflows[oid].maps.insert(0, live_req)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
raise ValueError(f'Invalid dark book entry: {cmd}')
|
raise ValueError(f'Invalid dark book entry: {cmd}')
|
||||||
|
|
||||||
# fallthrough logic
|
# fallthrough logic
|
||||||
resp = Status(
|
status = Status(
|
||||||
oid=oid, # ems dialog id
|
oid=oid, # ems dialog id
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
resp=resp,
|
resp=resp,
|
||||||
trigger_price=price,
|
req=cmd,
|
||||||
brokerd_msg=cmd,
|
brokerd_msg=brokerd_msg,
|
||||||
)
|
)
|
||||||
|
|
||||||
# remove exec-condition from set
|
# remove exec-condition from set
|
||||||
|
|
@ -262,9 +278,18 @@ async def clear_dark_triggers(
|
||||||
f'pred for {oid} was already removed!?'
|
f'pred for {oid} was already removed!?'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# update actives
|
||||||
|
if cmd.action == 'alert':
|
||||||
|
# don't register the alert status (so it won't
|
||||||
|
# be reloaded by clients) since it's now
|
||||||
|
# complete / closed.
|
||||||
|
book._active.pop(oid)
|
||||||
|
else:
|
||||||
|
book._active[oid] = status
|
||||||
|
|
||||||
# send response to client-side
|
# send response to client-side
|
||||||
try:
|
try:
|
||||||
await ems_client_order_stream.send(resp)
|
await ems_client_order_stream.send(status)
|
||||||
except (
|
except (
|
||||||
trio.ClosedResourceError,
|
trio.ClosedResourceError,
|
||||||
):
|
):
|
||||||
|
|
@ -281,8 +306,7 @@ async def clear_dark_triggers(
|
||||||
# print(f'execs scan took: {time.time() - start}')
|
# print(f'execs scan took: {time.time() - start}')
|
||||||
|
|
||||||
|
|
||||||
@dataclass
|
class TradesRelay(Struct):
|
||||||
class TradesRelay:
|
|
||||||
|
|
||||||
# for now we keep only a single connection open with
|
# for now we keep only a single connection open with
|
||||||
# each ``brokerd`` for simplicity.
|
# each ``brokerd`` for simplicity.
|
||||||
|
|
@ -318,7 +342,10 @@ class Router(Struct):
|
||||||
|
|
||||||
# order id to client stream map
|
# order id to client stream map
|
||||||
clients: set[tractor.MsgStream] = set()
|
clients: set[tractor.MsgStream] = set()
|
||||||
dialogues: dict[str, list[tractor.MsgStream]] = {}
|
dialogues: dict[
|
||||||
|
str,
|
||||||
|
list[tractor.MsgStream]
|
||||||
|
] = {}
|
||||||
|
|
||||||
# brokername to trades-dialogues streams with ``brokerd`` actors
|
# brokername to trades-dialogues streams with ``brokerd`` actors
|
||||||
relays: dict[str, TradesRelay] = {}
|
relays: dict[str, TradesRelay] = {}
|
||||||
|
|
@ -341,11 +368,12 @@ class Router(Struct):
|
||||||
loglevel: str,
|
loglevel: str,
|
||||||
|
|
||||||
) -> tuple[dict, tractor.MsgStream]:
|
) -> tuple[dict, tractor.MsgStream]:
|
||||||
'''Open and yield ``brokerd`` trades dialogue context-stream if none
|
'''
|
||||||
already exists.
|
Open and yield ``brokerd`` trades dialogue context-stream if
|
||||||
|
none already exists.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
relay = self.relays.get(feed.mod.name)
|
relay: TradesRelay = self.relays.get(feed.mod.name)
|
||||||
|
|
||||||
if (
|
if (
|
||||||
relay is None
|
relay is None
|
||||||
|
|
@ -381,6 +409,22 @@ class Router(Struct):
|
||||||
|
|
||||||
relay.consumers -= 1
|
relay.consumers -= 1
|
||||||
|
|
||||||
|
async def client_broadcast(
|
||||||
|
self,
|
||||||
|
msg: dict,
|
||||||
|
|
||||||
|
) -> None:
|
||||||
|
for client_stream in self.clients.copy():
|
||||||
|
try:
|
||||||
|
await client_stream.send(msg)
|
||||||
|
except(
|
||||||
|
trio.ClosedResourceError,
|
||||||
|
trio.BrokenResourceError,
|
||||||
|
):
|
||||||
|
self.clients.remove(client_stream)
|
||||||
|
log.warning(
|
||||||
|
f'client for {client_stream} was already closed?')
|
||||||
|
|
||||||
|
|
||||||
_router: Router = None
|
_router: Router = None
|
||||||
|
|
||||||
|
|
@ -452,7 +496,6 @@ async def open_brokerd_trades_dialogue(
|
||||||
async with (
|
async with (
|
||||||
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
|
open_trades_endpoint as (brokerd_ctx, (positions, accounts,)),
|
||||||
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
brokerd_ctx.open_stream() as brokerd_trades_stream,
|
||||||
|
|
||||||
):
|
):
|
||||||
# XXX: really we only want one stream per `emsd` actor
|
# XXX: really we only want one stream per `emsd` actor
|
||||||
# to relay global `brokerd` order events unless we're
|
# to relay global `brokerd` order events unless we're
|
||||||
|
|
@ -502,14 +545,9 @@ async def open_brokerd_trades_dialogue(
|
||||||
|
|
||||||
task_status.started(relay)
|
task_status.started(relay)
|
||||||
|
|
||||||
await translate_and_relay_brokerd_events(
|
|
||||||
broker,
|
|
||||||
brokerd_trades_stream,
|
|
||||||
_router,
|
|
||||||
)
|
|
||||||
|
|
||||||
# this context should block here indefinitely until
|
# this context should block here indefinitely until
|
||||||
# the ``brokerd`` task either dies or is cancelled
|
# the ``brokerd`` task either dies or is cancelled
|
||||||
|
await trio.sleep_forever()
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
# parent context must have been closed
|
# parent context must have been closed
|
||||||
|
|
@ -561,15 +599,14 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
broker ems
|
broker ems
|
||||||
'error' -> log it locally (for now)
|
'error' -> log it locally (for now)
|
||||||
'status' -> relabel as 'broker_<status>', if complete send 'executed'
|
('status' | 'fill'} -> relayed through see ``Status`` msg type.
|
||||||
'fill' -> 'broker_filled'
|
|
||||||
|
|
||||||
Currently handled status values from IB:
|
Currently handled status values from IB:
|
||||||
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
{'presubmitted', 'submitted', 'cancelled', 'inactive'}
|
||||||
|
|
||||||
'''
|
'''
|
||||||
book = router.get_dark_book(broker)
|
book: _DarkBook = router.get_dark_book(broker)
|
||||||
relay = router.relays[broker]
|
relay: TradesRelay = router.relays[broker]
|
||||||
|
|
||||||
assert relay.brokerd_dialogue == brokerd_trades_stream
|
assert relay.brokerd_dialogue == brokerd_trades_stream
|
||||||
|
|
||||||
|
|
@ -601,30 +638,16 @@ async def translate_and_relay_brokerd_events(
|
||||||
|
|
||||||
# fan-out-relay position msgs immediately by
|
# fan-out-relay position msgs immediately by
|
||||||
# broadcasting updates on all client streams
|
# broadcasting updates on all client streams
|
||||||
for client_stream in router.clients.copy():
|
await router.client_broadcast(pos_msg)
|
||||||
try:
|
|
||||||
await client_stream.send(pos_msg)
|
|
||||||
except(
|
|
||||||
trio.ClosedResourceError,
|
|
||||||
trio.BrokenResourceError,
|
|
||||||
):
|
|
||||||
router.clients.remove(client_stream)
|
|
||||||
log.warning(
|
|
||||||
f'client for {client_stream} was already closed?')
|
|
||||||
|
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# BrokerdOrderAck
|
# BrokerdOrderAck
|
||||||
|
# initial response to brokerd order request
|
||||||
case {
|
case {
|
||||||
'name': 'ack',
|
'name': 'ack',
|
||||||
'reqid': reqid, # brokerd generated order-request id
|
'reqid': reqid, # brokerd generated order-request id
|
||||||
'oid': oid, # ems order-dialog id
|
'oid': oid, # ems order-dialog id
|
||||||
} if (
|
}:
|
||||||
entry := book._ems_entries.get(oid)
|
|
||||||
):
|
|
||||||
# initial response to brokerd order request
|
|
||||||
# if name == 'ack':
|
|
||||||
|
|
||||||
# register the brokerd request id (that was generated
|
# register the brokerd request id (that was generated
|
||||||
# / created internally by the broker backend) with our
|
# / created internally by the broker backend) with our
|
||||||
# local ems order id for reverse lookup later.
|
# local ems order id for reverse lookup later.
|
||||||
|
|
@ -639,23 +662,24 @@ async def translate_and_relay_brokerd_events(
|
||||||
# new order which has not yet be registered into the
|
# new order which has not yet be registered into the
|
||||||
# local ems book, insert it now and handle 2 cases:
|
# local ems book, insert it now and handle 2 cases:
|
||||||
|
|
||||||
# - the order has previously been requested to be
|
# 1. the order has previously been requested to be
|
||||||
# cancelled by the ems controlling client before we
|
# cancelled by the ems controlling client before we
|
||||||
# received this ack, in which case we relay that cancel
|
# received this ack, in which case we relay that cancel
|
||||||
# signal **asap** to the backend broker
|
# signal **asap** to the backend broker
|
||||||
action = getattr(entry, 'action', None)
|
# status = book._active.get(oid)
|
||||||
if action and action == 'cancel':
|
status_msg = book._active[oid]
|
||||||
|
req = status_msg.req
|
||||||
|
if req and req.action == 'cancel':
|
||||||
# assign newly providerd broker backend request id
|
# assign newly providerd broker backend request id
|
||||||
entry.reqid = reqid
|
# and tell broker to cancel immediately
|
||||||
|
status_msg.reqid = reqid
|
||||||
|
await brokerd_trades_stream.send(req)
|
||||||
|
|
||||||
# tell broker to cancel immediately
|
# 2. the order is now active and will be mirrored in
|
||||||
await brokerd_trades_stream.send(entry)
|
|
||||||
|
|
||||||
# - the order is now active and will be mirrored in
|
|
||||||
# our book -> registered as live flow
|
# our book -> registered as live flow
|
||||||
else:
|
else:
|
||||||
# update the flow with the ack msg
|
# TODO: should we relay this ack state?
|
||||||
book._ems_entries[oid] = BrokerdOrderAck(**brokerd_msg)
|
status_msg.resp = 'pending'
|
||||||
|
|
||||||
# no msg to client necessary
|
# no msg to client necessary
|
||||||
continue
|
continue
|
||||||
|
|
@ -666,11 +690,9 @@ async def translate_and_relay_brokerd_events(
|
||||||
'oid': oid, # ems order-dialog id
|
'oid': oid, # ems order-dialog id
|
||||||
'reqid': reqid, # brokerd generated order-request id
|
'reqid': reqid, # brokerd generated order-request id
|
||||||
'symbol': sym,
|
'symbol': sym,
|
||||||
'broker_details': details,
|
} if status_msg := book._active.get(oid):
|
||||||
# 'reason': reason,
|
|
||||||
}:
|
|
||||||
msg = BrokerdError(**brokerd_msg)
|
msg = BrokerdError(**brokerd_msg)
|
||||||
resp = 'broker_errored'
|
|
||||||
log.error(pformat(msg)) # XXX make one when it's blank?
|
log.error(pformat(msg)) # XXX make one when it's blank?
|
||||||
|
|
||||||
# TODO: figure out how this will interact with EMS clients
|
# TODO: figure out how this will interact with EMS clients
|
||||||
|
|
@ -680,43 +702,64 @@ async def translate_and_relay_brokerd_events(
|
||||||
# some unexpected failure - something we need to think more
|
# some unexpected failure - something we need to think more
|
||||||
# about. In most default situations, with composed orders
|
# about. In most default situations, with composed orders
|
||||||
# (ex. brackets), most brokers seem to use a oca policy.
|
# (ex. brackets), most brokers seem to use a oca policy.
|
||||||
|
ems_client_order_stream = router.dialogues[oid]
|
||||||
|
status_msg.resp = 'error'
|
||||||
|
status_msg.brokerd_msg = msg
|
||||||
|
book._active[oid] = status_msg
|
||||||
|
await ems_client_order_stream.send(status_msg)
|
||||||
|
|
||||||
# BrokerdStatus
|
# BrokerdStatus
|
||||||
case {
|
case {
|
||||||
'name': 'status',
|
'name': 'status',
|
||||||
'status': status,
|
'status': status,
|
||||||
'reqid': reqid, # brokerd generated order-request id
|
'reqid': reqid, # brokerd generated order-request id
|
||||||
# TODO: feels like the wrong msg for this field?
|
|
||||||
'remaining': remaining,
|
|
||||||
|
|
||||||
} if (
|
} if (
|
||||||
oid := book._ems2brokerd_ids.inverse.get(reqid)
|
(oid := book._ems2brokerd_ids.inverse.get(reqid))
|
||||||
|
and status in (
|
||||||
|
'canceled',
|
||||||
|
'open',
|
||||||
|
'closed',
|
||||||
|
)
|
||||||
):
|
):
|
||||||
msg = BrokerdStatus(**brokerd_msg)
|
msg = BrokerdStatus(**brokerd_msg)
|
||||||
|
|
||||||
# TODO: should we flatten out these cases and/or should
|
# TODO: maybe pack this into a composite type that
|
||||||
# they maybe even eventually be separate messages?
|
# contains both the IPC stream as well the
|
||||||
if status == 'cancelled':
|
# msg-chain/dialog.
|
||||||
log.info(f'Cancellation for {oid} is complete!')
|
ems_client_order_stream = router.dialogues[oid]
|
||||||
|
status_msg = book._active[oid]
|
||||||
|
old_resp = status_msg.resp
|
||||||
|
status_msg.resp = status
|
||||||
|
|
||||||
if status == 'filled':
|
# retrieve existing live flow
|
||||||
# conditional execution is fully complete, no more
|
old_reqid = status_msg.reqid
|
||||||
# fills for the noted order
|
if old_reqid and old_reqid != reqid:
|
||||||
if not remaining:
|
log.warning(
|
||||||
|
f'Brokerd order id change for {oid}:\n'
|
||||||
|
f'{old_reqid} -> {reqid}'
|
||||||
|
)
|
||||||
|
|
||||||
resp = 'broker_executed'
|
status_msg.reqid = reqid # THIS LINE IS CRITICAL!
|
||||||
|
status_msg.brokerd_msg = msg
|
||||||
|
status_msg.src = msg.broker_details['name']
|
||||||
|
await ems_client_order_stream.send(status_msg)
|
||||||
|
|
||||||
# be sure to pop this stream from our dialogue set
|
if status == 'closed':
|
||||||
# since the order dialogue should be done.
|
log.info(f'Execution for {oid} is complete!')
|
||||||
log.info(f'Execution for {oid} is complete!')
|
|
||||||
|
|
||||||
|
# only if we already rxed a fill then probably
|
||||||
|
# this clear is fully complete? (frickin ib..)
|
||||||
|
if old_resp == 'fill':
|
||||||
|
status_msg = book._active.pop(oid)
|
||||||
|
|
||||||
|
elif status == 'canceled':
|
||||||
|
log.cancel(f'Cancellation for {oid} is complete!')
|
||||||
|
|
||||||
|
else: # open
|
||||||
|
# relayed from backend but probably not handled so
|
||||||
# just log it
|
# just log it
|
||||||
else:
|
log.info(f'{broker} opened order {msg}')
|
||||||
log.info(f'{broker} filled {msg}')
|
|
||||||
|
|
||||||
else:
|
|
||||||
# one of {submitted, cancelled}
|
|
||||||
resp = 'broker_' + msg.status
|
|
||||||
|
|
||||||
# BrokerdFill
|
# BrokerdFill
|
||||||
case {
|
case {
|
||||||
|
|
@ -728,82 +771,111 @@ async def translate_and_relay_brokerd_events(
|
||||||
):
|
):
|
||||||
# proxy through the "fill" result(s)
|
# proxy through the "fill" result(s)
|
||||||
msg = BrokerdFill(**brokerd_msg)
|
msg = BrokerdFill(**brokerd_msg)
|
||||||
resp = 'broker_filled'
|
log.info(f'Fill for {oid} cleared with:\n{pformat(msg)}')
|
||||||
log.info(f'\nFill for {oid} cleared with:\n{pformat(resp)}')
|
|
||||||
|
|
||||||
# unknown valid message case?
|
ems_client_order_stream = router.dialogues[oid]
|
||||||
# case {
|
|
||||||
# 'name': name,
|
|
||||||
# 'symbol': sym,
|
|
||||||
# 'reqid': reqid, # brokerd generated order-request id
|
|
||||||
# # 'oid': oid, # ems order-dialog id
|
|
||||||
# 'broker_details': details,
|
|
||||||
|
|
||||||
# } if (
|
# wtf a fill can come after 'closed' from ib?
|
||||||
# book._ems2brokerd_ids.inverse.get(reqid) is None
|
status_msg = book._active[oid]
|
||||||
# ):
|
|
||||||
# # TODO: pretty sure we can drop this now?
|
|
||||||
|
|
||||||
# # XXX: paper clearing special cases
|
# only if we already rxed a 'closed'
|
||||||
# # paper engine race case: ``Client.submit_limit()`` hasn't
|
# this clear is fully complete? (frickin ib..)
|
||||||
# # returned yet and provided an output reqid to register
|
# if status_msg.resp == 'closed':
|
||||||
# # locally, so we need to retreive the oid that was already
|
# status_msg = book._active.pop(oid)
|
||||||
# # packed at submission since we already know it ahead of
|
|
||||||
# # time
|
|
||||||
# paper = details.get('paper_info')
|
|
||||||
# ext = details.get('external')
|
|
||||||
|
|
||||||
# if paper:
|
status_msg.resp = 'fill'
|
||||||
# # paperboi keeps the ems id up front
|
status_msg.reqid = reqid
|
||||||
# oid = paper['oid']
|
status_msg.brokerd_msg = msg
|
||||||
|
await ems_client_order_stream.send(status_msg)
|
||||||
|
|
||||||
# elif ext:
|
# ``Status`` containing an embedded order msg which
|
||||||
# # may be an order msg specified as "external" to the
|
# should be loaded as a "pre-existing open order" from the
|
||||||
# # piker ems flow (i.e. generated by some other
|
# brokerd backend.
|
||||||
# # external broker backend client (like tws for ib)
|
case {
|
||||||
# log.error(f"External trade event {name}@{ext}")
|
'name': 'status',
|
||||||
|
'resp': status,
|
||||||
|
'reqid': reqid, # brokerd generated order-request id
|
||||||
|
}:
|
||||||
|
if (
|
||||||
|
status != 'open'
|
||||||
|
):
|
||||||
|
# TODO: check for an oid we might know since it was
|
||||||
|
# registered from a previous order/status load?
|
||||||
|
log.error(
|
||||||
|
f'Unknown/transient status msg:\n'
|
||||||
|
f'{pformat(brokerd_msg)}\n'
|
||||||
|
'Unable to relay message to client side!?'
|
||||||
|
)
|
||||||
|
|
||||||
# else:
|
# TODO: we probably want some kind of "tagging" system
|
||||||
# # something is out of order, we don't have an oid for
|
# for external order submissions like this eventually
|
||||||
# # this broker-side message.
|
# to be able to more formally handle multi-player
|
||||||
# log.error(
|
# trading...
|
||||||
# f'Unknown oid: {oid} for msg {name}:\n'
|
else:
|
||||||
# f'{pformat(brokerd_msg)}\n'
|
# existing open backend order which we broadcast to
|
||||||
# 'Unable to relay message to client side!?'
|
# all currently connected clients.
|
||||||
# )
|
log.info(
|
||||||
|
f'Relaying existing open order:\n {brokerd_msg}'
|
||||||
|
)
|
||||||
|
|
||||||
# continue
|
# use backend request id as our ems id though this
|
||||||
|
# may end up with collisions?
|
||||||
|
status_msg = Status(**brokerd_msg)
|
||||||
|
order = Order(**status_msg.req)
|
||||||
|
assert order.price and order.size
|
||||||
|
status_msg.req = order
|
||||||
|
|
||||||
|
assert status_msg.src # source tag?
|
||||||
|
oid = str(status_msg.reqid)
|
||||||
|
|
||||||
|
# attempt to avoid collisions
|
||||||
|
status_msg.reqid = oid
|
||||||
|
assert status_msg.resp == 'open'
|
||||||
|
|
||||||
|
# register this existing broker-side dialog
|
||||||
|
book._ems2brokerd_ids[oid] = reqid
|
||||||
|
book._active[oid] = status_msg
|
||||||
|
|
||||||
|
# fan-out-relay position msgs immediately by
|
||||||
|
# broadcasting updates on all client streams
|
||||||
|
await router.client_broadcast(status_msg)
|
||||||
|
|
||||||
|
# don't fall through
|
||||||
|
continue
|
||||||
|
|
||||||
|
# brokerd error
|
||||||
|
case {
|
||||||
|
'name': 'status',
|
||||||
|
'status': 'error',
|
||||||
|
}:
|
||||||
|
log.error(f'Broker error:\n{pformat(brokerd_msg)}')
|
||||||
|
# XXX: we presume the brokerd cancels its own order
|
||||||
|
|
||||||
|
# TOO FAST ``BrokerdStatus`` that arrives
|
||||||
|
# before the ``BrokerdAck``.
|
||||||
|
case {
|
||||||
|
# XXX: sometimes there is a race with the backend (like
|
||||||
|
# `ib` where the pending stauts will be related before
|
||||||
|
# the ack, in which case we just ignore the faster
|
||||||
|
# pending msg and wait for our expected ack to arrive
|
||||||
|
# later (i.e. the first block below should enter).
|
||||||
|
'name': 'status',
|
||||||
|
'status': status,
|
||||||
|
'reqid': reqid,
|
||||||
|
}:
|
||||||
|
status_msg = book._active[oid]
|
||||||
|
log.warning(
|
||||||
|
'Unhandled broker status for dialog:\n'
|
||||||
|
f'{pformat(status_msg)}\n'
|
||||||
|
f'{pformat(brokerd_msg)}\n'
|
||||||
|
)
|
||||||
|
|
||||||
case _:
|
case _:
|
||||||
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
|
raise ValueError(f'Brokerd message {brokerd_msg} is invalid')
|
||||||
|
|
||||||
# retrieve existing live flow
|
# XXX: ugh sometimes we don't access it?
|
||||||
entry = book._ems_entries[oid]
|
if status_msg:
|
||||||
assert entry.oid == oid
|
del status_msg
|
||||||
|
|
||||||
old_reqid = entry.reqid
|
|
||||||
if old_reqid and old_reqid != reqid:
|
|
||||||
log.warning(
|
|
||||||
f'Brokerd order id change for {oid}:\n'
|
|
||||||
f'{old_reqid} -> {reqid}'
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create and relay response status message
|
|
||||||
# to requesting EMS client
|
|
||||||
try:
|
|
||||||
ems_client_order_stream = router.dialogues[oid]
|
|
||||||
await ems_client_order_stream.send(
|
|
||||||
Status(
|
|
||||||
oid=oid,
|
|
||||||
resp=resp,
|
|
||||||
time_ns=time.time_ns(),
|
|
||||||
broker_reqid=reqid,
|
|
||||||
brokerd_msg=msg,
|
|
||||||
)
|
|
||||||
)
|
|
||||||
except KeyError:
|
|
||||||
log.error(
|
|
||||||
f'Received `brokerd` msg for unknown client with oid: {oid}')
|
|
||||||
|
|
||||||
# TODO: do we want this to keep things cleaned up?
|
# TODO: do we want this to keep things cleaned up?
|
||||||
# it might require a special status from brokerd to affirm the
|
# it might require a special status from brokerd to affirm the
|
||||||
|
|
@ -829,27 +901,36 @@ async def process_client_order_cmds(
|
||||||
async for cmd in client_order_stream:
|
async for cmd in client_order_stream:
|
||||||
log.info(f'Received order cmd:\n{pformat(cmd)}')
|
log.info(f'Received order cmd:\n{pformat(cmd)}')
|
||||||
|
|
||||||
oid = cmd['oid']
|
# CAWT DAMN we need struct support!
|
||||||
|
oid = str(cmd['oid'])
|
||||||
|
|
||||||
# register this stream as an active dialogue for this order id
|
# register this stream as an active dialogue for this order id
|
||||||
# such that translated message from the brokerd backend can be
|
# such that translated message from the brokerd backend can be
|
||||||
# routed (relayed) to **just** that client stream (and in theory
|
# routed (relayed) to **just** that client stream (and in theory
|
||||||
# others who are registered for such order affiliated msgs).
|
# others who are registered for such order affiliated msgs).
|
||||||
client_dialogues[oid] = client_order_stream
|
client_dialogues[oid] = client_order_stream
|
||||||
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
|
reqid = dark_book._ems2brokerd_ids.inverse.get(oid)
|
||||||
live_entry = dark_book._ems_entries.get(oid)
|
|
||||||
|
# any dark/live status which is current
|
||||||
|
status = dark_book._active.get(oid)
|
||||||
|
|
||||||
match cmd:
|
match cmd:
|
||||||
# existing live-broker order cancel
|
# existing live-broker order cancel
|
||||||
case {
|
case {
|
||||||
'action': 'cancel',
|
'action': 'cancel',
|
||||||
'oid': oid,
|
'oid': oid,
|
||||||
} if live_entry:
|
} if (
|
||||||
reqid = live_entry.reqid
|
(status := dark_book._active.get(oid))
|
||||||
msg = BrokerdCancel(
|
and status.resp in ('open', 'pending')
|
||||||
|
):
|
||||||
|
reqid = status.reqid
|
||||||
|
order = status.req
|
||||||
|
to_brokerd_msg = BrokerdCancel(
|
||||||
oid=oid,
|
oid=oid,
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
account=live_entry.account,
|
# account=live_entry.account,
|
||||||
|
account=order.account,
|
||||||
)
|
)
|
||||||
|
|
||||||
# NOTE: cancel response will be relayed back in messages
|
# NOTE: cancel response will be relayed back in messages
|
||||||
|
|
@ -859,39 +940,53 @@ async def process_client_order_cmds(
|
||||||
log.info(
|
log.info(
|
||||||
f'Submitting cancel for live order {reqid}'
|
f'Submitting cancel for live order {reqid}'
|
||||||
)
|
)
|
||||||
await brokerd_order_stream.send(msg)
|
await brokerd_order_stream.send(to_brokerd_msg)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# this might be a cancel for an order that hasn't been
|
# this might be a cancel for an order that hasn't been
|
||||||
# acked yet by a brokerd, so register a cancel for when
|
# acked yet by a brokerd, so register a cancel for when
|
||||||
# the order ack does show up later such that the brokerd
|
# the order ack does show up later such that the brokerd
|
||||||
# order request can be cancelled at that time.
|
# order request can be cancelled at that time.
|
||||||
dark_book._ems_entries[oid] = msg
|
# dark_book._ems_entries[oid] = msg
|
||||||
|
# special case for now..
|
||||||
|
status.req = to_brokerd_msg
|
||||||
|
|
||||||
# dark trigger cancel
|
# dark trigger cancel
|
||||||
case {
|
case {
|
||||||
'action': 'cancel',
|
'action': 'cancel',
|
||||||
'oid': oid,
|
'oid': oid,
|
||||||
} if not live_entry:
|
} if (
|
||||||
try:
|
status and status.resp == 'dark_open'
|
||||||
# remove from dark book clearing
|
# or status and status.req
|
||||||
dark_book.orders[symbol].pop(oid, None)
|
):
|
||||||
|
# remove from dark book clearing
|
||||||
|
entry = dark_book.orders[symbol].pop(oid, None)
|
||||||
|
if entry:
|
||||||
|
(
|
||||||
|
pred,
|
||||||
|
tickfilter,
|
||||||
|
cmd,
|
||||||
|
percent_away,
|
||||||
|
abs_diff_away
|
||||||
|
) = entry
|
||||||
|
|
||||||
# tell client side that we've cancelled the
|
# tell client side that we've cancelled the
|
||||||
# dark-trigger order
|
# dark-trigger order
|
||||||
await client_order_stream.send(
|
status.resp = 'canceled'
|
||||||
Status(
|
status.req = cmd
|
||||||
resp='dark_cancelled',
|
|
||||||
oid=oid,
|
await client_order_stream.send(status)
|
||||||
time_ns=time.time_ns(),
|
|
||||||
)
|
|
||||||
)
|
|
||||||
# de-register this client dialogue
|
# de-register this client dialogue
|
||||||
router.dialogues.pop(oid)
|
router.dialogues.pop(oid)
|
||||||
|
dark_book._active.pop(oid)
|
||||||
|
|
||||||
except KeyError:
|
else:
|
||||||
log.exception(f'No dark order for {symbol}?')
|
log.exception(f'No dark order for {symbol}?')
|
||||||
|
|
||||||
|
# TODO: eventually we should be receiving
|
||||||
|
# this struct on the wire unpacked in a scoped protocol
|
||||||
|
# setup with ``tractor``.
|
||||||
|
|
||||||
# live order submission
|
# live order submission
|
||||||
case {
|
case {
|
||||||
'oid': oid,
|
'oid': oid,
|
||||||
|
|
@ -899,11 +994,9 @@ async def process_client_order_cmds(
|
||||||
'price': trigger_price,
|
'price': trigger_price,
|
||||||
'size': size,
|
'size': size,
|
||||||
'action': ('buy' | 'sell') as action,
|
'action': ('buy' | 'sell') as action,
|
||||||
'exec_mode': 'live',
|
'exec_mode': ('live' | 'paper'),
|
||||||
}:
|
}:
|
||||||
# TODO: eventually we should be receiving
|
# TODO: relay this order msg directly?
|
||||||
# this struct on the wire unpacked in a scoped protocol
|
|
||||||
# setup with ``tractor``.
|
|
||||||
req = Order(**cmd)
|
req = Order(**cmd)
|
||||||
broker = req.brokers[0]
|
broker = req.brokers[0]
|
||||||
|
|
||||||
|
|
@ -912,13 +1005,13 @@ async def process_client_order_cmds(
|
||||||
# aren't expectig their own name, but should they?
|
# aren't expectig their own name, but should they?
|
||||||
sym = fqsn.replace(f'.{broker}', '')
|
sym = fqsn.replace(f'.{broker}', '')
|
||||||
|
|
||||||
if live_entry is not None:
|
if status is not None:
|
||||||
# sanity check on emsd id
|
|
||||||
assert live_entry.oid == oid
|
|
||||||
reqid = live_entry.reqid
|
|
||||||
# if we already had a broker order id then
|
# if we already had a broker order id then
|
||||||
# this is likely an order update commmand.
|
# this is likely an order update commmand.
|
||||||
log.info(f"Modifying live {broker} order: {reqid}")
|
log.info(f"Modifying live {broker} order: {reqid}")
|
||||||
|
reqid = status.reqid
|
||||||
|
status.req = req
|
||||||
|
status.resp = 'pending'
|
||||||
|
|
||||||
msg = BrokerdOrder(
|
msg = BrokerdOrder(
|
||||||
oid=oid, # no ib support for oids...
|
oid=oid, # no ib support for oids...
|
||||||
|
|
@ -935,6 +1028,18 @@ async def process_client_order_cmds(
|
||||||
account=req.account,
|
account=req.account,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
if status is None:
|
||||||
|
status = Status(
|
||||||
|
oid=oid,
|
||||||
|
reqid=reqid,
|
||||||
|
resp='pending',
|
||||||
|
time_ns=time.time_ns(),
|
||||||
|
brokerd_msg=msg,
|
||||||
|
req=req,
|
||||||
|
)
|
||||||
|
|
||||||
|
dark_book._active[oid] = status
|
||||||
|
|
||||||
# send request to backend
|
# send request to backend
|
||||||
# XXX: the trades data broker response loop
|
# XXX: the trades data broker response loop
|
||||||
# (``translate_and_relay_brokerd_events()`` above) will
|
# (``translate_and_relay_brokerd_events()`` above) will
|
||||||
|
|
@ -950,7 +1055,7 @@ async def process_client_order_cmds(
|
||||||
# client, before that ack, when the ack does arrive we
|
# client, before that ack, when the ack does arrive we
|
||||||
# immediately take the reqid from the broker and cancel
|
# immediately take the reqid from the broker and cancel
|
||||||
# that live order asap.
|
# that live order asap.
|
||||||
dark_book._ems_entries[oid] = msg
|
# dark_book._msgflows[oid].maps.insert(0, msg.to_dict())
|
||||||
|
|
||||||
# dark-order / alert submission
|
# dark-order / alert submission
|
||||||
case {
|
case {
|
||||||
|
|
@ -966,9 +1071,11 @@ async def process_client_order_cmds(
|
||||||
# submit order to local EMS book and scan loop,
|
# submit order to local EMS book and scan loop,
|
||||||
# effectively a local clearing engine, which
|
# effectively a local clearing engine, which
|
||||||
# scans for conditions and triggers matching executions
|
# scans for conditions and triggers matching executions
|
||||||
exec_mode in ('dark', 'paper')
|
exec_mode in ('dark',)
|
||||||
or action == 'alert'
|
or action == 'alert'
|
||||||
):
|
):
|
||||||
|
req = Order(**cmd)
|
||||||
|
|
||||||
# Auto-gen scanner predicate:
|
# Auto-gen scanner predicate:
|
||||||
# we automatically figure out what the alert check
|
# we automatically figure out what the alert check
|
||||||
# condition should be based on the current first
|
# condition should be based on the current first
|
||||||
|
|
@ -1015,23 +1122,25 @@ async def process_client_order_cmds(
|
||||||
)[oid] = (
|
)[oid] = (
|
||||||
pred,
|
pred,
|
||||||
tickfilter,
|
tickfilter,
|
||||||
cmd,
|
req,
|
||||||
percent_away,
|
percent_away,
|
||||||
abs_diff_away
|
abs_diff_away
|
||||||
)
|
)
|
||||||
resp = 'dark_submitted'
|
resp = 'dark_open'
|
||||||
|
|
||||||
# alerts have special msgs to distinguish
|
# alerts have special msgs to distinguish
|
||||||
if action == 'alert':
|
# if action == 'alert':
|
||||||
resp = 'alert_submitted'
|
# resp = 'open'
|
||||||
|
|
||||||
await client_order_stream.send(
|
status = Status(
|
||||||
Status(
|
resp=resp,
|
||||||
resp=resp,
|
oid=oid,
|
||||||
oid=oid,
|
time_ns=time.time_ns(),
|
||||||
time_ns=time.time_ns(),
|
req=req,
|
||||||
)
|
src='dark',
|
||||||
)
|
)
|
||||||
|
dark_book._active[oid] = status
|
||||||
|
await client_order_stream.send(status)
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
@ -1099,10 +1208,9 @@ async def _emsd_main(
|
||||||
):
|
):
|
||||||
|
|
||||||
# XXX: this should be initial price quote from target provider
|
# XXX: this should be initial price quote from target provider
|
||||||
first_quote = feed.first_quotes[fqsn]
|
first_quote: dict = feed.first_quotes[fqsn]
|
||||||
|
book: _DarkBook = _router.get_dark_book(broker)
|
||||||
book = _router.get_dark_book(broker)
|
book.lasts[fqsn]: float = first_quote['last']
|
||||||
book.lasts[fqsn] = first_quote['last']
|
|
||||||
|
|
||||||
# open a stream with the brokerd backend for order
|
# open a stream with the brokerd backend for order
|
||||||
# flow dialogue
|
# flow dialogue
|
||||||
|
|
@ -1129,12 +1237,25 @@ async def _emsd_main(
|
||||||
await ems_ctx.started((
|
await ems_ctx.started((
|
||||||
relay.positions,
|
relay.positions,
|
||||||
list(relay.accounts),
|
list(relay.accounts),
|
||||||
|
book._active,
|
||||||
))
|
))
|
||||||
|
|
||||||
# establish 2-way stream with requesting order-client and
|
# establish 2-way stream with requesting order-client and
|
||||||
# begin handling inbound order requests and updates
|
# begin handling inbound order requests and updates
|
||||||
async with ems_ctx.open_stream() as ems_client_order_stream:
|
async with ems_ctx.open_stream() as ems_client_order_stream:
|
||||||
|
|
||||||
|
# register the client side before startingn the
|
||||||
|
# brokerd-side relay task to ensure the client is
|
||||||
|
# delivered all exisiting open orders on startup.
|
||||||
|
_router.clients.add(ems_client_order_stream)
|
||||||
|
|
||||||
|
n.start_soon(
|
||||||
|
translate_and_relay_brokerd_events,
|
||||||
|
broker,
|
||||||
|
brokerd_stream,
|
||||||
|
_router,
|
||||||
|
)
|
||||||
|
|
||||||
# trigger scan and exec loop
|
# trigger scan and exec loop
|
||||||
n.start_soon(
|
n.start_soon(
|
||||||
clear_dark_triggers,
|
clear_dark_triggers,
|
||||||
|
|
@ -1149,7 +1270,6 @@ async def _emsd_main(
|
||||||
|
|
||||||
# start inbound (from attached client) order request processing
|
# start inbound (from attached client) order request processing
|
||||||
try:
|
try:
|
||||||
_router.clients.add(ems_client_order_stream)
|
|
||||||
|
|
||||||
# main entrypoint, run here until cancelled.
|
# main entrypoint, run here until cancelled.
|
||||||
await process_client_order_cmds(
|
await process_client_order_cmds(
|
||||||
|
|
|
||||||
|
|
@ -18,24 +18,92 @@
|
||||||
Clearing sub-system message and protocols.
|
Clearing sub-system message and protocols.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
from typing import Optional, Union
|
# from collections import (
|
||||||
|
# ChainMap,
|
||||||
|
# deque,
|
||||||
|
# )
|
||||||
|
from typing import (
|
||||||
|
Optional,
|
||||||
|
Literal,
|
||||||
|
)
|
||||||
|
|
||||||
from ..data._source import Symbol
|
from ..data._source import Symbol
|
||||||
from ..data.types import Struct
|
from ..data.types import Struct
|
||||||
|
|
||||||
|
|
||||||
|
# TODO: a composite for tracking msg flow on 2-legged
|
||||||
|
# dialogs.
|
||||||
|
# class Dialog(ChainMap):
|
||||||
|
# '''
|
||||||
|
# Msg collection abstraction to easily track the state changes of
|
||||||
|
# a msg flow in one high level, query-able and immutable construct.
|
||||||
|
|
||||||
|
# The main use case is to query data from a (long-running)
|
||||||
|
# msg-transaction-sequence
|
||||||
|
|
||||||
|
|
||||||
|
# '''
|
||||||
|
# def update(
|
||||||
|
# self,
|
||||||
|
# msg,
|
||||||
|
# ) -> None:
|
||||||
|
# self.maps.insert(0, msg.to_dict())
|
||||||
|
|
||||||
|
# def flatten(self) -> dict:
|
||||||
|
# return dict(self)
|
||||||
|
|
||||||
|
|
||||||
# TODO: ``msgspec`` stuff worth paying attention to:
|
# TODO: ``msgspec`` stuff worth paying attention to:
|
||||||
# - schema evolution: https://jcristharif.com/msgspec/usage.html#schema-evolution
|
# - schema evolution:
|
||||||
|
# https://jcristharif.com/msgspec/usage.html#schema-evolution
|
||||||
|
# - for eg. ``BrokerdStatus``, instead just have separate messages?
|
||||||
# - use literals for a common msg determined by diff keys?
|
# - use literals for a common msg determined by diff keys?
|
||||||
# - https://jcristharif.com/msgspec/usage.html#literal
|
# - https://jcristharif.com/msgspec/usage.html#literal
|
||||||
# - for eg. ``BrokerdStatus``, instead just have separate messages?
|
|
||||||
|
|
||||||
# --------------
|
# --------------
|
||||||
# Client -> emsd
|
# Client -> emsd
|
||||||
# --------------
|
# --------------
|
||||||
|
|
||||||
|
class Order(Struct):
|
||||||
|
|
||||||
|
# TODO: ideally we can combine these 2 fields into
|
||||||
|
# 1 and just use the size polarity to determine a buy/sell.
|
||||||
|
# i would like to see this become more like
|
||||||
|
# https://jcristharif.com/msgspec/usage.html#literal
|
||||||
|
# action: Literal[
|
||||||
|
# 'live',
|
||||||
|
# 'dark',
|
||||||
|
# 'alert',
|
||||||
|
# ]
|
||||||
|
|
||||||
|
action: Literal[
|
||||||
|
'buy',
|
||||||
|
'sell',
|
||||||
|
'alert',
|
||||||
|
]
|
||||||
|
# determines whether the create execution
|
||||||
|
# will be submitted to the ems or directly to
|
||||||
|
# the backend broker
|
||||||
|
exec_mode: Literal[
|
||||||
|
'dark',
|
||||||
|
'live',
|
||||||
|
# 'paper', no right?
|
||||||
|
]
|
||||||
|
|
||||||
|
# internal ``emdsd`` unique "order id"
|
||||||
|
oid: str # uuid4
|
||||||
|
symbol: str | Symbol
|
||||||
|
account: str # should we set a default as '' ?
|
||||||
|
|
||||||
|
price: float
|
||||||
|
size: float # -ve is "sell", +ve is "buy"
|
||||||
|
|
||||||
|
brokers: Optional[list[str]] = []
|
||||||
|
|
||||||
|
|
||||||
class Cancel(Struct):
|
class Cancel(Struct):
|
||||||
'''Cancel msg for removing a dark (ems triggered) or
|
'''
|
||||||
|
Cancel msg for removing a dark (ems triggered) or
|
||||||
broker-submitted (live) trigger/order.
|
broker-submitted (live) trigger/order.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
|
|
@ -44,32 +112,6 @@ class Cancel(Struct):
|
||||||
symbol: str
|
symbol: str
|
||||||
|
|
||||||
|
|
||||||
class Order(Struct):
|
|
||||||
|
|
||||||
# TODO: use ``msgspec.Literal``
|
|
||||||
# https://jcristharif.com/msgspec/usage.html#literal
|
|
||||||
action: str # {'buy', 'sell', 'alert'}
|
|
||||||
# internal ``emdsd`` unique "order id"
|
|
||||||
oid: str # uuid4
|
|
||||||
symbol: Union[str, Symbol]
|
|
||||||
account: str # should we set a default as '' ?
|
|
||||||
|
|
||||||
price: float
|
|
||||||
# TODO: could we drop the ``.action`` field above and instead just
|
|
||||||
# use +/- values here? Would make the msg smaller at the sake of a
|
|
||||||
# teensie fp precision?
|
|
||||||
size: float
|
|
||||||
brokers: list[str]
|
|
||||||
|
|
||||||
# Assigned once initial ack is received
|
|
||||||
# ack_time_ns: Optional[int] = None
|
|
||||||
|
|
||||||
# determines whether the create execution
|
|
||||||
# will be submitted to the ems or directly to
|
|
||||||
# the backend broker
|
|
||||||
exec_mode: str # {'dark', 'live', 'paper'}
|
|
||||||
|
|
||||||
|
|
||||||
# --------------
|
# --------------
|
||||||
# Client <- emsd
|
# Client <- emsd
|
||||||
# --------------
|
# --------------
|
||||||
|
|
@ -79,37 +121,39 @@ class Order(Struct):
|
||||||
class Status(Struct):
|
class Status(Struct):
|
||||||
|
|
||||||
name: str = 'status'
|
name: str = 'status'
|
||||||
oid: str # uuid4
|
|
||||||
time_ns: int
|
time_ns: int
|
||||||
|
oid: str # uuid4 ems-order dialog id
|
||||||
|
|
||||||
# {
|
resp: Literal[
|
||||||
# 'dark_submitted',
|
'pending', # acked by broker but not yet open
|
||||||
# 'dark_cancelled',
|
'open',
|
||||||
# 'dark_triggered',
|
'dark_open', # dark/algo triggered order is open in ems clearing loop
|
||||||
|
'triggered', # above triggered order sent to brokerd, or an alert closed
|
||||||
# 'broker_submitted',
|
'closed', # fully cleared all size/units
|
||||||
# 'broker_cancelled',
|
'fill', # partial execution
|
||||||
# 'broker_executed',
|
'canceled',
|
||||||
# 'broker_filled',
|
'error',
|
||||||
# 'broker_errored',
|
]
|
||||||
|
|
||||||
# 'alert_submitted',
|
|
||||||
# 'alert_triggered',
|
|
||||||
|
|
||||||
# }
|
|
||||||
resp: str # "response", see above
|
|
||||||
|
|
||||||
# trigger info
|
|
||||||
trigger_price: Optional[float] = None
|
|
||||||
# price: float
|
|
||||||
|
|
||||||
# broker: Optional[str] = None
|
|
||||||
|
|
||||||
# this maps normally to the ``BrokerdOrder.reqid`` below, an id
|
# this maps normally to the ``BrokerdOrder.reqid`` below, an id
|
||||||
# normally allocated internally by the backend broker routing system
|
# normally allocated internally by the backend broker routing system
|
||||||
broker_reqid: Optional[Union[int, str]] = None
|
reqid: Optional[int | str] = None
|
||||||
|
|
||||||
# for relaying backend msg data "through" the ems layer
|
# the (last) source order/request msg if provided
|
||||||
|
# (eg. the Order/Cancel which causes this msg) and
|
||||||
|
# acts as a back-reference to the corresponding
|
||||||
|
# request message which was the source of this msg.
|
||||||
|
req: Optional[Order | Cancel] = None
|
||||||
|
|
||||||
|
# XXX: better design/name here?
|
||||||
|
# flag that can be set to indicate a message for an order
|
||||||
|
# event that wasn't originated by piker's emsd (eg. some external
|
||||||
|
# trading system which does it's own order control but that you
|
||||||
|
# might want to "track" using piker UIs/systems).
|
||||||
|
src: Optional[str] = None
|
||||||
|
|
||||||
|
# for relaying a boxed brokerd-dialog-side msg data "through" the
|
||||||
|
# ems layer to clients.
|
||||||
brokerd_msg: dict = {}
|
brokerd_msg: dict = {}
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -131,25 +175,28 @@ class BrokerdCancel(Struct):
|
||||||
# for setting a unique order id then this value will be relayed back
|
# for setting a unique order id then this value will be relayed back
|
||||||
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
|
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
|
||||||
# field
|
# field
|
||||||
reqid: Optional[Union[int, str]] = None
|
reqid: Optional[int | str] = None
|
||||||
|
|
||||||
|
|
||||||
class BrokerdOrder(Struct):
|
class BrokerdOrder(Struct):
|
||||||
|
|
||||||
action: str # {buy, sell}
|
|
||||||
oid: str
|
oid: str
|
||||||
account: str
|
account: str
|
||||||
time_ns: int
|
time_ns: int
|
||||||
|
|
||||||
|
# TODO: if we instead rely on a +ve/-ve size to determine
|
||||||
|
# the action we more or less don't need this field right?
|
||||||
|
action: str = '' # {buy, sell}
|
||||||
|
|
||||||
# "broker request id": broker specific/internal order id if this is
|
# "broker request id": broker specific/internal order id if this is
|
||||||
# None, creates a new order otherwise if the id is valid the backend
|
# None, creates a new order otherwise if the id is valid the backend
|
||||||
# api must modify the existing matching order. If the broker allows
|
# api must modify the existing matching order. If the broker allows
|
||||||
# for setting a unique order id then this value will be relayed back
|
# for setting a unique order id then this value will be relayed back
|
||||||
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
|
# on the emsd order request stream as the ``BrokerdOrderAck.reqid``
|
||||||
# field
|
# field
|
||||||
reqid: Optional[Union[int, str]] = None
|
reqid: Optional[int | str] = None
|
||||||
|
|
||||||
symbol: str # symbol.<providername> ?
|
symbol: str # fqsn
|
||||||
price: float
|
price: float
|
||||||
size: float
|
size: float
|
||||||
|
|
||||||
|
|
@ -170,7 +217,7 @@ class BrokerdOrderAck(Struct):
|
||||||
name: str = 'ack'
|
name: str = 'ack'
|
||||||
|
|
||||||
# defined and provided by backend
|
# defined and provided by backend
|
||||||
reqid: Union[int, str]
|
reqid: int | str
|
||||||
|
|
||||||
# emsd id originally sent in matching request msg
|
# emsd id originally sent in matching request msg
|
||||||
oid: str
|
oid: str
|
||||||
|
|
@ -180,30 +227,22 @@ class BrokerdOrderAck(Struct):
|
||||||
class BrokerdStatus(Struct):
|
class BrokerdStatus(Struct):
|
||||||
|
|
||||||
name: str = 'status'
|
name: str = 'status'
|
||||||
reqid: Union[int, str]
|
reqid: int | str
|
||||||
time_ns: int
|
time_ns: int
|
||||||
|
status: Literal[
|
||||||
|
'open',
|
||||||
|
'canceled',
|
||||||
|
'fill',
|
||||||
|
'pending',
|
||||||
|
'error',
|
||||||
|
]
|
||||||
|
|
||||||
# XXX: should be best effort set for every update
|
account: str
|
||||||
account: str = ''
|
|
||||||
|
|
||||||
# TODO: instead (ack, pending, open, fill, clos(ed), cancelled)
|
|
||||||
# {
|
|
||||||
# 'submitted',
|
|
||||||
# 'cancelled',
|
|
||||||
# 'filled',
|
|
||||||
# }
|
|
||||||
status: str
|
|
||||||
|
|
||||||
filled: float = 0.0
|
filled: float = 0.0
|
||||||
reason: str = ''
|
reason: str = ''
|
||||||
remaining: float = 0.0
|
remaining: float = 0.0
|
||||||
|
|
||||||
# XXX: better design/name here?
|
# external: bool = False
|
||||||
# flag that can be set to indicate a message for an order
|
|
||||||
# event that wasn't originated by piker's emsd (eg. some external
|
|
||||||
# trading system which does it's own order control but that you
|
|
||||||
# might want to "track" using piker UIs/systems).
|
|
||||||
external: bool = False
|
|
||||||
|
|
||||||
# XXX: not required schema as of yet
|
# XXX: not required schema as of yet
|
||||||
broker_details: dict = {
|
broker_details: dict = {
|
||||||
|
|
@ -218,7 +257,7 @@ class BrokerdFill(Struct):
|
||||||
|
|
||||||
'''
|
'''
|
||||||
name: str = 'fill'
|
name: str = 'fill'
|
||||||
reqid: Union[int, str]
|
reqid: int | str
|
||||||
time_ns: int
|
time_ns: int
|
||||||
|
|
||||||
# order exeuction related
|
# order exeuction related
|
||||||
|
|
@ -248,7 +287,7 @@ class BrokerdError(Struct):
|
||||||
|
|
||||||
# if no brokerd order request was actually submitted (eg. we errored
|
# if no brokerd order request was actually submitted (eg. we errored
|
||||||
# at the ``pikerd`` layer) then there will be ``reqid`` allocated.
|
# at the ``pikerd`` layer) then there will be ``reqid`` allocated.
|
||||||
reqid: Optional[Union[int, str]] = None
|
reqid: Optional[int | str] = None
|
||||||
|
|
||||||
symbol: str
|
symbol: str
|
||||||
reason: str
|
reason: str
|
||||||
|
|
|
||||||
|
|
@ -45,8 +45,13 @@ from ..data._normalize import iterticks
|
||||||
from ..data._source import unpack_fqsn
|
from ..data._source import unpack_fqsn
|
||||||
from ..log import get_logger
|
from ..log import get_logger
|
||||||
from ._messages import (
|
from ._messages import (
|
||||||
BrokerdCancel, BrokerdOrder, BrokerdOrderAck, BrokerdStatus,
|
BrokerdCancel,
|
||||||
BrokerdFill, BrokerdPosition, BrokerdError
|
BrokerdOrder,
|
||||||
|
BrokerdOrderAck,
|
||||||
|
BrokerdStatus,
|
||||||
|
BrokerdFill,
|
||||||
|
BrokerdPosition,
|
||||||
|
BrokerdError,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -94,6 +99,10 @@ class PaperBoi:
|
||||||
'''
|
'''
|
||||||
is_modify: bool = False
|
is_modify: bool = False
|
||||||
|
|
||||||
|
if action == 'alert':
|
||||||
|
# bypass all fill simulation
|
||||||
|
return reqid
|
||||||
|
|
||||||
entry = self._reqids.get(reqid)
|
entry = self._reqids.get(reqid)
|
||||||
if entry:
|
if entry:
|
||||||
# order is already existing, this is a modify
|
# order is already existing, this is a modify
|
||||||
|
|
@ -104,10 +113,6 @@ class PaperBoi:
|
||||||
# register order internally
|
# register order internally
|
||||||
self._reqids[reqid] = (oid, symbol, action, price)
|
self._reqids[reqid] = (oid, symbol, action, price)
|
||||||
|
|
||||||
if action == 'alert':
|
|
||||||
# bypass all fill simulation
|
|
||||||
return reqid
|
|
||||||
|
|
||||||
# TODO: net latency model
|
# TODO: net latency model
|
||||||
# we checkpoint here quickly particulalry
|
# we checkpoint here quickly particulalry
|
||||||
# for dark orders since we want the dark_executed
|
# for dark orders since we want the dark_executed
|
||||||
|
|
@ -119,7 +124,9 @@ class PaperBoi:
|
||||||
size = -size
|
size = -size
|
||||||
|
|
||||||
msg = BrokerdStatus(
|
msg = BrokerdStatus(
|
||||||
status='submitted',
|
status='open',
|
||||||
|
# account=f'paper_{self.broker}',
|
||||||
|
account='paper',
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
filled=0.0,
|
filled=0.0,
|
||||||
|
|
@ -136,7 +143,14 @@ class PaperBoi:
|
||||||
) or (
|
) or (
|
||||||
action == 'sell' and (clear_price := self.last_bid[0]) >= price
|
action == 'sell' and (clear_price := self.last_bid[0]) >= price
|
||||||
):
|
):
|
||||||
await self.fake_fill(symbol, clear_price, size, action, reqid, oid)
|
await self.fake_fill(
|
||||||
|
symbol,
|
||||||
|
clear_price,
|
||||||
|
size,
|
||||||
|
action,
|
||||||
|
reqid,
|
||||||
|
oid,
|
||||||
|
)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# register this submissions as a paper live order
|
# register this submissions as a paper live order
|
||||||
|
|
@ -178,7 +192,9 @@ class PaperBoi:
|
||||||
await trio.sleep(0.05)
|
await trio.sleep(0.05)
|
||||||
|
|
||||||
msg = BrokerdStatus(
|
msg = BrokerdStatus(
|
||||||
status='cancelled',
|
status='canceled',
|
||||||
|
# account=f'paper_{self.broker}',
|
||||||
|
account='paper',
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
broker_details={'name': 'paperboi'},
|
broker_details={'name': 'paperboi'},
|
||||||
|
|
@ -230,25 +246,23 @@ class PaperBoi:
|
||||||
self._trade_ledger.update(fill_msg.to_dict())
|
self._trade_ledger.update(fill_msg.to_dict())
|
||||||
|
|
||||||
if order_complete:
|
if order_complete:
|
||||||
|
|
||||||
msg = BrokerdStatus(
|
msg = BrokerdStatus(
|
||||||
|
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
time_ns=time.time_ns(),
|
time_ns=time.time_ns(),
|
||||||
|
# account=f'paper_{self.broker}',
|
||||||
status='filled',
|
account='paper',
|
||||||
|
status='closed',
|
||||||
filled=size,
|
filled=size,
|
||||||
remaining=0 if order_complete else remaining,
|
remaining=0 if order_complete else remaining,
|
||||||
|
# broker_details={
|
||||||
broker_details={
|
# 'paper_info': {
|
||||||
'paper_info': {
|
# 'oid': oid,
|
||||||
'oid': oid,
|
# },
|
||||||
},
|
# 'action': action,
|
||||||
'action': action,
|
# 'size': size,
|
||||||
'size': size,
|
# 'price': price,
|
||||||
'price': price,
|
# 'name': self.broker,
|
||||||
'name': self.broker,
|
# },
|
||||||
},
|
|
||||||
)
|
)
|
||||||
await self.ems_trades_stream.send(msg)
|
await self.ems_trades_stream.send(msg)
|
||||||
|
|
||||||
|
|
@ -393,69 +407,72 @@ async def handle_order_requests(
|
||||||
# order_request: dict
|
# order_request: dict
|
||||||
async for request_msg in ems_order_stream:
|
async for request_msg in ems_order_stream:
|
||||||
|
|
||||||
action = request_msg['action']
|
# action = request_msg['action']
|
||||||
|
match request_msg:
|
||||||
if action in {'buy', 'sell'}:
|
# if action in {'buy', 'sell'}:
|
||||||
|
case {'action': ('buy' | 'sell')}:
|
||||||
account = request_msg['account']
|
order = BrokerdOrder(**request_msg)
|
||||||
if account != 'paper':
|
account = order.account
|
||||||
log.error(
|
if account != 'paper':
|
||||||
'This is a paper account,'
|
log.error(
|
||||||
' only a `paper` selection is valid'
|
'This is a paper account,'
|
||||||
)
|
' only a `paper` selection is valid'
|
||||||
await ems_order_stream.send(BrokerdError(
|
)
|
||||||
oid=request_msg['oid'],
|
await ems_order_stream.send(BrokerdError(
|
||||||
symbol=request_msg['symbol'],
|
# oid=request_msg['oid'],
|
||||||
reason=f'Paper only. No account found: `{account}` ?',
|
oid=order.oid,
|
||||||
))
|
# symbol=request_msg['symbol'],
|
||||||
continue
|
symbol=order.symbol,
|
||||||
|
reason=f'Paper only. No account found: `{account}` ?',
|
||||||
|
))
|
||||||
|
continue
|
||||||
|
|
||||||
# validate
|
# validate
|
||||||
order = BrokerdOrder(**request_msg)
|
# order = BrokerdOrder(**request_msg)
|
||||||
|
|
||||||
if order.reqid is None:
|
# if order.reqid is None:
|
||||||
reqid = str(uuid.uuid4())
|
# reqid =
|
||||||
else:
|
# else:
|
||||||
reqid = order.reqid
|
reqid = order.reqid or str(uuid.uuid4())
|
||||||
|
|
||||||
# deliver ack that order has been submitted to broker routing
|
# deliver ack that order has been submitted to broker routing
|
||||||
await ems_order_stream.send(
|
await ems_order_stream.send(
|
||||||
BrokerdOrderAck(
|
BrokerdOrderAck(
|
||||||
|
|
||||||
# ems order request id
|
# ems order request id
|
||||||
oid=order.oid,
|
oid=order.oid,
|
||||||
|
|
||||||
# broker specific request id
|
# broker specific request id
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
|
|
||||||
|
)
|
||||||
)
|
)
|
||||||
)
|
|
||||||
|
|
||||||
# call our client api to submit the order
|
# call our client api to submit the order
|
||||||
reqid = await client.submit_limit(
|
reqid = await client.submit_limit(
|
||||||
|
|
||||||
oid=order.oid,
|
oid=order.oid,
|
||||||
symbol=order.symbol,
|
symbol=order.symbol,
|
||||||
price=order.price,
|
price=order.price,
|
||||||
action=order.action,
|
action=order.action,
|
||||||
size=order.size,
|
size=order.size,
|
||||||
|
|
||||||
# XXX: by default 0 tells ``ib_insync`` methods that
|
# XXX: by default 0 tells ``ib_insync`` methods that
|
||||||
# there is no existing order so ask the client to create
|
# there is no existing order so ask the client to create
|
||||||
# a new one (which it seems to do by allocating an int
|
# a new one (which it seems to do by allocating an int
|
||||||
# counter - collision prone..)
|
# counter - collision prone..)
|
||||||
reqid=reqid,
|
reqid=reqid,
|
||||||
)
|
)
|
||||||
|
|
||||||
elif action == 'cancel':
|
# elif action == 'cancel':
|
||||||
msg = BrokerdCancel(**request_msg)
|
case {'action': 'cancel'}:
|
||||||
|
msg = BrokerdCancel(**request_msg)
|
||||||
|
await client.submit_cancel(
|
||||||
|
reqid=msg.reqid
|
||||||
|
)
|
||||||
|
|
||||||
await client.submit_cancel(
|
case _:
|
||||||
reqid=msg.reqid
|
log.error(f'Unknown order command: {request_msg}')
|
||||||
)
|
|
||||||
|
|
||||||
else:
|
|
||||||
log.error(f'Unknown order command: {request_msg}')
|
|
||||||
|
|
||||||
|
|
||||||
@tractor.context
|
@tractor.context
|
||||||
|
|
|
||||||
|
|
@ -37,7 +37,7 @@ import time
|
||||||
from math import isnan
|
from math import isnan
|
||||||
|
|
||||||
from bidict import bidict
|
from bidict import bidict
|
||||||
import msgpack
|
from msgspec.msgpack import encode, decode
|
||||||
import pyqtgraph as pg
|
import pyqtgraph as pg
|
||||||
import numpy as np
|
import numpy as np
|
||||||
import tractor
|
import tractor
|
||||||
|
|
@ -774,12 +774,13 @@ async def stream_quotes(
|
||||||
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
async with open_websocket_url(f'ws://{host}:{port}/ws') as ws:
|
||||||
# send subs topics to server
|
# send subs topics to server
|
||||||
resp = await ws.send_message(
|
resp = await ws.send_message(
|
||||||
msgpack.dumps({'streams': list(tbks.values())})
|
|
||||||
|
encode({'streams': list(tbks.values())})
|
||||||
)
|
)
|
||||||
log.info(resp)
|
log.info(resp)
|
||||||
|
|
||||||
async def recv() -> dict[str, Any]:
|
async def recv() -> dict[str, Any]:
|
||||||
return msgpack.loads((await ws.get_message()), encoding='utf-8')
|
return decode((await ws.get_message()), encoding='utf-8')
|
||||||
|
|
||||||
streams = (await recv())['streams']
|
streams = (await recv())['streams']
|
||||||
log.info(f"Subscribed to {streams}")
|
log.info(f"Subscribed to {streams}")
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,7 @@
|
||||||
Built-in (extension) types.
|
Built-in (extension) types.
|
||||||
|
|
||||||
"""
|
"""
|
||||||
|
import sys
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
from pprint import pformat
|
from pprint import pformat
|
||||||
|
|
||||||
|
|
@ -42,7 +43,12 @@ class Struct(
|
||||||
}
|
}
|
||||||
|
|
||||||
def __repr__(self):
|
def __repr__(self):
|
||||||
return f'Struct({pformat(self.to_dict())})'
|
# only turn on pprint when we detect a python REPL
|
||||||
|
# at runtime B)
|
||||||
|
if hasattr(sys, 'ps1'):
|
||||||
|
return f'Struct({pformat(self.to_dict())})'
|
||||||
|
|
||||||
|
return super().__repr__()
|
||||||
|
|
||||||
def copy(
|
def copy(
|
||||||
self,
|
self,
|
||||||
|
|
|
||||||
19
piker/pp.py
19
piker/pp.py
|
|
@ -134,6 +134,8 @@ class Position(Struct):
|
||||||
# unique backend symbol id
|
# unique backend symbol id
|
||||||
bsuid: str
|
bsuid: str
|
||||||
|
|
||||||
|
split_ratio: Optional[int] = None
|
||||||
|
|
||||||
# ordered record of known constituent trade messages
|
# ordered record of known constituent trade messages
|
||||||
clears: dict[
|
clears: dict[
|
||||||
Union[str, int, Status], # trade id
|
Union[str, int, Status], # trade id
|
||||||
|
|
@ -159,6 +161,9 @@ class Position(Struct):
|
||||||
clears = d.pop('clears')
|
clears = d.pop('clears')
|
||||||
expiry = d.pop('expiry')
|
expiry = d.pop('expiry')
|
||||||
|
|
||||||
|
if self.split_ratio is None:
|
||||||
|
d.pop('split_ratio')
|
||||||
|
|
||||||
# TODO: we need to figure out how to have one top level
|
# TODO: we need to figure out how to have one top level
|
||||||
# listing venue here even when the backend isn't providing
|
# listing venue here even when the backend isn't providing
|
||||||
# it via the trades ledger..
|
# it via the trades ledger..
|
||||||
|
|
@ -384,12 +389,22 @@ class Position(Struct):
|
||||||
asize_h.append(accum_size)
|
asize_h.append(accum_size)
|
||||||
ppu_h.append(ppu_h[-1])
|
ppu_h.append(ppu_h[-1])
|
||||||
|
|
||||||
return ppu_h[-1] if ppu_h else 0
|
final_ppu = ppu_h[-1] if ppu_h else 0
|
||||||
|
|
||||||
|
# handle any split info entered (for now) manually by user
|
||||||
|
if self.split_ratio is not None:
|
||||||
|
final_ppu /= self.split_ratio
|
||||||
|
|
||||||
|
return final_ppu
|
||||||
|
|
||||||
def calc_size(self) -> float:
|
def calc_size(self) -> float:
|
||||||
size: float = 0
|
size: float = 0
|
||||||
for tid, entry in self.clears.items():
|
for tid, entry in self.clears.items():
|
||||||
size += entry['size']
|
size += entry['size']
|
||||||
|
|
||||||
|
if self.split_ratio is not None:
|
||||||
|
size = round(size * self.split_ratio)
|
||||||
|
|
||||||
return size
|
return size
|
||||||
|
|
||||||
def minimize_clears(
|
def minimize_clears(
|
||||||
|
|
@ -848,6 +863,7 @@ def open_pps(
|
||||||
size = entry['size']
|
size = entry['size']
|
||||||
# TODO: remove but, handle old field name for now
|
# TODO: remove but, handle old field name for now
|
||||||
ppu = entry.get('ppu', entry.get('be_price', 0))
|
ppu = entry.get('ppu', entry.get('be_price', 0))
|
||||||
|
split_ratio = entry.get('split_ratio')
|
||||||
|
|
||||||
expiry = entry.get('expiry')
|
expiry = entry.get('expiry')
|
||||||
if expiry:
|
if expiry:
|
||||||
|
|
@ -857,6 +873,7 @@ def open_pps(
|
||||||
Symbol.from_fqsn(fqsn, info={}),
|
Symbol.from_fqsn(fqsn, info={}),
|
||||||
size=size,
|
size=size,
|
||||||
ppu=ppu,
|
ppu=ppu,
|
||||||
|
split_ratio=split_ratio,
|
||||||
expiry=expiry,
|
expiry=expiry,
|
||||||
bsuid=entry['bsuid'],
|
bsuid=entry['bsuid'],
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -140,9 +140,9 @@ class LineEditor:
|
||||||
|
|
||||||
) -> LevelLine:
|
) -> LevelLine:
|
||||||
|
|
||||||
staged_line = self._active_staged_line
|
# staged_line = self._active_staged_line
|
||||||
if not staged_line:
|
# if not staged_line:
|
||||||
raise RuntimeError("No line is currently staged!?")
|
# raise RuntimeError("No line is currently staged!?")
|
||||||
|
|
||||||
# for now, until submission reponse arrives
|
# for now, until submission reponse arrives
|
||||||
line.hide_labels()
|
line.hide_labels()
|
||||||
|
|
|
||||||
|
|
@ -49,16 +49,21 @@ from ._position import (
|
||||||
SettingsPane,
|
SettingsPane,
|
||||||
)
|
)
|
||||||
from ._forms import FieldsForm
|
from ._forms import FieldsForm
|
||||||
# from ._label import FormatLabel
|
|
||||||
from ._window import MultiStatus
|
from ._window import MultiStatus
|
||||||
from ..clearing._messages import Order, BrokerdPosition
|
from ..clearing._messages import (
|
||||||
|
Order,
|
||||||
|
Status,
|
||||||
|
# BrokerdOrder,
|
||||||
|
# BrokerdStatus,
|
||||||
|
BrokerdPosition,
|
||||||
|
)
|
||||||
from ._forms import open_form_input_handling
|
from ._forms import open_form_input_handling
|
||||||
|
|
||||||
|
|
||||||
log = get_logger(__name__)
|
log = get_logger(__name__)
|
||||||
|
|
||||||
|
|
||||||
class OrderDialog(Struct):
|
class Dialog(Struct):
|
||||||
'''
|
'''
|
||||||
Trade dialogue meta-data describing the lifetime
|
Trade dialogue meta-data describing the lifetime
|
||||||
of an order submission to ``emsd`` from a chart.
|
of an order submission to ``emsd`` from a chart.
|
||||||
|
|
@ -141,7 +146,7 @@ class OrderMode:
|
||||||
current_pp: Optional[PositionTracker] = None
|
current_pp: Optional[PositionTracker] = None
|
||||||
active: bool = False
|
active: bool = False
|
||||||
name: str = 'order'
|
name: str = 'order'
|
||||||
dialogs: dict[str, OrderDialog] = field(default_factory=dict)
|
dialogs: dict[str, Dialog] = field(default_factory=dict)
|
||||||
|
|
||||||
_colors = {
|
_colors = {
|
||||||
'alert': 'alert_yellow',
|
'alert': 'alert_yellow',
|
||||||
|
|
@ -152,10 +157,7 @@ class OrderMode:
|
||||||
|
|
||||||
def line_from_order(
|
def line_from_order(
|
||||||
self,
|
self,
|
||||||
|
|
||||||
order: Order,
|
order: Order,
|
||||||
symbol: Symbol,
|
|
||||||
|
|
||||||
**line_kwargs,
|
**line_kwargs,
|
||||||
|
|
||||||
) -> LevelLine:
|
) -> LevelLine:
|
||||||
|
|
@ -173,8 +175,8 @@ class OrderMode:
|
||||||
color=self._colors[order.action],
|
color=self._colors[order.action],
|
||||||
|
|
||||||
dotted=True if (
|
dotted=True if (
|
||||||
order.exec_mode == 'dark' and
|
order.exec_mode == 'dark'
|
||||||
order.action != 'alert'
|
and order.action != 'alert'
|
||||||
) else False,
|
) else False,
|
||||||
|
|
||||||
**line_kwargs,
|
**line_kwargs,
|
||||||
|
|
@ -236,7 +238,6 @@ class OrderMode:
|
||||||
|
|
||||||
line = self.line_from_order(
|
line = self.line_from_order(
|
||||||
order,
|
order,
|
||||||
symbol,
|
|
||||||
|
|
||||||
show_markers=True,
|
show_markers=True,
|
||||||
# just for the stage line to avoid
|
# just for the stage line to avoid
|
||||||
|
|
@ -262,25 +263,28 @@ class OrderMode:
|
||||||
|
|
||||||
def submit_order(
|
def submit_order(
|
||||||
self,
|
self,
|
||||||
|
send_msg: bool = True,
|
||||||
|
order: Optional[Order] = None,
|
||||||
|
|
||||||
) -> OrderDialog:
|
) -> Dialog:
|
||||||
'''
|
'''
|
||||||
Send execution order to EMS return a level line to
|
Send execution order to EMS return a level line to
|
||||||
represent the order on a chart.
|
represent the order on a chart.
|
||||||
|
|
||||||
'''
|
'''
|
||||||
staged = self._staged_order
|
if not order:
|
||||||
symbol: Symbol = staged.symbol
|
staged = self._staged_order
|
||||||
oid = str(uuid.uuid4())
|
oid = str(uuid.uuid4())
|
||||||
|
# symbol: Symbol = staged.symbol
|
||||||
|
|
||||||
# format order data for ems
|
# format order data for ems
|
||||||
order = staged.copy()
|
order = staged.copy()
|
||||||
order.oid = oid
|
order.oid = oid
|
||||||
order.symbol = symbol.front_fqsn()
|
|
||||||
|
order.symbol = order.symbol.front_fqsn()
|
||||||
|
|
||||||
line = self.line_from_order(
|
line = self.line_from_order(
|
||||||
order,
|
order,
|
||||||
symbol,
|
|
||||||
|
|
||||||
show_markers=True,
|
show_markers=True,
|
||||||
only_show_markers_on_hover=True,
|
only_show_markers_on_hover=True,
|
||||||
|
|
@ -298,17 +302,17 @@ class OrderMode:
|
||||||
# color once the submission ack arrives.
|
# color once the submission ack arrives.
|
||||||
self.lines.submit_line(
|
self.lines.submit_line(
|
||||||
line=line,
|
line=line,
|
||||||
uuid=oid,
|
uuid=order.oid,
|
||||||
)
|
)
|
||||||
|
|
||||||
dialog = OrderDialog(
|
dialog = Dialog(
|
||||||
uuid=oid,
|
uuid=order.oid,
|
||||||
order=order,
|
order=order,
|
||||||
symbol=symbol,
|
symbol=order.symbol,
|
||||||
line=line,
|
line=line,
|
||||||
last_status_close=self.multistatus.open_status(
|
last_status_close=self.multistatus.open_status(
|
||||||
f'submitting {self._trigger_type}-{order.action}',
|
f'submitting {order.exec_mode}-{order.action}',
|
||||||
final_msg=f'submitted {self._trigger_type}-{order.action}',
|
final_msg=f'submitted {order.exec_mode}-{order.action}',
|
||||||
clear_on_next=True,
|
clear_on_next=True,
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
|
|
@ -318,14 +322,21 @@ class OrderMode:
|
||||||
|
|
||||||
# enter submission which will be popped once a response
|
# enter submission which will be popped once a response
|
||||||
# from the EMS is received to move the order to a different# status
|
# from the EMS is received to move the order to a different# status
|
||||||
self.dialogs[oid] = dialog
|
self.dialogs[order.oid] = dialog
|
||||||
|
|
||||||
# hook up mouse drag handlers
|
# hook up mouse drag handlers
|
||||||
line._on_drag_start = self.order_line_modify_start
|
line._on_drag_start = self.order_line_modify_start
|
||||||
line._on_drag_end = self.order_line_modify_complete
|
line._on_drag_end = self.order_line_modify_complete
|
||||||
|
|
||||||
# send order cmd to ems
|
# send order cmd to ems
|
||||||
self.book.send(order)
|
if send_msg:
|
||||||
|
self.book.send(order)
|
||||||
|
else:
|
||||||
|
# just register for control over this order
|
||||||
|
# TODO: some kind of mini-perms system here based on
|
||||||
|
# an out-of-band tagging/auth sub-sys for multiplayer
|
||||||
|
# order control?
|
||||||
|
self.book._sent_orders[order.oid] = order
|
||||||
|
|
||||||
return dialog
|
return dialog
|
||||||
|
|
||||||
|
|
@ -363,7 +374,7 @@ class OrderMode:
|
||||||
self,
|
self,
|
||||||
uuid: str
|
uuid: str
|
||||||
|
|
||||||
) -> OrderDialog:
|
) -> Dialog:
|
||||||
'''
|
'''
|
||||||
Order submitted status event handler.
|
Order submitted status event handler.
|
||||||
|
|
||||||
|
|
@ -418,7 +429,7 @@ class OrderMode:
|
||||||
self,
|
self,
|
||||||
|
|
||||||
uuid: str,
|
uuid: str,
|
||||||
msg: Dict[str, Any],
|
msg: Status,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
|
|
@ -442,7 +453,7 @@ class OrderMode:
|
||||||
|
|
||||||
# TODO: add in standard fill/exec info that maybe we
|
# TODO: add in standard fill/exec info that maybe we
|
||||||
# pack in a broker independent way?
|
# pack in a broker independent way?
|
||||||
f'{msg["resp"]}: {msg["trigger_price"]}',
|
f'{msg.resp}: {msg.req.price}',
|
||||||
],
|
],
|
||||||
)
|
)
|
||||||
log.runtime(result)
|
log.runtime(result)
|
||||||
|
|
@ -502,7 +513,7 @@ class OrderMode:
|
||||||
oid = dialog.uuid
|
oid = dialog.uuid
|
||||||
|
|
||||||
cancel_status_close = self.multistatus.open_status(
|
cancel_status_close = self.multistatus.open_status(
|
||||||
f'cancelling order {oid[:6]}',
|
f'cancelling order {oid}',
|
||||||
group_key=key,
|
group_key=key,
|
||||||
)
|
)
|
||||||
dialog.last_status_close = cancel_status_close
|
dialog.last_status_close = cancel_status_close
|
||||||
|
|
@ -512,6 +523,45 @@ class OrderMode:
|
||||||
|
|
||||||
return ids
|
return ids
|
||||||
|
|
||||||
|
def load_unknown_dialog_from_msg(
|
||||||
|
self,
|
||||||
|
msg: Status,
|
||||||
|
|
||||||
|
) -> Dialog:
|
||||||
|
|
||||||
|
# NOTE: the `.order` attr **must** be set with the
|
||||||
|
# equivalent order msg in order to be loaded.
|
||||||
|
order = Order(**msg.req)
|
||||||
|
oid = str(msg.oid)
|
||||||
|
symbol = order.symbol
|
||||||
|
|
||||||
|
# TODO: MEGA UGGG ZONEEEE!
|
||||||
|
src = msg.src
|
||||||
|
if (
|
||||||
|
src
|
||||||
|
and src != 'dark'
|
||||||
|
and src not in symbol
|
||||||
|
):
|
||||||
|
fqsn = symbol + '.' + src
|
||||||
|
brokername = src
|
||||||
|
else:
|
||||||
|
fqsn = symbol
|
||||||
|
*head, brokername = fqsn.rsplit('.')
|
||||||
|
|
||||||
|
# fill out complex fields
|
||||||
|
order.oid = str(order.oid)
|
||||||
|
order.brokers = [brokername]
|
||||||
|
order.symbol = Symbol.from_fqsn(
|
||||||
|
fqsn=fqsn,
|
||||||
|
info={},
|
||||||
|
)
|
||||||
|
dialog = self.submit_order(
|
||||||
|
send_msg=False,
|
||||||
|
order=order,
|
||||||
|
)
|
||||||
|
assert self.dialogs[oid] == dialog
|
||||||
|
return dialog
|
||||||
|
|
||||||
|
|
||||||
@asynccontextmanager
|
@asynccontextmanager
|
||||||
async def open_order_mode(
|
async def open_order_mode(
|
||||||
|
|
@ -549,6 +599,7 @@ async def open_order_mode(
|
||||||
trades_stream,
|
trades_stream,
|
||||||
position_msgs,
|
position_msgs,
|
||||||
brokerd_accounts,
|
brokerd_accounts,
|
||||||
|
ems_dialog_msgs,
|
||||||
),
|
),
|
||||||
trio.open_nursery() as tn,
|
trio.open_nursery() as tn,
|
||||||
|
|
||||||
|
|
@ -596,10 +647,10 @@ async def open_order_mode(
|
||||||
|
|
||||||
sym = msg['symbol']
|
sym = msg['symbol']
|
||||||
if (
|
if (
|
||||||
sym == symkey or
|
(sym == symkey) or (
|
||||||
# mega-UGH, i think we need to fix the FQSN stuff sooner
|
# mega-UGH, i think we need to fix the FQSN
|
||||||
# then later..
|
# stuff sooner then later..
|
||||||
sym == symkey.removesuffix(f'.{broker}')
|
sym == symkey.removesuffix(f'.{broker}'))
|
||||||
):
|
):
|
||||||
pps_by_account[acctid] = msg
|
pps_by_account[acctid] = msg
|
||||||
|
|
||||||
|
|
@ -653,7 +704,7 @@ async def open_order_mode(
|
||||||
# setup order mode sidepane widgets
|
# setup order mode sidepane widgets
|
||||||
form: FieldsForm = chart.sidepane
|
form: FieldsForm = chart.sidepane
|
||||||
form.vbox.setSpacing(
|
form.vbox.setSpacing(
|
||||||
int((1 + 5/8)*_font.px_size)
|
int((1 + 5 / 8) * _font.px_size)
|
||||||
)
|
)
|
||||||
|
|
||||||
from ._feedstatus import mk_feed_label
|
from ._feedstatus import mk_feed_label
|
||||||
|
|
@ -703,7 +754,7 @@ async def open_order_mode(
|
||||||
order_pane.order_mode = mode
|
order_pane.order_mode = mode
|
||||||
|
|
||||||
# select a pp to track
|
# select a pp to track
|
||||||
tracker = trackers[pp_account]
|
tracker: PositionTracker = trackers[pp_account]
|
||||||
mode.current_pp = tracker
|
mode.current_pp = tracker
|
||||||
tracker.show()
|
tracker.show()
|
||||||
tracker.hide_info()
|
tracker.hide_info()
|
||||||
|
|
@ -755,151 +806,181 @@ async def open_order_mode(
|
||||||
# to handle input since the ems connection is ready
|
# to handle input since the ems connection is ready
|
||||||
started.set()
|
started.set()
|
||||||
|
|
||||||
|
for oid, msg in ems_dialog_msgs.items():
|
||||||
|
|
||||||
|
# HACK ALERT: ensure a resp field is filled out since
|
||||||
|
# techincally the call below expects a ``Status``. TODO:
|
||||||
|
# parse into proper ``Status`` equivalents ems-side?
|
||||||
|
# msg.setdefault('resp', msg['broker_details']['resp'])
|
||||||
|
# msg.setdefault('oid', msg['broker_details']['oid'])
|
||||||
|
msg['brokerd_msg'] = msg
|
||||||
|
|
||||||
|
await process_trade_msg(
|
||||||
|
mode,
|
||||||
|
book,
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
|
||||||
tn.start_soon(
|
tn.start_soon(
|
||||||
process_trades_and_update_ui,
|
process_trades_and_update_ui,
|
||||||
tn,
|
|
||||||
feed,
|
|
||||||
mode,
|
|
||||||
trades_stream,
|
trades_stream,
|
||||||
|
mode,
|
||||||
book,
|
book,
|
||||||
)
|
)
|
||||||
|
|
||||||
yield mode
|
yield mode
|
||||||
|
|
||||||
|
|
||||||
async def process_trades_and_update_ui(
|
async def process_trades_and_update_ui(
|
||||||
|
|
||||||
n: trio.Nursery,
|
|
||||||
feed: Feed,
|
|
||||||
mode: OrderMode,
|
|
||||||
trades_stream: tractor.MsgStream,
|
trades_stream: tractor.MsgStream,
|
||||||
|
mode: OrderMode,
|
||||||
book: OrderBook,
|
book: OrderBook,
|
||||||
|
|
||||||
) -> None:
|
) -> None:
|
||||||
|
|
||||||
get_index = mode.chart.get_index
|
|
||||||
global _pnl_tasks
|
|
||||||
|
|
||||||
# this is where we receive **back** messages
|
# this is where we receive **back** messages
|
||||||
# about executions **from** the EMS actor
|
# about executions **from** the EMS actor
|
||||||
async for msg in trades_stream:
|
async for msg in trades_stream:
|
||||||
|
await process_trade_msg(
|
||||||
|
mode,
|
||||||
|
book,
|
||||||
|
msg,
|
||||||
|
)
|
||||||
|
|
||||||
fmsg = pformat(msg)
|
|
||||||
log.info(f'Received order msg:\n{fmsg}')
|
|
||||||
|
|
||||||
name = msg['name']
|
async def process_trade_msg(
|
||||||
if name in (
|
mode: OrderMode,
|
||||||
'position',
|
book: OrderBook,
|
||||||
|
msg: dict,
|
||||||
|
|
||||||
|
) -> tuple[Dialog, Status]:
|
||||||
|
|
||||||
|
get_index = mode.chart.get_index
|
||||||
|
fmsg = pformat(msg)
|
||||||
|
log.debug(f'Received order msg:\n{fmsg}')
|
||||||
|
name = msg['name']
|
||||||
|
|
||||||
|
if name in (
|
||||||
|
'position',
|
||||||
|
):
|
||||||
|
sym = mode.chart.linked.symbol
|
||||||
|
pp_msg_symbol = msg['symbol'].lower()
|
||||||
|
fqsn = sym.front_fqsn()
|
||||||
|
broker, key = sym.front_feed()
|
||||||
|
if (
|
||||||
|
pp_msg_symbol == fqsn
|
||||||
|
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
|
||||||
):
|
):
|
||||||
sym = mode.chart.linked.symbol
|
log.info(f'{fqsn} matched pp msg: {fmsg}')
|
||||||
pp_msg_symbol = msg['symbol'].lower()
|
tracker = mode.trackers[msg['account']]
|
||||||
fqsn = sym.front_fqsn()
|
tracker.live_pp.update_from_msg(msg)
|
||||||
broker, key = sym.front_feed()
|
# update order pane widgets
|
||||||
if (
|
tracker.update_from_pp()
|
||||||
pp_msg_symbol == fqsn
|
mode.pane.update_status_ui(tracker)
|
||||||
or pp_msg_symbol == fqsn.removesuffix(f'.{broker}')
|
|
||||||
):
|
|
||||||
log.info(f'{fqsn} matched pp msg: {fmsg}')
|
|
||||||
tracker = mode.trackers[msg['account']]
|
|
||||||
tracker.live_pp.update_from_msg(msg)
|
|
||||||
# update order pane widgets
|
|
||||||
tracker.update_from_pp()
|
|
||||||
mode.pane.update_status_ui(tracker)
|
|
||||||
|
|
||||||
if tracker.live_pp.size:
|
if tracker.live_pp.size:
|
||||||
# display pnl
|
# display pnl
|
||||||
mode.pane.display_pnl(tracker)
|
mode.pane.display_pnl(tracker)
|
||||||
|
|
||||||
# short circuit to next msg to avoid
|
# short circuit to next msg to avoid
|
||||||
# unnecessary msg content lookups
|
# unnecessary msg content lookups
|
||||||
continue
|
return
|
||||||
|
|
||||||
resp = msg['resp']
|
msg = Status(**msg)
|
||||||
oid = msg['oid']
|
resp = msg.resp
|
||||||
|
oid = msg.oid
|
||||||
|
dialog: Dialog = mode.dialogs.get(oid)
|
||||||
|
|
||||||
dialog = mode.dialogs.get(oid)
|
match msg:
|
||||||
if dialog is None:
|
case Status(resp='dark_open' | 'open'):
|
||||||
log.warning(f'received msg for untracked dialog:\n{fmsg}')
|
|
||||||
|
|
||||||
# TODO: enable pure tracking / mirroring of dialogs
|
if dialog is not None:
|
||||||
# is desired.
|
# show line label once order is live
|
||||||
continue
|
mode.on_submit(oid)
|
||||||
|
|
||||||
# record message to dialog tracking
|
else:
|
||||||
dialog.msgs[oid] = msg
|
log.warning(
|
||||||
|
f'received msg for untracked dialog:\n{fmsg}'
|
||||||
|
)
|
||||||
|
assert msg.resp in ('open', 'dark_open'), f'Unknown msg: {msg}'
|
||||||
|
|
||||||
# response to 'action' request (buy/sell)
|
sym = mode.chart.linked.symbol
|
||||||
if resp in (
|
fqsn = sym.front_fqsn()
|
||||||
'dark_submitted',
|
order = Order(**msg.req)
|
||||||
'broker_submitted'
|
if (
|
||||||
):
|
((order.symbol + f'.{msg.src}') == fqsn)
|
||||||
|
|
||||||
# show line label once order is live
|
# a existing dark order for the same symbol
|
||||||
mode.on_submit(oid)
|
or (
|
||||||
|
order.symbol == fqsn
|
||||||
|
and (msg.src == 'dark') or (msg.src in fqsn)
|
||||||
|
)
|
||||||
|
):
|
||||||
|
dialog = mode.load_unknown_dialog_from_msg(msg)
|
||||||
|
mode.on_submit(oid)
|
||||||
|
# return dialog, msg
|
||||||
|
|
||||||
# resp to 'cancel' request or error condition
|
case Status(resp='error'):
|
||||||
# for action request
|
|
||||||
elif resp in (
|
|
||||||
'broker_inactive',
|
|
||||||
'broker_errored',
|
|
||||||
):
|
|
||||||
# delete level line from view
|
# delete level line from view
|
||||||
mode.on_cancel(oid)
|
mode.on_cancel(oid)
|
||||||
broker_msg = msg['brokerd_msg']
|
broker_msg = msg.brokerd_msg
|
||||||
log.error(
|
log.error(
|
||||||
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
||||||
)
|
)
|
||||||
|
|
||||||
elif resp in (
|
case Status(resp='canceled'):
|
||||||
'broker_cancelled',
|
|
||||||
'dark_cancelled'
|
|
||||||
):
|
|
||||||
# delete level line from view
|
# delete level line from view
|
||||||
mode.on_cancel(oid)
|
mode.on_cancel(oid)
|
||||||
broker_msg = msg['brokerd_msg']
|
req = Order(**msg.req)
|
||||||
log.cancel(
|
log.cancel(f'Canceled {req.action}:{oid}')
|
||||||
f'Order {oid}->{resp} with:\n{pformat(broker_msg)}'
|
|
||||||
)
|
|
||||||
|
|
||||||
elif resp in (
|
case Status(
|
||||||
'dark_triggered'
|
resp='triggered',
|
||||||
|
# req=Order(exec_mode='dark') # TODO:
|
||||||
|
req={'exec_mode': 'dark'},
|
||||||
):
|
):
|
||||||
|
# TODO: UX for a "pending" clear/live order
|
||||||
log.info(f'Dark order triggered for {fmsg}')
|
log.info(f'Dark order triggered for {fmsg}')
|
||||||
|
|
||||||
elif resp in (
|
case Status(
|
||||||
'alert_triggered'
|
resp='triggered',
|
||||||
|
# req=Order(exec_mode='live', action='alert') as req, # TODO
|
||||||
|
req={'exec_mode': 'live', 'action': 'alert'} as req,
|
||||||
):
|
):
|
||||||
# should only be one "fill" for an alert
|
# should only be one "fill" for an alert
|
||||||
# add a triangle and remove the level line
|
# add a triangle and remove the level line
|
||||||
|
req = Order(**req)
|
||||||
mode.on_fill(
|
mode.on_fill(
|
||||||
oid,
|
oid,
|
||||||
price=msg['trigger_price'],
|
price=req.price,
|
||||||
arrow_index=get_index(time.time()),
|
arrow_index=get_index(time.time()),
|
||||||
)
|
)
|
||||||
mode.lines.remove_line(uuid=oid)
|
mode.lines.remove_line(uuid=oid)
|
||||||
|
msg.req = req
|
||||||
await mode.on_exec(oid, msg)
|
await mode.on_exec(oid, msg)
|
||||||
|
|
||||||
# response to completed 'action' request for buy/sell
|
# response to completed 'dialog' for order request
|
||||||
elif resp in (
|
case Status(
|
||||||
'broker_executed',
|
resp='closed',
|
||||||
|
# req=Order() as req, # TODO
|
||||||
|
req=req,
|
||||||
):
|
):
|
||||||
# right now this is just triggering a system alert
|
msg.req = Order(**req)
|
||||||
await mode.on_exec(oid, msg)
|
await mode.on_exec(oid, msg)
|
||||||
|
mode.lines.remove_line(uuid=oid)
|
||||||
if msg['brokerd_msg']['remaining'] == 0:
|
|
||||||
mode.lines.remove_line(uuid=oid)
|
|
||||||
|
|
||||||
# each clearing tick is responded individually
|
# each clearing tick is responded individually
|
||||||
elif resp in (
|
case Status(resp='fill'):
|
||||||
'broker_filled',
|
|
||||||
):
|
|
||||||
|
|
||||||
|
# handle out-of-piker fills reporting?
|
||||||
known_order = book._sent_orders.get(oid)
|
known_order = book._sent_orders.get(oid)
|
||||||
if not known_order:
|
if not known_order:
|
||||||
log.warning(f'order {oid} is unknown')
|
log.warning(f'order {oid} is unknown')
|
||||||
continue
|
return
|
||||||
|
|
||||||
action = known_order.action
|
action = known_order.action
|
||||||
details = msg['brokerd_msg']
|
details = msg.brokerd_msg
|
||||||
|
|
||||||
# TODO: some kinda progress system
|
# TODO: some kinda progress system
|
||||||
mode.on_fill(
|
mode.on_fill(
|
||||||
|
|
@ -914,3 +995,9 @@ async def process_trades_and_update_ui(
|
||||||
# TODO: how should we look this up?
|
# TODO: how should we look this up?
|
||||||
# tracker = mode.trackers[msg['account']]
|
# tracker = mode.trackers[msg['account']]
|
||||||
# tracker.live_pp.fills.append(msg)
|
# tracker.live_pp.fills.append(msg)
|
||||||
|
|
||||||
|
# record message to dialog tracking
|
||||||
|
if dialog:
|
||||||
|
dialog.msgs[oid] = msg
|
||||||
|
|
||||||
|
return dialog, msg
|
||||||
|
|
|
||||||
Loading…
Reference in New Issue