diff --git a/bfxapi/client.py b/bfxapi/client.py index db54b73..dd64d06 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -13,6 +13,7 @@ class Client: *, rest_host: str = REST_HOST, wss_host: str = WSS_HOST, + wss_timeout: int = 60 * 15, log_filename: Optional[str] = None, log_level: str = "INFO" ): @@ -29,6 +30,7 @@ class Client: self.wss = BfxWebsocketClient( host=wss_host, credentials=credentials, + wss_timeout=wss_timeout, log_filename=log_filename, log_level=log_level ) diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 2d5e248..01a117c 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -70,7 +70,7 @@ class BfxWebsocketBucket: if (chan_id := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chan_id], *message[1:]) except websockets.ConnectionClosedError as error: - if error.code == 1006: + if error.code in (1006, 1012): self.on_open_event.clear() reconnection = True continue diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 5071ab6..18747d5 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -13,7 +13,7 @@ from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, from .bfx_websocket_inputs import BfxWebsocketInputs from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \ - ZeroConnectionsError, OutdatedClientVersion + ZeroConnectionsError, ReconnectionTimeoutError, OutdatedClientVersion from ...utils.json_encoder import JSONEncoder @@ -61,10 +61,12 @@ class BfxWebsocketClient: *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, credentials = None, log_filename = None, log_level = "INFO"): + def __init__(self, host, credentials = None, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"): self.websocket, self.buckets, self.authentication = None, [], False - self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() + self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout + + self.event_emitter = AsyncIOEventEmitter() self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) @@ -102,8 +104,14 @@ class BfxWebsocketClient: #pylint: disable-next=too-many-statements async def __connect(self): Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) + reconnection = Reconnection(status=False, attempts=0, timestamp=None) + delay, timer, on_timeout_event = None, None, asyncio.locks.Event() - reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None + def _on_timeout(): + on_timeout_event.set() + + raise ReconnectionTimeoutError("Connection has been offline for too long " \ + f"without being able to reconnect (wss_timeout is set to {self.wss_timeout}s).") async def _connection(): nonlocal reconnection @@ -116,6 +124,8 @@ class BfxWebsocketClient: reconnection = Reconnection(status=False, attempts=0, timestamp=None) + timer.cancel() + self.websocket, self.authentication = websocket, False if len(self.buckets) == 0 or \ @@ -158,6 +168,9 @@ class BfxWebsocketClient: if reconnection.status: await asyncio.sleep(delay.next()) + if on_timeout_event.is_set(): + break + try: await _connection() except (websockets.ConnectionClosedError, socket.gaierror) as error: @@ -172,6 +185,8 @@ class BfxWebsocketClient: reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) + timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_timeout) + delay = _Delay(backoff_factor=1.618) elif isinstance(error, socket.gaierror) and reconnection.status: self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \ diff --git a/bfxapi/websocket/exceptions.py b/bfxapi/websocket/exceptions.py index e1ff53e..a130831 100644 --- a/bfxapi/websocket/exceptions.py +++ b/bfxapi/websocket/exceptions.py @@ -6,6 +6,7 @@ __all__ = [ "ConnectionNotOpen", "TooManySubscriptions", "ZeroConnectionsError", + "ReconnectionTimeoutError", "WebsocketAuthenticationRequired", "InvalidAuthenticationCredentials", "EventNotSupported", @@ -33,6 +34,11 @@ class ZeroConnectionsError(BfxWebsocketException): This error indicates an attempt to subscribe to a public channel while the number of connections is 0. """ +class ReconnectionTimeoutError(BfxWebsocketException): + """ + This error indicates that the connection has been offline for too long without being able to reconnect. + """ + class WebsocketAuthenticationRequired(BfxWebsocketException): """ This error indicates an attempt to access a protected resource without logging in first.