From 66d0ec4c3ea59359c1cc1d63b6a75435cafec4af Mon Sep 17 00:00:00 2001 From: Jacob Plaster Date: Mon, 19 Nov 2018 13:27:12 +0000 Subject: [PATCH] Add cancel-multi, cancel and update order functions --- bfxapi/models/Order.py | 5 +- bfxapi/websockets/GenericWebsocket.py | 8 +- bfxapi/websockets/LiveWebsocket.py | 119 ++++++++++++++++++++++---- 3 files changed, 108 insertions(+), 24 deletions(-) diff --git a/bfxapi/models/Order.py b/bfxapi/models/Order.py index 3835896..d9ca095 100644 --- a/bfxapi/models/Order.py +++ b/bfxapi/models/Order.py @@ -25,7 +25,7 @@ def now_in_mills(): class Order: def __init__(self, closingOrderArray): - self.id = closingOrderArray[OrderClosedModel.ID] + self.id = closingOrderArray[OrderClosedModel.ID] self.gId = closingOrderArray[OrderClosedModel.GID] self.cId = closingOrderArray[OrderClosedModel.CID] self.symbol = closingOrderArray[OrderClosedModel.SYMBOL] @@ -47,4 +47,5 @@ class Order: def __str__(self): ''' Allow us to print the Order object in a pretty format ''' - return "Order <'{0}' mtsCreate={1}>".format(self.symbol, self.mtsCreate) + return "Order <'{0}' mtsCreate={1} {}>".format(self.symbol, self.mtsCreate, + self.status) diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py index 80b3e35..051da05 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/GenericWebsocket.py @@ -43,12 +43,14 @@ class GenericWebsocket(object): return self.events.on(event) self.events.on(event, func) + def once(self, event, func=None): + if not func: + return self.events.once(event) + self.events.once(event, func) + def _emit(self, event, *args, **kwargs): self.events.emit(event, *args, **kwargs) - def once(self, event, func): - self.events.once(event, func) - async def on_error(self, error): self.logger.error(error) self.events.emit('error', error) diff --git a/bfxapi/websockets/LiveWebsocket.py b/bfxapi/websockets/LiveWebsocket.py index a2e0877..54a9794 100644 --- a/bfxapi/websockets/LiveWebsocket.py +++ b/bfxapi/websockets/LiveWebsocket.py @@ -72,7 +72,10 @@ class LiveBfxWebsocket(GenericWebsocket): - authenticated: called when the websocket passes authentication - notification (array): incoming account notification - error (string): error from the websocket - - order_closed (string): when an order confirmation is recieved + - order_closed (Order, Trade): when an order has been closed + - order_new (Order, Trade): when an order has been created but not closed. Note: will + not be called if order is executed and filled instantly + - order_confirmed (Order, Trade): when an order has been submitted and received - wallet_snapshot (string): Initial wallet balances (Fired once) - order_snapshot (string): Initial open orders (Fired once) - positions_snapshot (string): Initial open positions (Fired once) @@ -85,6 +88,8 @@ class LiveBfxWebsocket(GenericWebsocket): - balance_update when the state of a balance is changed - new_trade: a new trade on the market has been executed - new_candle: a new candle has been produced + - margin_info_update: new margin information has been broadcasted + - funding_info_update: new funding information has been broadcasted ''' ERRORS = { @@ -111,7 +116,7 @@ class LiveBfxWebsocket(GenericWebsocket): 20061: 'Websocket server resync complete' } - def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://api.bitfinex.com/ws/2', + def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://test.bitfinex.com/ws/2', onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs): self.channels = {} self.API_KEY=API_KEY @@ -125,8 +130,10 @@ class LiveBfxWebsocket(GenericWebsocket): 'tu': self._trade_update_handler, 'wu': self._wallet_update_handler, 'hb': self._heart_beat_handler, - 'te': self._trade_event_handler, + 'te': self._trade_executed_handler, 'oc': self._order_closed_handler, + 'ou': self._order_update_handler, + 'on': self._order_new_handler, 'os': self._order_snapshot_handler, 'ws': self._wallet_snapshot_handler, 'ps': self._position_snapshot_handler, @@ -134,7 +141,9 @@ class LiveBfxWebsocket(GenericWebsocket): 'fcs': self._funding_credit_snapshot_handler, 'fls': self._funding_load_snapshot_handler, 'bu': self._balance_update_handler, - 'n': self._notification_handler + 'n': self._notification_handler, + 'miu': self._margin_info_update_handler, + 'fiu': self._funding_info_update_handler } self._WS_SYSTEM_HANDLERS = { @@ -197,17 +206,19 @@ class LiveBfxWebsocket(GenericWebsocket): async def _trade_update_handler(self, data): tData = data[2] # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] - channelData = self.channels[data[0]] - tradeObj = _parse_trade(tData, channelData.get('symbol')) - self._emit('new_trade', tradeObj) + if data[0] in self.channels: + channelData = self.channels[data[0]] + tradeObj = _parse_trade(tData, channelData.get('symbol')) + self._emit('new_trade', tradeObj) - async def _trade_event_handler(self, data): + async def _trade_executed_handler(self, data): tData = data[2] # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] - channelData = self.channels[data[0]] - tradeObj = _parse_trade(tData, channelData.get('symbol')) - self._emit('new_trade', tradeObj) + if data[0] in self.channels: + channelData = self.channels[data[0]] + tradeObj = _parse_trade(tData, channelData.get('symbol')) + self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]] @@ -218,6 +229,14 @@ class LiveBfxWebsocket(GenericWebsocket): async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host)) + async def _margin_info_update_handler(self, data): + self._emit('margin_info_update', data) + self.logger.info("Margin info update: {}".format(data)) + + async def _funding_info_update_handler(self, data): + self._emit('funding_info_update', data) + self.logger.info("Funding info update: {}".format(data)) + async def _notification_handler(self, data): # [0, 'n', [1542289340429, 'on-req', None, None, # [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET', @@ -238,6 +257,7 @@ class LiveBfxWebsocket(GenericWebsocket): self._emit('balance_update', data[2]) async def _order_closed_handler(self, data): + # order created and executed # [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1, # "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED # @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null, @@ -250,7 +270,42 @@ class LiveBfxWebsocket(GenericWebsocket): if order.cId in self.pendingOrders: if self.pendingOrders[order.cId][0]: await self.pendingOrders[order.cId][0](order, trade) - del self.pendingOrders[order.cId] + del self.pendingOrders[order.cId] + self._emit('order_confirmed', order, trade) + + async def _order_update_handler(self, data): + # order created but partially filled + # [0, 'ou', [1151351581, None, 1542629457873, 'tBTCUSD', 1542629458071, + # 1542629458189, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', + # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', + # None, None, None]] + tInfo = data[2] + order = Order(tInfo) + trade = Trade(order) + self.logger.info("Order update: {} {}".format(order, trade)) + self._emit('order_update', order, trade) + if order.cId in self.pendingOrders: + if self.pendingOrders[order.cId][0]: + await self.pendingOrders[order.cId][0](order, trade) + del self.pendingOrders[order.cId] + self._emit('order_confirmed', order, trade) + + async def _order_new_handler(self, data): + # order created but not executed / created but partially filled + # [0, 'on', [1151351563, None, 1542624024383, 'tBTCUSD', 1542624024596, + # 1542624024617, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', + # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', + # None, None, None]] + tInfo = data[2] + order = Order(tInfo) + trade = Trade(order) + self.logger.info("Order new: {} {}".format(order, trade)) + self._emit('order_new', order, trade) + if order.cId in self.pendingOrders: + if self.pendingOrders[order.cId][0]: + await self.pendingOrders[order.cId][0](order, trade) + self._emit('order_confirmed', order, trade) + del self.pendingOrders[order.cId] async def _order_snapshot_handler(self, data): self._emit('order_snapshot', data) @@ -344,11 +399,6 @@ class LiveBfxWebsocket(GenericWebsocket): # Orders are simulated in backtest mode if not self.backtest and self.API_KEY and self.API_SECRET: await self._ws_authenticate_socket() - # subscribe to data feed - # TODO: allow for multiple subscriptions - # await self._subscribe('trades', symbol=self.symbol) - # key = 'trade:1m:{}'.format(self.symbol) - # await self._subscribe('candles', key=key, symbol=self.symbol) async def send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data] @@ -374,8 +424,39 @@ class LiveBfxWebsocket(GenericWebsocket): "amount": str(amount), "price": str(price) } - self.pendingOrders[order_id] = payload + self.pendingOrders[order_id] = (onComplete, onError) await self.send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( order_id, symbol, amount, price)) - self.pendingOrders[order_id] = (onComplete, onError) + + async def update_order(self, orderId, price=None, amount=None, delta=None, + price_aux_limit=None, price_trailing=None, flags=None, time_in_force=None, + onComplete=None, onError=None): + payload = { "id": orderId } + if price is not None: + payload['price'] = str(price) + if amount is not None: + payload['amount'] = str(amount) + if delta is not None: + payload['delta'] = str(delta) + if price_aux_limit is not None: + payload['price_aux_limit'] = str(price_aux_limit) + if price_trailing is not None: + payload['price_trailing'] = str(price_trailing) + if flags is not None: + payload['flags'] = str(flags) + if time_in_force is not None: + payload['time_in_force'] = str(time_in_force) + self.pendingOrders[orderId] = (onComplete, onError) + await self.send_auth_command('ou', payload) + self.logger.info("Update Order order_id={} dispatched".format(orderId)) + + async def cancel_order(self, orderId, onComplete=None, onError=None): + self.pendingOrders[orderId] = (onComplete, onError) + await self.send_auth_command('oc', { 'id': orderId }) + self.logger.info("Order cancel order_id={} dispatched".format(orderId)) + + async def cancel_order_multi(self, orderIds, onComplete=None, onError=None): + self.pendingOrders[orderIds[0]] = (onComplete, onError) + await self.send_auth_command('oc', { 'id': orderIds }) + self.logger.info("Order cancel order_ids={} dispatched".format(orderIds))