Add and implement wss_timeout in BfxWebsocketClient (and Client).

This commit is contained in:
Davide Casale
2023-04-16 20:04:46 +02:00
parent 7a9a57e878
commit 734375ec9f
4 changed files with 28 additions and 5 deletions

View File

@@ -13,6 +13,7 @@ class Client:
*, *,
rest_host: str = REST_HOST, rest_host: str = REST_HOST,
wss_host: str = WSS_HOST, wss_host: str = WSS_HOST,
wss_timeout: int = 60 * 15,
log_filename: Optional[str] = None, log_filename: Optional[str] = None,
log_level: str = "INFO" log_level: str = "INFO"
): ):
@@ -29,6 +30,7 @@ class Client:
self.wss = BfxWebsocketClient( self.wss = BfxWebsocketClient(
host=wss_host, host=wss_host,
credentials=credentials, credentials=credentials,
wss_timeout=wss_timeout,
log_filename=log_filename, log_filename=log_filename,
log_level=log_level log_level=log_level
) )

View File

@@ -70,7 +70,7 @@ class BfxWebsocketBucket:
if (chan_id := message[0]) and message[1] != _HEARTBEAT: if (chan_id := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chan_id], *message[1:]) self.handler.handle(self.subscriptions[chan_id], *message[1:])
except websockets.ConnectionClosedError as error: except websockets.ConnectionClosedError as error:
if error.code == 1006: if error.code in (1006, 1012):
self.on_open_event.clear() self.on_open_event.clear()
reconnection = True reconnection = True
continue continue

View File

@@ -13,7 +13,7 @@ from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection,
from .bfx_websocket_inputs import BfxWebsocketInputs from .bfx_websocket_inputs import BfxWebsocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \ from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \
ZeroConnectionsError, OutdatedClientVersion ZeroConnectionsError, ReconnectionTimeoutError, OutdatedClientVersion
from ...utils.json_encoder import JSONEncoder from ...utils.json_encoder import JSONEncoder
@@ -61,10 +61,12 @@ class BfxWebsocketClient:
*AuthenticatedChannelsHandler.EVENTS *AuthenticatedChannelsHandler.EVENTS
] ]
def __init__(self, host, credentials = None, log_filename = None, log_level = "INFO"): def __init__(self, host, credentials = None, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"):
self.websocket, self.buckets, self.authentication = None, [], False self.websocket, self.buckets, self.authentication = None, [], False
self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout
self.event_emitter = AsyncIOEventEmitter()
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
@@ -102,8 +104,14 @@ class BfxWebsocketClient:
#pylint: disable-next=too-many-statements #pylint: disable-next=too-many-statements
async def __connect(self): async def __connect(self):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
delay, timer, on_timeout_event = None, None, asyncio.locks.Event()
reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None def _on_timeout():
on_timeout_event.set()
raise ReconnectionTimeoutError("Connection has been offline for too long " \
f"without being able to reconnect (wss_timeout is set to {self.wss_timeout}s).")
async def _connection(): async def _connection():
nonlocal reconnection nonlocal reconnection
@@ -116,6 +124,8 @@ class BfxWebsocketClient:
reconnection = Reconnection(status=False, attempts=0, timestamp=None) reconnection = Reconnection(status=False, attempts=0, timestamp=None)
timer.cancel()
self.websocket, self.authentication = websocket, False self.websocket, self.authentication = websocket, False
if len(self.buckets) == 0 or \ if len(self.buckets) == 0 or \
@@ -158,6 +168,9 @@ class BfxWebsocketClient:
if reconnection.status: if reconnection.status:
await asyncio.sleep(delay.next()) await asyncio.sleep(delay.next())
if on_timeout_event.is_set():
break
try: try:
await _connection() await _connection()
except (websockets.ConnectionClosedError, socket.gaierror) as error: except (websockets.ConnectionClosedError, socket.gaierror) as error:
@@ -172,6 +185,8 @@ class BfxWebsocketClient:
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_timeout)
delay = _Delay(backoff_factor=1.618) delay = _Delay(backoff_factor=1.618)
elif isinstance(error, socket.gaierror) and reconnection.status: elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \ self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \

View File

@@ -6,6 +6,7 @@ __all__ = [
"ConnectionNotOpen", "ConnectionNotOpen",
"TooManySubscriptions", "TooManySubscriptions",
"ZeroConnectionsError", "ZeroConnectionsError",
"ReconnectionTimeoutError",
"WebsocketAuthenticationRequired", "WebsocketAuthenticationRequired",
"InvalidAuthenticationCredentials", "InvalidAuthenticationCredentials",
"EventNotSupported", "EventNotSupported",
@@ -33,6 +34,11 @@ class ZeroConnectionsError(BfxWebsocketException):
This error indicates an attempt to subscribe to a public channel while the number of connections is 0. This error indicates an attempt to subscribe to a public channel while the number of connections is 0.
""" """
class ReconnectionTimeoutError(BfxWebsocketException):
"""
This error indicates that the connection has been offline for too long without being able to reconnect.
"""
class WebsocketAuthenticationRequired(BfxWebsocketException): class WebsocketAuthenticationRequired(BfxWebsocketException):
""" """
This error indicates an attempt to access a protected resource without logging in first. This error indicates an attempt to access a protected resource without logging in first.