diff --git a/.travis.yml b/.travis.yml index eb6e601..1873ba2 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,12 +1,8 @@ language: python - -before_install: - - sudo apt-get update - - sudo apt-get install python3 - - sudo apt-get install python3-pip - - sudo python3 -m pip install --upgrade pip -# command to install dependencies +python: + - "3.6" install: - - python3 -m pip install -r requirements.txt --user -# command to run tests -script: python3 -m pylint --rcfile=pylint.rc bfxapi + - python3.6 -m pip install -r requirements.txt +script: + - pylint --rcfile=pylint.rc bfxapi + - pytest diff --git a/README.md b/README.md index 81788e2..8182dba 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,8 @@ The websocket exposes a collection of events that are triggered when certain dat - `all` (array|json): listen for all messages coming through - `connected:` () called when a connection is made +- `disconnected`: () called when a connection is ended (A reconnect attempt may follow) +- `stopped`: () called when max amount of connection retries is met and the socket is closed - `authenticated` (): called when the websocket passes authentication - `notification` (array): incoming account notification - `error` (array): error from the websocket diff --git a/bfxapi/tests/helpers.py b/bfxapi/tests/helpers.py index 1670beb..e1b5fee 100644 --- a/bfxapi/tests/helpers.py +++ b/bfxapi/tests/helpers.py @@ -5,7 +5,7 @@ import asyncio from .. import Client, BfxWebsocket def get_now(): - return int(round(time.time() * 1000)) + return int(round(time.time() * 1000)) class StubbedWebsocket(BfxWebsocket): def __new__(cls, *args, **kwargs): diff --git a/bfxapi/tests/test_ws_orders.py b/bfxapi/tests/test_ws_orders.py index d981721..39067e8 100644 --- a/bfxapi/tests/test_ws_orders.py +++ b/bfxapi/tests/test_ws_orders.py @@ -110,3 +110,99 @@ async def test_events_on_cancel_order(): assert close_res.price == 10 assert close_res.type == 'EXCHANGE LIMIT' +@pytest.mark.asyncio +async def test_closed_callback_on_submit_order_closed(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + async def c(order): + client.ws._emit('c1', order) + callback_wait = EventWatcher.watch(client.ws, 'c1') + # override cid generation + client.ws.orderManager._gen_unqiue_cid = lambda: 123 + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onClose=c) + await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + callback_wait.wait_until_complete() + + +@pytest.mark.asyncio +async def test_confirmed_callback_on_submit_order_closed(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + async def c(order): + client.ws._emit('c1', order) + callback_wait = EventWatcher.watch(client.ws, 'c1') + # override cid generation + client.ws.orderManager._gen_unqiue_cid = lambda: 123 + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) + await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + callback_wait.wait_until_complete() + +@pytest.mark.asyncio +async def test_confirmed_callback_on_submit_new_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + async def c(order): + client.ws._emit('c1', order) + callback_wait = EventWatcher.watch(client.ws, 'c1') + # override cid generation + client.ws.orderManager._gen_unqiue_cid = lambda: 123 + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) + await client.ws.publish([0,"on",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + callback_wait.wait_until_complete() + +@pytest.mark.asyncio +async def test_confirmed_callback_on_submit_order_update(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + async def c(order): + client.ws._emit('c1', order) + callback_wait = EventWatcher.watch(client.ws, 'c1') + # override cid generation + client.ws.orderManager._gen_unqiue_cid = lambda: 123 + await client.ws.update_order(123, price=100, onConfirm=c) + await client.ws.publish([0,"ou",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + callback_wait.wait_until_complete() + +@pytest.mark.asyncio +async def test_confirmed_callback_on_submit_cancel_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + async def c(order): + client.ws._emit('c1', order) + callback_wait = EventWatcher.watch(client.ws, 'c1') + # override cid generation + client.ws.orderManager._gen_unqiue_cid = lambda: 123 + await client.ws.cancel_order(123, onConfirm=c) + await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + callback_wait.wait_until_complete() + +@pytest.mark.asyncio +async def test_confirmed_callback_on_submit_cancel_group_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + async def c(order): + client.ws._emit('c1', order) + callback_wait = EventWatcher.watch(client.ws, 'c1') + # override cid generation + client.ws.orderManager._gen_unqiue_cid = lambda: 123 + await client.ws.cancel_order_group(123, onConfirm=c) + await client.ws.publish([0,"oc",[1548262833910,123,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + callback_wait.wait_until_complete() diff --git a/bfxapi/utils/testing_tools.py b/bfxapi/utils/testing_tools.py new file mode 100644 index 0000000..143df5d --- /dev/null +++ b/bfxapi/utils/testing_tools.py @@ -0,0 +1,90 @@ +import time +import json +import asyncio + +from .. import Client, BfxWebsocket + +def get_now(): + return int(round(time.time() * 1000)) + +class StubbedWebsocket(BfxWebsocket): + def __new__(cls, *args, **kwargs): + instance = super(StubbedWebsocket, cls).__new__(cls, *args, **kwargs) + instance.sent_items = [] + instance.published_items = [] + return instance + + async def _main(self, host): + print ("Faking wesocket connection to {}".format(host)) + + def get_ws(self): + return self + + async def publish(self, data, is_json=True): + self.published_items += [{ + 'time': get_now(), + 'data': data + }] + # convert to string and push through the websocket + data = json.dumps(data) if is_json else data + return await self.on_message(data) + + async def publish_auth_confirmation(self): + return self.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}}) + + async def send(self, data_string): + self.sent_items += [{ + 'time': get_now(), + 'data': data_string + }] + + def get_published_items(self): + return self.published_items + + def get_sent_items(self): + return self.sent_items + + def get_last_sent_item(self): + return self.sent_items[-1:][0] + + def get_sent_items_count(self): + return len(self.sent_items) + +class EventWatcher(): + + def __init__(self, ws, event): + self.value = None + self.event = event + ws.once(event, self._finish) + + def _finish(self, value): + self.value = value or {} + + @classmethod + def watch(cls, ws, event): + return EventWatcher(ws, event) + + def wait_until_complete(self, max_wait_time=5): + counter = 0 + while self.value == None: + if counter > 5: + raise Exception('Wait time limit exceeded for event {}'.format(self.event)) + time.sleep(1) + counter += 1 + return self.value + +def create_stubbed_client(*args, **kwargs): + client = Client(*args, **kwargs) + # no support for rest stubbing yet + client.rest = None + client.ws = StubbedWebsocket(*args, **kwargs) + return client + +async def ws_publish_auth_accepted(ws): + return await ws.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}}) + +async def ws_publish_connection_init(ws): + return await ws.publish({"event":"info","version":2,"serverId":"748c00f2-250b-46bb-8519-ce1d7d68e4f0","platform":{"status":1}}) + +async def ws_publish_conf_accepted(ws, flags_code): + return await ws.publish({"event":"conf","status":"OK","flags":flags_code}) diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/BfxWebsocket.py index b8bf4a6..0b311e3 100644 --- a/bfxapi/websockets/BfxWebsocket.py +++ b/bfxapi/websockets/BfxWebsocket.py @@ -201,7 +201,7 @@ class BfxWebsocket(GenericWebsocket): err_string = self.ERRORS[data.get('code', 10000)] err_string = "{} - {}".format(self.ERRORS[data.get('code', 10000)], data.get("msg", "")) - self._emit('error', Exception(err_string)) + self._emit('error', err_string) async def _system_auth_handler(self, data): if data.get('status') == 'FAILED': @@ -310,6 +310,9 @@ class BfxWebsocket(GenericWebsocket): async def _candle_handler(self, data): subscription = self.subscriptionManager.get(data[0]) + # if candle data is empty + if data[1] == []: + return if type(data[1][0]) is list: # Process the batch of seed candles on # websocket subscription @@ -385,7 +388,9 @@ class BfxWebsocket(GenericWebsocket): # enable order book checksums if self.manageOrderBooks: await self.enable_flag(Flags.CHECKSUM) - # resubscribe to any channels + # set any existing subscriptions to not subscribed + self.subscriptionManager.set_all_unsubscribed() + # re-subscribe to existing channels await self.subscriptionManager.resubscribe_all() async def _send_auth_command(self, channel_name, data): @@ -426,8 +431,11 @@ class BfxWebsocket(GenericWebsocket): async def cancel_order(self, *args, **kwargs): return await self.orderManager.cancel_order(*args, **kwargs) + async def cancel_order_group(self, *args, **kwargs): + return await self.orderManager.cancel_order_group(*args, **kwargs) + async def cancel_all_orders(self, *args, **kwargs): return await self.orderManager.cancel_all_orders(*args, **kwargs) async def cancel_order_multi(self, *args, **kwargs): - return await self.cancel_order_multi(*args, **kwargs) + return await self.orderManager.cancel_order_multi(*args, **kwargs) diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py index 0c44f59..8b93ec6 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/GenericWebsocket.py @@ -38,8 +38,11 @@ class GenericWebsocket: self.loop = loop or asyncio.get_event_loop() self.events = EventEmitter( scheduler=asyncio.ensure_future, loop=self.loop) + # overide 'error' event to stop it raising an exception + # self.events.on('error', self.on_error) self.ws = None self.max_retries = max_retries + self.attempt_retry = True def run(self): """ @@ -67,18 +70,22 @@ class GenericWebsocket: async def _main(self, host): retries = 0 - while retries < self.max_retries: + while retries < self.max_retries and self.attempt_retry: try: await self._connect(host) retries = 0 except (ConnectionClosed, socket.error) as e: + self._emit('disconnected') + if (not self.attempt_retry): + return self.logger.error(str(e)) retries += 1 # wait 5 seconds befor retrying - self.logger.info("Waiting 5 seconds befor retrying...") + self.logger.info("Waiting 5 seconds before retrying...") await asyncio.sleep(5) self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries)) self.logger.info("Unable to connect to websocket.") + self._emit('stopped') def remove_all_listeners(self, event): """ @@ -111,13 +118,13 @@ class GenericWebsocket: On websocket error print and fire event """ self.logger.error(error) - self.events.emit('error', error) async def on_close(self): """ - On websocket close print and fire event + On websocket close print and fire event. This is used by the data server. """ self.logger.info("Websocket closed.") + self.attempt_retry = False await self.ws.close() self._emit('done') diff --git a/bfxapi/websockets/OrderManager.py b/bfxapi/websockets/OrderManager.py index 7c97e5a..c7494d3 100644 --- a/bfxapi/websockets/OrderManager.py +++ b/bfxapi/websockets/OrderManager.py @@ -19,9 +19,13 @@ class OrderManager: def __init__(self, bfxapi, logLevel='INFO'): self.bfxapi = bfxapi self.pending_orders = {} - self.pending_callbacks = {} self.closed_orders = {} self.open_orders = {} + + self.pending_order_close_callbacks = {} + self.pending_order_confirm_callbacks = {} + self.pending_update_confirm_callbacks = {} + self.pending_cancel_confirm_callbacks = {} self.logger = CustomLogger('BfxOrderManager', logLevel=logLevel) def get_open_orders(self): @@ -33,30 +37,18 @@ class OrderManager: def get_pending_orders(self): return list(self.pending_orders.values()) - async def _confirm_order(self, order, isClosed=False): - """ - Called every time an order signal has been received. This function - manages the local list of open orders. - """ - if order.cid in self.pending_orders: - await self._execute_confirm_callback(order.cid, order) - if isClosed: - await self._execute_close_callback(order.cid, order) - order.set_confirmed() - # remove from pending orders list - del self.pending_orders[order.cid] - self.bfxapi._emit('order_confirmed', order) - else: - await self._execute_confirm_callback(order.id, order) - if isClosed: - await self._execute_close_callback(order.id, order) - async def confirm_order_closed(self, raw_ws_data): order = Order.from_raw_order(raw_ws_data[2]) order.set_open_state(False) if order.id in self.open_orders: del self.open_orders[order.id] - await self._confirm_order(order, isClosed=True) + if not order.is_confirmed(): + order.set_confirmed() + self.bfxapi._emit('order_confirmed', order) + await self._execute_callback(order, self.pending_order_confirm_callbacks) + await self._execute_callback(order, self.pending_cancel_confirm_callbacks) + await self._execute_callback(order, self.pending_update_confirm_callbacks) + await self._execute_callback(order, self.pending_order_close_callbacks) self.logger.info("Order closed: {} {}".format( order.symbol, order.status)) self.bfxapi._emit('order_closed', order) @@ -77,7 +69,7 @@ class OrderManager: order = Order.from_raw_order(raw_ws_data[2]) order.set_open_state(True) self.open_orders[order.id] = order - await self._confirm_order(order) + await self._execute_callback(order, self.pending_update_confirm_callbacks) self.logger.info("Order update: {}".format(order)) self.bfxapi._emit('order_update', order) @@ -85,7 +77,9 @@ class OrderManager: order = Order.from_raw_order(raw_ws_data[2]) order.set_open_state(True) self.open_orders[order.id] = order - await self._confirm_order(order) + order.set_confirmed() + self.bfxapi._emit('order_confirmed', order) + await self._execute_callback(order, self.pending_order_confirm_callbacks) self.logger.info("Order new: {}".format(order)) self.bfxapi._emit('order_new', order) @@ -96,10 +90,11 @@ class OrderManager: hidden=False, price_trailing=None, price_aux_limit=None, oco_stop_price=None, close=False, reduce_only=False, post_only=False, oco=False, time_in_force=None, - onConfirm=None, onClose=None, *args, **kwargs): + onConfirm=None, onClose=None, gid=None, *args, **kwargs): """ Submit a new order + @param gid: assign the order to a group identitfier @param symbol: the name of the symbol i.e 'tBTCUSD @param price: the price you want to buy/sell at (must be positive) @param amount: order size: how much you want to buy/sell, @@ -133,8 +128,7 @@ class OrderManager: "price": str(price), } # caclulate and add flags - flags = self._calculate_flags( - hidden, close, reduce_only, post_only, oco) + flags = self._calculate_flags(hidden, close, reduce_only, post_only, oco) payload['flags'] = flags # add extra parameters if (price_trailing): @@ -142,19 +136,22 @@ class OrderManager: if (price_aux_limit): payload['price_aux_limit'] = price_aux_limit if (oco_stop_price): - payload['price_oco_stop'] = oco_stop_price + payload['price_oco_stop'] = str(oco_stop_price) if (time_in_force): payload['tif'] = time_in_force + if (gid): + payload['gid'] = gid # submit the order self.pending_orders[cid] = payload - self._create_callback(cid, onConfirm=onConfirm, onClose=onClose) + self._create_callback(cid, onConfirm, self.pending_order_confirm_callbacks) + self._create_callback(cid, onClose, self.pending_order_close_callbacks) await self.bfxapi._send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( cid, symbol, amount, price)) async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None, price_trailing=None, hidden=False, close=False, reduce_only=False, - post_only=False, time_in_force=None, onConfirm=None, onClose=None): + post_only=False, time_in_force=None, onConfirm=None): """ Update an existing order @@ -176,7 +173,7 @@ class OrderManager: @param onClose: function called when the bitfinex websocket receives signal that the order was closed due to being filled or cancelled """ - self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose) + self._create_callback(orderId, onConfirm, self.pending_update_confirm_callbacks) payload = {"id": orderId} if price is not None: payload['price'] = str(price) @@ -196,7 +193,7 @@ class OrderManager: await self.bfxapi._send_auth_command('ou', payload) self.logger.info("Update Order order_id={} dispatched".format(orderId)) - async def cancel_order(self, orderId, onConfirm=None, onClose=None): + async def cancel_order(self, orderId, onConfirm=None): """ Cancel an existing open order @@ -207,8 +204,7 @@ class OrderManager: @param onClose: function called when the bitfinex websocket receives signal that the order was closed due to being filled or cancelled """ - # order = self.open_orders[orderId] - self._create_callback(orderId, onConfirm=onConfirm, onClose=onClose) + self._create_callback(orderId, onConfirm, self.pending_cancel_confirm_callbacks) await self.bfxapi._send_auth_command('oc', {'id': orderId}) self.logger.info("Order cancel order_id={} dispatched".format(orderId)) @@ -216,42 +212,55 @@ class OrderManager: """ Cancel all existing open orders - This function closes orders that have been tracked locally by the OrderManager. + This function closes all open orders. """ - ids = [self.open_orders[x].id for x in self.open_orders] - await self.cancel_order_multi(ids) + await self.bfxapi._send_auth_command('oc_multi', { 'all': 1 }) - async def cancel_order_multi(self, orderIds): + async def cancel_order_group(self, gid, onConfirm=None): + """ + Cancel a set of orders using a single group id. + """ + self._create_callback(gid, onConfirm, self.pending_cancel_confirm_callbacks) + await self.bfxapi._send_auth_command('oc_multi', { 'gid': [gid] }) + + async def cancel_order_multi(self, ids=None, gids=None): """ Cancel existing open orders as a batch - @param orderIds: an array of order ids + @param ids: an array of order ids + @param gids: an array of group ids """ - task_batch = [] - for oid in orderIds: - task_batch += [ - asyncio.ensure_future(self.open_orders[oid].close()) - ] - await asyncio.wait(*[task_batch]) + payload = {} + if ids: + payload['id'] = ids + if gids: + payload['gid'] = gids + await self.bfxapi._send_auth_command('oc_multi', payload) - def _create_callback(self, order_identifier, onConfirm=None, onClose=None): - if order_identifier in self.pending_callbacks: - self.pending_callbacks[order_identifier] += [(onClose, onConfirm)] + def _create_callback(self, identifier, func, callback_storage): + if not func: + return + if identifier in callback_storage: + callback_storage[identifier] += [func] else: - self.pending_callbacks[order_identifier] = [(onClose, onConfirm)] + callback_storage[identifier] = [func] - async def _execute_close_callback(self, order_identifier, *args, **kwargs): - if order_identifier in self.pending_callbacks: - for c in self.pending_callbacks[order_identifier]: - if c[0]: - await c[0](*args, **kwargs) - del self.pending_callbacks[order_identifier] - - async def _execute_confirm_callback(self, order_identifier, *args, **kwargs): - if order_identifier in self.pending_callbacks: - for c in self.pending_callbacks[order_identifier]: - if c[1]: - await c[1](*args, **kwargs) + async def _execute_callback(self, order, callback_storage): + idents = [order.id, order.cid, order.gid] + tasks = [] + key = None + for k in callback_storage.keys(): + if k in idents: + print (callback_storage[k]) + key = k + # call all callbacks associated with identifier + for callback in callback_storage[k]: + tasks += [callback(order)] + break + # remove from callbacks + if key: + del callback_storage[key] + await asyncio.gather(*tasks) def _calculate_flags(self, hidden, close, reduce_only, post_only, oco): flags = 0 diff --git a/bfxapi/websockets/SubscriptionManager.py b/bfxapi/websockets/SubscriptionManager.py index 8caf995..93a3d94 100644 --- a/bfxapi/websockets/SubscriptionManager.py +++ b/bfxapi/websockets/SubscriptionManager.py @@ -72,6 +72,13 @@ class SubscriptionManager: def get(self, chan_id): return self.subscriptions_chanid[chan_id] + def set_all_unsubscribed(self): + """ + Sets all f the subscriptions ot state 'unsubscribed' + """ + for sub in self.subscriptions_chanid.values(): + sub.confirm_unsubscribe() + async def unsubscribe(self, chan_id, onComplete=None): """ Unsubscribe from the channel with the given chanId diff --git a/requirements.txt b/requirements.txt index da5244a..53031d4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,6 @@ asyncio==3.4.3 websockets==7.0 pylint==2.2.2 pytest-asyncio==0.6.0 +six==1.12.0 +pyee==5.0.0 +aiohttp==3.4.4