From d3f34566278caacc9ab872016c7508bef87cfed3 Mon Sep 17 00:00:00 2001 From: Jacob Plaster Date: Thu, 7 Feb 2019 15:33:25 +0000 Subject: [PATCH] OrderManager: avoid callback clashing --- bfxapi/websockets/OrderManager.py | 127 ++++++++++++++++-------------- 1 file changed, 68 insertions(+), 59 deletions(-) diff --git a/bfxapi/websockets/OrderManager.py b/bfxapi/websockets/OrderManager.py index 7c97e5a..c7494d3 100644 --- a/bfxapi/websockets/OrderManager.py +++ b/bfxapi/websockets/OrderManager.py @@ -19,9 +19,13 @@ class OrderManager: def __init__(self, bfxapi, logLevel='INFO'): self.bfxapi = bfxapi self.pending_orders = {} - self.pending_callbacks = {} self.closed_orders = {} self.open_orders = {} + + self.pending_order_close_callbacks = {} + self.pending_order_confirm_callbacks = {} + self.pending_update_confirm_callbacks = {} + self.pending_cancel_confirm_callbacks = {} self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel) def get_open_orders(self): @@ -33,30 +37,18 @@ class OrderManager: def get_pending_orders(self): return list(self.pending_orders.values()) - async def _confirm_order(self, order, isClosed=False): - """ - Called every time an order signal has been received. This function - manages the local list of open orders. - """ - if order.cid in self.pending_orders: - await self._execute_confirm_callback(order.cid, order) - if isClosed: - await self._execute_close_callback(order.cid, order) - order.set_confirmed() - # remove from pending orders list - del self.pending_orders[order.cid] - self.bfxapi._emit('order_confirmed', order) - else: - await self._execute_confirm_callback(order.id, order) - if isClosed: - await self._execute_close_callback(order.id, order) - async def confirm_order_closed(self, raw_ws_data): order = Order.from_raw_order(raw_ws_data[2]) order.set_open_state(False) if order.id in self.open_orders: del self.open_orders[order.id] - await self._confirm_order(order, isClosed=True) + if not order.is_confirmed(): + order.set_confirmed() + self.bfxapi._emit('order_confirmed', order) + await self._execute_callback(order, self.pending_order_confirm_callbacks) + await self._execute_callback(order, self.pending_cancel_confirm_callbacks) + await self._execute_callback(order, self.pending_update_confirm_callbacks) + await self._execute_callback(order, self.pending_order_close_callbacks) self.logger.info("Order closed: {} {}".format( order.symbol, order.status)) self.bfxapi._emit('order_closed', order) @@ -77,7 +69,7 @@ class OrderManager: order = Order.from_raw_order(raw_ws_data[2]) order.set_open_state(True) self.open_orders[order.id] = order - await self._confirm_order(order) + await self._execute_callback(order, self.pending_update_confirm_callbacks) self.logger.info("Order update: {}".format(order)) self.bfxapi._emit('order_update', order) @@ -85,7 +77,9 @@ class OrderManager: order = Order.from_raw_order(raw_ws_data[2]) order.set_open_state(True) self.open_orders[order.id] = order - await self._confirm_order(order) + order.set_confirmed() + self.bfxapi._emit('order_confirmed', order) + await self._execute_callback(order, self.pending_order_confirm_callbacks) self.logger.info("Order new: {}".format(order)) self.bfxapi._emit('order_new', order) @@ -96,10 +90,11 @@ class OrderManager: hidden=False, price_trailing=None, price_aux_limit=None, oco_stop_price=None, close=False, reduce_only=False, post_only=False, oco=False, time_in_force=None, - onConfirm=None, onClose=None, *args, **kwargs): + onConfirm=None, onClose=None, gid=None, *args, **kwargs): """ Submit a new order + @param gid: assign the order to a group identitfier @param symbol: the name of the symbol i.e 'tBTCUSD @param price: the price you want to buy/sell at (must be positive) @param amount: order size: how much you want to buy/sell, @@ -133,8 +128,7 @@ class OrderManager: "price": str(price), } # caclulate and add flags - flags = self._calculate_flags( - hidden, close, reduce_only, post_only, oco) + flags = self._calculate_flags(hidden, close, reduce_only, post_only, oco) payload['flags'] = flags # add extra parameters if (price_trailing): @@ -142,19 +136,22 @@ class OrderManager: if (price_aux_limit): payload['price_aux_limit'] = price_aux_limit if (oco_stop_price): - payload['price_oco_stop'] = oco_stop_price + payload['price_oco_stop'] = str(oco_stop_price) if (time_in_force): payload['tif'] = time_in_force + if (gid): + payload['gid'] = gid # submit the order self.pending_orders[cid] = payload - self._create_callback(cid, onConfirm=onConfirm, onClose=onClose) + self._create_callback(cid, onConfirm, self.pending_order_confirm_callbacks) + self._create_callback(cid, onClose, self.pending_order_close_callbacks) await self.bfxapi._send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( cid, symbol, amount, price)) async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None, price_trailing=None, hidden=False, close=False, reduce_only=False, - post_only=False, time_in_force=None, onConfirm=None, onClose=None): + post_only=False, time_in_force=None, onConfirm=None): """ Update an existing order @@ -176,7 +173,7 @@ class OrderManager: @param onClose: function called when the bitfinex websocket receives signal that the order was closed due to being filled or cancelled """ - self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose) + self._create_callback(orderId, onConfirm, self.pending_update_confirm_callbacks) payload = {"id": orderId} if price is not None: payload['price'] = str(price) @@ -196,7 +193,7 @@ class OrderManager: await self.bfxapi._send_auth_command('ou', payload) self.logger.info("Update Order order_id={} dispatched".format(orderId)) - async def cancel_order(self, orderId, onConfirm=None, onClose=None): + async def cancel_order(self, orderId, onConfirm=None): """ Cancel an existing open order @@ -207,8 +204,7 @@ class OrderManager: @param onClose: function called when the bitfinex websocket receives signal that the order was closed due to being filled or cancelled """ - # order = self.open_orders[orderId] - self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose) + self._create_callback(orderId, onConfirm, self.pending_cancel_confirm_callbacks) await self.bfxapi._send_auth_command('oc', {'id': orderId}) self.logger.info("Order cancel order_id={} dispatched".format(orderId)) @@ -216,42 +212,55 @@ class OrderManager: """ Cancel all existing open orders - This function closes orders that have been tracked locally by the OrderManager. + This function closes all open orders. """ - ids = [self.open_orders[x].id for x in self.open_orders] - await self.cancel_order_multi(ids) + await self.bfxapi._send_auth_command('oc_multi', { 'all': 1 }) - async def cancel_order_multi(self, orderIds): + async def cancel_order_group(self, gid, onConfirm=None): + """ + Cancel a set of orders using a single group id. + """ + self._create_callback(gid, onConfirm, self.pending_cancel_confirm_callbacks) + await self.bfxapi._send_auth_command('oc_multi', { 'gid': [gid] }) + + async def cancel_order_multi(self, ids=None, gids=None): """ Cancel existing open orders as a batch - @param orderIds: an array of order ids + @param ids: an array of order ids + @param gids: an array of group ids """ - task_batch = [] - for oid in orderIds: - task_batch += [ - asyncio.ensure_future(self.open_orders[oid].close()) - ] - await asyncio.wait(*[task_batch]) + payload = {} + if ids: + payload['id'] = ids + if gids: + payload['gid'] = gids + await self.bfxapi._send_auth_command('oc_multi', payload) - def _create_callback(self, order_identifier, onConfirm=None, onClose=None): - if order_identifier in self.pending_callbacks: - self.pending_callbacks[order_identifier] += [(onClose, onConfirm)] + def _create_callback(self, identifier, func, callback_storage): + if not func: + return + if identifier in callback_storage: + callback_storage[identifier] += [func] else: - self.pending_callbacks[order_identifier] = [(onClose, onConfirm)] + callback_storage[identifier] = [func] - async def _execute_close_callback(self, order_identifier, *args, **kwargs): - if order_identifier in self.pending_callbacks: - for c in self.pending_callbacks[order_identifier]: - if c[0]: - await c[0](*args, **kwargs) - del self.pending_callbacks[order_identifier] - - async def _execute_confirm_callback(self, order_identifier, *args, **kwargs): - if order_identifier in self.pending_callbacks: - for c in self.pending_callbacks[order_identifier]: - if c[1]: - await c[1](*args, **kwargs) + async def _execute_callback(self, order, callback_storage): + idents = [order.id, order.cid, order.gid] + tasks = [] + key = None + for k in callback_storage.keys(): + if k in idents: + print (callback_storage[k]) + key = k + # call all callbacks associated with identifier + for callback in callback_storage[k]: + tasks += [callback(order)] + break + # remove from callbacks + if key: + del callback_storage[key] + await asyncio.gather(*tasks) def _calculate_flags(self, hidden, close, reduce_only, post_only, oco): flags = 0