diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/BfxWebsocket.py index b8bf4a6..37fca9a 100644 --- a/bfxapi/websockets/BfxWebsocket.py +++ b/bfxapi/websockets/BfxWebsocket.py @@ -201,7 +201,7 @@ class BfxWebsocket(GenericWebsocket): err_string = self.ERRORS[data.get('code', 10000)] err_string = "{} - {}".format(self.ERRORS[data.get('code', 10000)], data.get("msg", "")) - self._emit('error', Exception(err_string)) + self._emit('error', err_string) async def _system_auth_handler(self, data): if data.get('status') == 'FAILED': @@ -385,7 +385,9 @@ class BfxWebsocket(GenericWebsocket): # enable order book checksums if self.manageOrderBooks: await self.enable_flag(Flags.CHECKSUM) - # resubscribe to any channels + # set any existing subscriptions to not subscribed + self.subscriptionManager.set_all_unsubscribed() + # re-subscribe to existing channels await self.subscriptionManager.resubscribe_all() async def _send_auth_command(self, channel_name, data): diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py index 0c44f59..97af9f5 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/GenericWebsocket.py @@ -38,6 +38,8 @@ class GenericWebsocket: self.loop = loop or asyncio.get_event_loop() self.events = EventEmitter( scheduler=asyncio.ensure_future, loop=self.loop) + # overide 'error' event to stop it raising an exception + self.events.on('error', self.on_error) self.ws = None self.max_retries = max_retries @@ -72,6 +74,7 @@ class GenericWebsocket: await self._connect(host) retries = 0 except (ConnectionClosed, socket.error) as e: + self._emit('disconnected') self.logger.error(str(e)) retries += 1 # wait 5 seconds befor retrying @@ -79,6 +82,7 @@ class GenericWebsocket: await asyncio.sleep(5) self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries)) self.logger.info("Unable to connect to websocket.") + self._emit('stopped') def remove_all_listeners(self, event): """ @@ -111,11 +115,10 @@ class GenericWebsocket: On websocket error print and fire event """ self.logger.error(error) - self.events.emit('error', error) async def on_close(self): """ - On websocket close print and fire event + On websocket close print and fire event. This is used by the data server. """ self.logger.info("Websocket closed.") await self.ws.close() diff --git a/bfxapi/websockets/SubscriptionManager.py b/bfxapi/websockets/SubscriptionManager.py index 8caf995..93a3d94 100644 --- a/bfxapi/websockets/SubscriptionManager.py +++ b/bfxapi/websockets/SubscriptionManager.py @@ -72,6 +72,13 @@ class SubscriptionManager: def get(self, chan_id): return self.subscriptions_chanid[chan_id] + def set_all_unsubscribed(self): + """ + Sets all f the subscriptions ot state 'unsubscribed' + """ + for sub in self.subscriptions_chanid.values(): + sub.confirm_unsubscribe() + async def unsubscribe(self, chan_id, onComplete=None): """ Unsubscribe from the channel with the given chanId