Emit on_disconnection event on connection close in BfxWebsocketClient.

This commit is contained in:
Davide Casale
2023-04-19 03:53:33 +02:00
parent 986aa525d7
commit 3441d2af2f

View File

@@ -56,7 +56,7 @@ class BfxWebsocketClient:
MAXIMUM_CONNECTIONS_AMOUNT = 20
EVENTS = [
"open", "subscribed", "authenticated", "wss-error",
"open", "subscribed", "authenticated", "disconnection", "wss-error",
*PublicChannelsHandler.EVENTS,
*AuthenticatedChannelsHandler.EVENTS
]
@@ -68,10 +68,10 @@ class BfxWebsocketClient:
self.event_emitter = AsyncIOEventEmitter()
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
if log_filename is None:
self.logger = ColorLogger("BfxWebsocketClient", level=log_level)
else: self.logger = FileLogger("BfxWebsocketClient", level=log_level, filename=log_filename)
@@ -104,7 +104,7 @@ class BfxWebsocketClient:
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
delay = _Delay(backoff_factor=1.618)
timer, tasks, on_timeout_event = None, [], asyncio.locks.Event()
def _on_wss_timeout():
@@ -178,21 +178,22 @@ class BfxWebsocketClient:
try:
await _connection()
except (websockets.exceptions.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.exceptions.ConnectionClosedError) and error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received " \
"or sent (1006). Attempting to reconnect...")
if isinstance(error, websockets.exceptions.ConnectionClosedError):
if error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received " \
"or sent (1006). Attempting to reconnect...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, reconnection " \
"required (client received 20051). Attempt in progress...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, reconnection " \
"required (client received 20051). Attempt in progress...")
for task in tasks:
task.cancel()
for task in tasks:
task.cancel()
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout)
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout)
elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \
f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds. (at the moment " \
@@ -202,6 +203,9 @@ class BfxWebsocketClient:
else: raise error
if not reconnection.status:
self.event_emitter.emit("disconnection",
self.websocket.close_code, self.websocket.close_reason)
break
async def __authenticate(self, api_key, api_secret, filters=None):