From b8a5bcb5157874f4250c89bb73b105fbbb3be4a4 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Thu, 16 Feb 2023 20:08:05 +0100 Subject: [PATCH] Fix bugs and rewrite regions of new reconnection system. --- bfxapi/client.py | 12 ++- .../websocket/client/bfx_websocket_bucket.py | 7 +- .../websocket/client/bfx_websocket_client.py | 97 +++++++++++++------ 3 files changed, 83 insertions(+), 33 deletions(-) diff --git a/bfxapi/client.py b/bfxapi/client.py index ec72fb3..84b9d76 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -1,7 +1,7 @@ from .rest import BfxRestInterface from .websocket import BfxWebsocketClient -from typing import Optional +from typing import List, Optional from enum import Enum @@ -21,8 +21,15 @@ class Client(object): WSS_HOST: str = Constants.WSS_HOST, API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None, + filter: Optional[List[str]] = None, log_level: str = "WARNING" ): + credentials = { + "API_KEY": API_KEY, + "API_SECRET": API_SECRET, + "filter": filter + } + self.rest = BfxRestInterface( host=REST_HOST, API_KEY=API_KEY, @@ -31,7 +38,6 @@ class Client(object): self.wss = BfxWebsocketClient( host=WSS_HOST, - API_KEY=API_KEY, - API_SECRET=API_SECRET, + credentials=credentials, log_level=log_level ) \ No newline at end of file diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 15caffc..b4c573b 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -63,9 +63,12 @@ class BfxWebsocketBucket(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chanId], *message[1:]) - except websockets.ConnectionClosedError: self.on_open_event.clear(); reconnection = True; continue + except websockets.ConnectionClosedError: + self.on_open_event.clear() + reconnection = True + continue - await self.websocket.wait_closed(); break + break @_require_websocket_connection async def _subscribe(self, channel, subId=None, **kwargs): diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 8f2a54d..15faf89 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -1,4 +1,4 @@ -import traceback, json, asyncio, hmac, hashlib, time, websockets +import traceback, json, asyncio, hmac, hashlib, time, websockets, socket, random from typing import cast @@ -38,10 +38,10 @@ class BfxWebsocketClient(object): *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, API_KEY = None, API_SECRET = None, filter = None, log_level = "WARNING"): - self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter() + def __init__(self, host, credentials = None, log_level = "WARNING"): + self.websocket = None - self.API_KEY, self.API_SECRET, self.filter = API_KEY, API_SECRET, filter + self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) @@ -71,33 +71,35 @@ class BfxWebsocketClient(object): tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ] - tasks.append(self.__connect(self.API_KEY, self.API_SECRET, self.filter)) + tasks.append(self.__connect(self.credentials)) await asyncio.gather(*tasks) - async def __connect(self, API_KEY, API_SECRET, filter=None): - Reconnection = namedtuple("Reconnection", ["status", "code", "timestamp"]) + async def __connect(self, credentials = None): + Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) - reconnection = Reconnection(status=False, code=0, timestamp=None) + reconnection = Reconnection(status=False, attempts=0, timestamp=None) - async for websocket in websockets.connect(self.host): - self.websocket, self.authentication = websocket, False + async def _connection(): + nonlocal reconnection - if (await asyncio.gather(*[ on_open_event.wait() for on_open_event in self.on_open_events ])): - self.event_emitter.emit("open") + async with websockets.connect(self.host) as websocket: + if reconnection.status == True: + self.logger.info(f"Reconnect attempt successful (attempt N°{reconnection.attempts}): The " + + f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " + + f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") - if self.API_KEY != None and self.API_SECRET != None: - await self.__authenticate(API_KEY=API_KEY, API_SECRET=API_SECRET, filter=filter) + reconnection = Reconnection(status=False, attempts=0, timestamp=None) + + self.websocket, self.authentication = websocket, False + + if await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events]): + self.event_emitter.emit("open") + + if self.credentials != None: + await self.__authenticate(**self.credentials) - try: async for message in websocket: - if reconnection.status == True: - self.logger.warning(f"Reconnect Attempt Successful (error <{reconnection.code}>): The " + - f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " + - f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") - - reconnection = Reconnection(status=False, code=0, timestamp=None) - message = json.loads(message) if isinstance(message, dict) and message["event"] == "info" and "version" in message: @@ -113,13 +115,52 @@ class BfxWebsocketClient(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != _HEARTBEAT: self.handler.handle(message[1], message[2]) - except websockets.ConnectionClosedError as error: - self.logger.error(f"Connection terminated due to an error (status code: <{error.code}>) -> {str(error)}. Attempting to reconnect...") - reconnection = Reconnection(status=True, code=error.code, timestamp=datetime.now()); - continue - + + class _Delay: + BACKOFF_MIN, BACKOFF_MAX = 1.92, 60.0 + + BACKOFF_INITIAL = 5.0 + + def __init__(self, backoff_factor): + self.__backoff_factor = backoff_factor + self.__backoff_delay = _Delay.BACKOFF_MIN + self.__initial_delay = random.random() * _Delay.BACKOFF_INITIAL + + def next(self): + backoff_delay = self.peek() + + __backoff_delay = self.__backoff_delay * self.__backoff_factor + self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX) + return backoff_delay + + def peek(self): + return (self.__backoff_delay == _Delay.BACKOFF_MIN) \ + and self.__initial_delay or self.__backoff_delay + + delay = _Delay(backoff_factor=1.618) + + while True: + if reconnection.status == True: + await asyncio.sleep(delay.next()) + + try: + await _connection() + except (websockets.ConnectionClosedError, socket.gaierror) as error: + if isinstance(error, websockets.ConnectionClosedError) and error.code == 1006: + self.logger.error("Connection lost: no close frame received " + + "or sent (1006). Attempting to reconnect...") + + reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()); + elif isinstance(error, socket.gaierror) and reconnection.status == True: + self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " + + f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds." + + f"(at the moment the client has been offline for {datetime.now() - reconnection.timestamp})") + + reconnection = reconnection._replace(attempts=reconnection.attempts + 1) + else: raise error + if reconnection.status == False: - await self.websocket.wait_closed(); break + break async def __authenticate(self, API_KEY, API_SECRET, filter=None): data = { "event": "auth", "filter": filter, "apiKey": API_KEY }