OrderManager: avoid callback clashing

This commit is contained in:
Jacob Plaster
2019-02-07 15:33:25 +00:00
parent fbb08afca9
commit d3f3456627

View File

@@ -19,9 +19,13 @@ class OrderManager:
def __init__(self, bfxapi, logLevel='INFO'): def __init__(self, bfxapi, logLevel='INFO'):
self.bfxapi = bfxapi self.bfxapi = bfxapi
self.pending_orders = {} self.pending_orders = {}
self.pending_callbacks = {}
self.closed_orders = {} self.closed_orders = {}
self.open_orders = {} self.open_orders = {}
self.pending_order_close_callbacks = {}
self.pending_order_confirm_callbacks = {}
self.pending_update_confirm_callbacks = {}
self.pending_cancel_confirm_callbacks = {}
self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel) self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel)
def get_open_orders(self): def get_open_orders(self):
@@ -33,30 +37,18 @@ class OrderManager:
def get_pending_orders(self): def get_pending_orders(self):
return list(self.pending_orders.values()) return list(self.pending_orders.values())
async def _confirm_order(self, order, isClosed=False):
"""
Called every time an order signal has been received. This function
manages the local list of open orders.
"""
if order.cid in self.pending_orders:
await self._execute_confirm_callback(order.cid, order)
if isClosed:
await self._execute_close_callback(order.cid, order)
order.set_confirmed()
# remove from pending orders list
del self.pending_orders[order.cid]
self.bfxapi._emit('order_confirmed', order)
else:
await self._execute_confirm_callback(order.id, order)
if isClosed:
await self._execute_close_callback(order.id, order)
async def confirm_order_closed(self, raw_ws_data): async def confirm_order_closed(self, raw_ws_data):
order = Order.from_raw_order(raw_ws_data[2]) order = Order.from_raw_order(raw_ws_data[2])
order.set_open_state(False) order.set_open_state(False)
if order.id in self.open_orders: if order.id in self.open_orders:
del self.open_orders[order.id] del self.open_orders[order.id]
await self._confirm_order(order, isClosed=True) if not order.is_confirmed():
order.set_confirmed()
self.bfxapi._emit('order_confirmed', order)
await self._execute_callback(order, self.pending_order_confirm_callbacks)
await self._execute_callback(order, self.pending_cancel_confirm_callbacks)
await self._execute_callback(order, self.pending_update_confirm_callbacks)
await self._execute_callback(order, self.pending_order_close_callbacks)
self.logger.info("Order closed: {} {}".format( self.logger.info("Order closed: {} {}".format(
order.symbol, order.status)) order.symbol, order.status))
self.bfxapi._emit('order_closed', order) self.bfxapi._emit('order_closed', order)
@@ -77,7 +69,7 @@ class OrderManager:
order = Order.from_raw_order(raw_ws_data[2]) order = Order.from_raw_order(raw_ws_data[2])
order.set_open_state(True) order.set_open_state(True)
self.open_orders[order.id] = order self.open_orders[order.id] = order
await self._confirm_order(order) await self._execute_callback(order, self.pending_update_confirm_callbacks)
self.logger.info("Order update: {}".format(order)) self.logger.info("Order update: {}".format(order))
self.bfxapi._emit('order_update', order) self.bfxapi._emit('order_update', order)
@@ -85,7 +77,9 @@ class OrderManager:
order = Order.from_raw_order(raw_ws_data[2]) order = Order.from_raw_order(raw_ws_data[2])
order.set_open_state(True) order.set_open_state(True)
self.open_orders[order.id] = order self.open_orders[order.id] = order
await self._confirm_order(order) order.set_confirmed()
self.bfxapi._emit('order_confirmed', order)
await self._execute_callback(order, self.pending_order_confirm_callbacks)
self.logger.info("Order new: {}".format(order)) self.logger.info("Order new: {}".format(order))
self.bfxapi._emit('order_new', order) self.bfxapi._emit('order_new', order)
@@ -96,10 +90,11 @@ class OrderManager:
hidden=False, price_trailing=None, price_aux_limit=None, hidden=False, price_trailing=None, price_aux_limit=None,
oco_stop_price=None, close=False, reduce_only=False, oco_stop_price=None, close=False, reduce_only=False,
post_only=False, oco=False, time_in_force=None, post_only=False, oco=False, time_in_force=None,
onConfirm=None, onClose=None, *args, **kwargs): onConfirm=None, onClose=None, gid=None, *args, **kwargs):
""" """
Submit a new order Submit a new order
@param gid: assign the order to a group identitfier
@param symbol: the name of the symbol i.e 'tBTCUSD @param symbol: the name of the symbol i.e 'tBTCUSD
@param price: the price you want to buy/sell at (must be positive) @param price: the price you want to buy/sell at (must be positive)
@param amount: order size: how much you want to buy/sell, @param amount: order size: how much you want to buy/sell,
@@ -133,8 +128,7 @@ class OrderManager:
"price": str(price), "price": str(price),
} }
# caclulate and add flags # caclulate and add flags
flags = self._calculate_flags( flags = self._calculate_flags(hidden, close, reduce_only, post_only, oco)
hidden, close, reduce_only, post_only, oco)
payload['flags'] = flags payload['flags'] = flags
# add extra parameters # add extra parameters
if (price_trailing): if (price_trailing):
@@ -142,19 +136,22 @@ class OrderManager:
if (price_aux_limit): if (price_aux_limit):
payload['price_aux_limit'] = price_aux_limit payload['price_aux_limit'] = price_aux_limit
if (oco_stop_price): if (oco_stop_price):
payload['price_oco_stop'] = oco_stop_price payload['price_oco_stop'] = str(oco_stop_price)
if (time_in_force): if (time_in_force):
payload['tif'] = time_in_force payload['tif'] = time_in_force
if (gid):
payload['gid'] = gid
# submit the order # submit the order
self.pending_orders[cid] = payload self.pending_orders[cid] = payload
self._create_callback(cid, onConfirm=onConfirm, onClose=onClose) self._create_callback(cid, onConfirm, self.pending_order_confirm_callbacks)
self._create_callback(cid, onClose, self.pending_order_close_callbacks)
await self.bfxapi._send_auth_command('on', payload) await self.bfxapi._send_auth_command('on', payload)
self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( self.logger.info("Order cid={} ({} {} @ {}) dispatched".format(
cid, symbol, amount, price)) cid, symbol, amount, price))
async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None, async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None,
price_trailing=None, hidden=False, close=False, reduce_only=False, price_trailing=None, hidden=False, close=False, reduce_only=False,
post_only=False, time_in_force=None, onConfirm=None, onClose=None): post_only=False, time_in_force=None, onConfirm=None):
""" """
Update an existing order Update an existing order
@@ -176,7 +173,7 @@ class OrderManager:
@param onClose: function called when the bitfinex websocket receives signal that the order @param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled was closed due to being filled or cancelled
""" """
self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose) self._create_callback(orderId, onConfirm, self.pending_update_confirm_callbacks)
payload = {"id": orderId} payload = {"id": orderId}
if price is not None: if price is not None:
payload['price'] = str(price) payload['price'] = str(price)
@@ -196,7 +193,7 @@ class OrderManager:
await self.bfxapi._send_auth_command('ou', payload) await self.bfxapi._send_auth_command('ou', payload)
self.logger.info("Update Order order_id={} dispatched".format(orderId)) self.logger.info("Update Order order_id={} dispatched".format(orderId))
async def cancel_order(self, orderId, onConfirm=None, onClose=None): async def cancel_order(self, orderId, onConfirm=None):
""" """
Cancel an existing open order Cancel an existing open order
@@ -207,8 +204,7 @@ class OrderManager:
@param onClose: function called when the bitfinex websocket receives signal that the order @param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled was closed due to being filled or cancelled
""" """
# order = self.open_orders[orderId] self._create_callback(orderId, onConfirm, self.pending_cancel_confirm_callbacks)
self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose)
await self.bfxapi._send_auth_command('oc', {'id': orderId}) await self.bfxapi._send_auth_command('oc', {'id': orderId})
self.logger.info("Order cancel order_id={} dispatched".format(orderId)) self.logger.info("Order cancel order_id={} dispatched".format(orderId))
@@ -216,42 +212,55 @@ class OrderManager:
""" """
Cancel all existing open orders Cancel all existing open orders
This function closes orders that have been tracked locally by the OrderManager. This function closes all open orders.
""" """
ids = [self.open_orders[x].id for x in self.open_orders] await self.bfxapi._send_auth_command('oc_multi', { 'all': 1 })
await self.cancel_order_multi(ids)
async def cancel_order_multi(self, orderIds): async def cancel_order_group(self, gid, onConfirm=None):
"""
Cancel a set of orders using a single group id.
"""
self._create_callback(gid, onConfirm, self.pending_cancel_confirm_callbacks)
await self.bfxapi._send_auth_command('oc_multi', { 'gid': [gid] })
async def cancel_order_multi(self, ids=None, gids=None):
""" """
Cancel existing open orders as a batch Cancel existing open orders as a batch
@param orderIds: an array of order ids @param ids: an array of order ids
@param gids: an array of group ids
""" """
task_batch = [] payload = {}
for oid in orderIds: if ids:
task_batch += [ payload['id'] = ids
asyncio.ensure_future(self.open_orders[oid].close()) if gids:
] payload['gid'] = gids
await asyncio.wait(*[task_batch]) await self.bfxapi._send_auth_command('oc_multi', payload)
def _create_callback(self, order_identifier, onConfirm=None, onClose=None): def _create_callback(self, identifier, func, callback_storage):
if order_identifier in self.pending_callbacks: if not func:
self.pending_callbacks[order_identifier] += [(onClose, onConfirm)] return
if identifier in callback_storage:
callback_storage[identifier] += [func]
else: else:
self.pending_callbacks[order_identifier] = [(onClose, onConfirm)] callback_storage[identifier] = [func]
async def _execute_close_callback(self, order_identifier, *args, **kwargs): async def _execute_callback(self, order, callback_storage):
if order_identifier in self.pending_callbacks: idents = [order.id, order.cid, order.gid]
for c in self.pending_callbacks[order_identifier]: tasks = []
if c[0]: key = None
await c[0](*args, **kwargs) for k in callback_storage.keys():
del self.pending_callbacks[order_identifier] if k in idents:
print (callback_storage[k])
async def _execute_confirm_callback(self, order_identifier, *args, **kwargs): key = k
if order_identifier in self.pending_callbacks: # call all callbacks associated with identifier
for c in self.pending_callbacks[order_identifier]: for callback in callback_storage[k]:
if c[1]: tasks += [callback(order)]
await c[1](*args, **kwargs) break
# remove from callbacks
if key:
del callback_storage[key]
await asyncio.gather(*tasks)
def _calculate_flags(self, hidden, close, reduce_only, post_only, oco): def _calculate_flags(self, hidden, close, reduce_only, post_only, oco):
flags = 0 flags = 0