diff --git a/bfxapi/models/Order.py b/bfxapi/models/Order.py index 1b1037e..1da0a7c 100644 --- a/bfxapi/models/Order.py +++ b/bfxapi/models/Order.py @@ -24,7 +24,8 @@ def now_in_mills(): return int(round(time.time() * 1000)) class Order: - def __init__(self, closingOrderArray): + def __init__(self, bfxapi, closingOrderArray): + self.bfxapi = bfxapi self.id = closingOrderArray[OrderClosedModel.ID] self.gId = closingOrderArray[OrderClosedModel.GID] self.cId = closingOrderArray[OrderClosedModel.CID] @@ -45,11 +46,40 @@ class Order: self.placeId = closingOrderArray[OrderClosedModel.PLACE_ID] self.is_pending_bool = True self.is_confirmed_bool = False + self.is_open_bool = False + + async def update(self, price=None, amount=None, delta=None, price_aux_limit=None, + price_trailing=None, flags=None, time_in_force=None): + payload = { "id": self.id } + if price is not None: + payload['price'] = str(price) + if amount is not None: + payload['amount'] = str(amount) + if delta is not None: + payload['delta'] = str(delta) + if price_aux_limit is not None: + payload['price_aux_limit'] = str(price_aux_limit) + if price_trailing is not None: + payload['price_trailing'] = str(price_trailing) + if flags is not None: + payload['flags'] = str(flags) + if time_in_force is not None: + payload['time_in_force'] = str(time_in_force) + await self.bfxapi._send_auth_command('ou', payload) + + async def close(self): + await self.bfxapi._send_auth_command('oc', { 'id': self.id }) def set_confirmed(self): self.is_pending_bool = False self.is_confirmed_bool = True + def set_open_state(self, isOpen): + self.is_open_bool = isOpen + + def isOpen(self): + return self.is_open_bool + def isPending(self): return self.is_pending_bool diff --git a/bfxapi/websockets/OrderManager.py b/bfxapi/websockets/OrderManager.py index 9e19bc2..a85cbe5 100644 --- a/bfxapi/websockets/OrderManager.py +++ b/bfxapi/websockets/OrderManager.py @@ -1,4 +1,5 @@ import time +import asyncio from ..utils.CustomLogger import CustomLogger from ..models import Order, Trade @@ -8,30 +9,33 @@ class OrderManager: def __init__(self, bfxapi, logLevel='INFO'): self.bfxapi = bfxapi self.pending_orders = {} - self.confirmed_orders = {} - self.confirmed_trades = {} + self.pending_callbacks = {} + self.closed_orders = {} + self.open_orders = {} self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel) - def get_confirmed_trades(self): - return list(self.confirmed_trades.values()) + def get_open_orders(self): + return list(self.open_orders.values()) - def get_confirmed_orders(self): - return list(self.confirmed_orders.values()) + def get_closed_orders(self): + return list(self.closed_orders.values()) def get_pending_orders(self): return list(self.pending_orders.values()) async def _confirm_order(self, order, trade): + ''' + Called once when we first recieve infomation back from the bitfinex api + that the order has been accepted. + ''' if order.cId in self.pending_orders: - if self.pending_orders[order.cId][0]: + if self.pending_callbacks[order.cId][0]: # call onComplete callback - await self.pending_orders[order.cId][0](order, trade) - # add to confirmed orders list + await self.pending_callbacks[order.cId][0](order, trade) order.set_confirmed() - self.confirmed_orders[order.cId] = order - self.confirmed_trades[order.cId] = trade # remove from pending orders list del self.pending_orders[order.cId] + del self.pending_callbacks[order.cId] self.bfxapi._emit('order_confirmed', order, trade) async def confirm_order_closed(self, raw_ws_data): @@ -40,11 +44,28 @@ class OrderManager: # "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED # @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null, # "API>BFX",null,null,null]] - order = Order(raw_ws_data[2]) + order = Order(self.bfxapi, raw_ws_data[2]) trade = Trade(order) + order.set_open_state(False) + if order.id in self.open_orders: + del self.open_orders[order.id] + await self._confirm_order(order, trade) self.logger.info("Order closed: {} {}".format(order.symbol, order.status)) self.bfxapi._emit('order_closed', order, trade) - await self._confirm_order(order, trade) + + async def build_from_order_snapshot(self, raw_ws_data): + ''' + Rebuild the user orderbook based on an incoming snapshot + ''' + osData = raw_ws_data[2] + self.open_orders = {} + for raw_order in osData: + order = Order(self.bfxapi, raw_order) + trade = Trade(order) + order.set_open_state(True) + self.open_orders[order.id] = order + # await self._confirm_order(order, trade) + self.bfxapi._emit('order_snapshot', self.open_orders) async def confirm_order_update(self, raw_ws_data): # order created but partially filled @@ -52,11 +73,13 @@ class OrderManager: # 1542629458189, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', # None, None, None]] - order = Order(raw_ws_data[2]) + order = Order(self.bfxapi, raw_ws_data[2]) + order.set_open_state(True) trade = Trade(order) + self.open_orders[order.id] = order + await self._confirm_order(order, trade) self.logger.info("Order update: {} {}".format(order, trade)) self.bfxapi._emit('order_update', order, trade) - await self._confirm_order(order, trade) async def confirm_order_new(self, raw_ws_data): # order created but not executed / created but partially filled @@ -64,59 +87,58 @@ class OrderManager: # 1542624024617, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', # None, None, None]] - order = Order(raw_ws_data[2]) + order = Order(self.bfxapi, raw_ws_data[2]) + order.set_open_state(True) trade = Trade(order) + self.open_orders[order.id] = order + await self._confirm_order(order, trade) self.logger.info("Order new: {} {}".format(order, trade)) self.bfxapi._emit('order_new', order, trade) - await self._confirm_order(order, trade) def _gen_unqiue_cid(self): return int(round(time.time() * 1000)) async def submit_order(self, symbol, price, amount, market_type, hidden=False, onComplete=None, onError=None, *args, **kwargs): - order_id = self._gen_unqiue_cid() + cId = self._gen_unqiue_cid() # send order over websocket payload = { - "cid": order_id, + "cid": cId, "type": str(market_type), "symbol": symbol, "amount": str(amount), "price": str(price) } - self.pending_orders[order_id] = (onComplete, onError) + self.pending_orders[cId] = payload + self.pending_callbacks[cId] = (onComplete, onError) await self.bfxapi._send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( - order_id, symbol, amount, price)) + cId, symbol, amount, price)) - async def update_order(self, orderId, price=None, amount=None, delta=None, - price_aux_limit=None, price_trailing=None, flags=None, time_in_force=None, - onComplete=None, onError=None): - payload = { "id": orderId } - if price is not None: - payload['price'] = str(price) - if amount is not None: - payload['amount'] = str(amount) - if delta is not None: - payload['delta'] = str(delta) - if price_aux_limit is not None: - payload['price_aux_limit'] = str(price_aux_limit) - if price_trailing is not None: - payload['price_trailing'] = str(price_trailing) - if flags is not None: - payload['flags'] = str(flags) - if time_in_force is not None: - payload['time_in_force'] = str(time_in_force) - self.pending_orders[orderId] = (onComplete, onError) - await self.bfxapi._send_auth_command('ou', payload) + async def update_order(self, orderId, *args, onComplete=None, onError=None, **kwargs): + if orderId not in self.open_orders: + raise Exception("Order id={} is not open".format(orderId)) + order = self.open_orders[orderId] + self.pending_callbacks[order.cId] = (onComplete, onError) + await order.update(*args, **kwargs) self.logger.info("Update Order order_id={} dispatched".format(orderId)) - async def cancel_order(self, orderId, onComplete=None, onError=None): - self.pending_orders[orderId] = (onComplete, onError) - await self.bfxapi._send_auth_command('oc', { 'id': orderId }) + async def close_order(self, orderId, onComplete=None, onError=None): + if orderId not in self.open_orders: + raise Exception("Order id={} is not open".format(orderId)) + order = self.open_orders[orderId] + self.pending_callbacks[order.cId] = (onComplete, onError) + await order.cancel() self.logger.info("Order cancel order_id={} dispatched".format(orderId)) - async def cancel_order_multi(self, orderIds, onComplete=None, onError=None): - self.pending_orders[orderIds[0]] = (onComplete, onError) - await self.bfxapi._send_auth_command('oc', { 'id': orderIds }) - self.logger.info("Order cancel order_ids={} dispatched".format(orderIds)) + async def close_all_orders(self): + ids = [self.open_orders[x].id for x in self.open_orders] + await self.close_order_multi(ids) + + async def close_order_multi(self, orderIds): + task_batch = [] + for oid in orderIds: + task_batch += [ + asyncio.ensure_future(self.open_orders[oid].close()) + ] + await asyncio.wait(*[ task_batch ])