diff --git a/piker/brokers/kraken/broker.py b/piker/brokers/kraken/broker.py index 3641934a..6bb3be17 100644 --- a/piker/brokers/kraken/broker.py +++ b/piker/brokers/kraken/broker.py @@ -39,7 +39,6 @@ from bidict import bidict import pendulum import trio import tractor -import wsproto from piker.pp import ( Position, @@ -49,6 +48,8 @@ from piker.pp import ( open_pps, ) from piker.clearing._messages import ( + Order, + Status, BrokerdCancel, BrokerdError, BrokerdFill, @@ -126,7 +127,7 @@ async def handle_order_requests( oid=msg['oid'], symbol=msg['symbol'], reason=( - f'TooFastEdit reqid:{reqid}, could not cancelling..' + f'Edit too fast:{reqid}, cancelling..' ), ) @@ -249,7 +250,7 @@ async def handle_order_requests( @acm async def subscribe( - ws: wsproto.WSConnection, + ws: NoBsWs, token: str, subs: list[tuple[str, dict]] = [ ('ownTrades', { @@ -632,8 +633,6 @@ async def handle_order_updates( # to do all fill/status/pp updates in that sub and just use # this one for ledger syncs? - # XXX: ASK SUPPORT ABOUT THIS! - # For eg. we could take the "last 50 trades" and do a diff # with the ledger and then only do a re-sync if something # seems amiss? @@ -696,7 +695,6 @@ async def handle_order_updates( status_msg = BrokerdStatus( reqid=reqid, time_ns=time.time_ns(), - account=acc_name, status='filled', filled=size, @@ -870,38 +868,56 @@ async def handle_order_updates( f'{update_msg}\n' 'Cancelling order for now!..' ) + # call ws api to cancel: + # https://docs.kraken.com/websockets/#message-cancelOrder + await ws.send_msg({ + 'event': 'cancelOrder', + 'token': token, + 'reqid': reqid or 0, + 'txid': [txid], + }) + continue - elif noid: # a non-ems-active order - # TODO: handle these and relay them - # through the EMS to the client / UI - # side! - log.cancel( - f'Rx unknown active order {txid}:\n' - f'{update_msg}\n' - 'Cancelling order for now!..' + # a non-ems-active order, emit live + # order embedded in status msg. + elif noid: + # parse out existing live order + descr = rest['descr'] + fqsn = descr['pair'].replace( + '/', '').lower() + price = float(descr['price']) + size = float(rest['vol']) + action = descr['type'] + + # register the userref value from + # kraken (usually an `int` staring + # at 1?) as our reqid. + reqids2txids[reqid] = txid + oid = str(reqid) + ids[oid] = reqid # NOTE!: str -> int + + # fill out ``Status`` + boxed ``Order`` + status_msg = Status( + time_ns=time.time_ns(), + resp='open', + oid=oid, + reqid=reqid, + + # embedded order info + req=Order( + action=action, + exec_mode='live', + oid=oid, + symbol=fqsn, + account=acc_name, + price=price, + size=size, + ), + src='kraken', ) - - # call ws api to cancel: - # https://docs.kraken.com/websockets/#message-cancelOrder - await ws.send_msg({ - 'event': 'cancelOrder', - 'token': token, - 'reqid': reqid or 0, - 'txid': [txid], - }) - continue - - # remap statuses to ems set. - ems_status = { - 'open': 'submitted', - 'closed': 'filled', - 'canceled': 'cancelled', - # do we even need to forward - # this state to the ems? - 'pending': 'pending', - }[status] - # TODO: i like the open / closed semantics - # more we should consider them for internals + apiflows[reqid].maps.append(status_msg) + await ems_stream.send(status_msg) + continue # send BrokerdStatus messages for all # order state updates @@ -912,7 +928,7 @@ async def handle_order_updates( account=f'kraken.{acctid}', # everyone doin camel case.. - status=ems_status, # force lower case + status=status, # force lower case filled=vlm, reason='', # why held?