diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index ba2f70c..60445b3 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -56,7 +56,7 @@ class BfxWebsocketClient: MAXIMUM_CONNECTIONS_AMOUNT = 20 EVENTS = [ - "open", "subscribed", "authenticated", "wss-error", + "open", "subscribed", "authenticated", "disconnection", "wss-error", *PublicChannelsHandler.EVENTS, *AuthenticatedChannelsHandler.EVENTS ] @@ -68,10 +68,10 @@ class BfxWebsocketClient: self.event_emitter = AsyncIOEventEmitter() - self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) - self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) + self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) + if log_filename is None: self.logger = ColorLogger("BfxWebsocketClient", level=log_level) else: self.logger = FileLogger("BfxWebsocketClient", level=log_level, filename=log_filename) @@ -104,7 +104,7 @@ class BfxWebsocketClient: Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) reconnection = Reconnection(status=False, attempts=0, timestamp=None) delay = _Delay(backoff_factor=1.618) - + timer, tasks, on_timeout_event = None, [], asyncio.locks.Event() def _on_wss_timeout(): @@ -178,21 +178,22 @@ class BfxWebsocketClient: try: await _connection() except (websockets.exceptions.ConnectionClosedError, socket.gaierror) as error: - if isinstance(error, websockets.exceptions.ConnectionClosedError) and error.code in (1006, 1012): - if error.code == 1006: - self.logger.error("Connection lost: no close frame received " \ - "or sent (1006). Attempting to reconnect...") + if isinstance(error, websockets.exceptions.ConnectionClosedError): + if error.code in (1006, 1012): + if error.code == 1006: + self.logger.error("Connection lost: no close frame received " \ + "or sent (1006). Attempting to reconnect...") - if error.code == 1012: - self.logger.info("WSS server is about to restart, reconnection " \ - "required (client received 20051). Attempt in progress...") + if error.code == 1012: + self.logger.info("WSS server is about to restart, reconnection " \ + "required (client received 20051). Attempt in progress...") - for task in tasks: - task.cancel() + for task in tasks: + task.cancel() - reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) + reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) - timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout) + timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout) elif isinstance(error, socket.gaierror) and reconnection.status: self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \ f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds. (at the moment " \ @@ -202,6 +203,9 @@ class BfxWebsocketClient: else: raise error if not reconnection.status: + self.event_emitter.emit("disconnection", + self.websocket.close_code, self.websocket.close_reason) + break async def __authenticate(self, api_key, api_secret, filters=None):