Fix bug about wss_timeout by changing reconnection logic (in BfxWebsocketClient and BfxWebsocketBucket).

This commit is contained in:
Davide Casale
2023-04-16 21:01:36 +02:00
parent 734375ec9f
commit 6d868a8287
2 changed files with 31 additions and 31 deletions

View File

@@ -32,23 +32,12 @@ class BfxWebsocketBucket:
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
async def connect(self): async def connect(self):
reconnection = False async def _connection():
async with websockets.connect(self.host) as websocket:
async for websocket in websockets.connect(self.host):
self.websocket = websocket self.websocket = websocket
self.on_open_event.set() self.on_open_event.set()
await self.__recover_state()
if reconnection or (reconnection := False):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
for _, subscription in self.subscriptions.items():
await self.subscribe(**subscription)
self.subscriptions.clear()
try:
async for message in websocket: async for message in websocket:
message = json.loads(message) message = json.loads(message)
@@ -69,15 +58,21 @@ class BfxWebsocketBucket:
if isinstance(message, list): if isinstance(message, list):
if (chan_id := message[0]) and message[1] != _HEARTBEAT: if (chan_id := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chan_id], *message[1:]) self.handler.handle(self.subscriptions[chan_id], *message[1:])
try:
await _connection()
except websockets.ConnectionClosedError as error: except websockets.ConnectionClosedError as error:
if error.code in (1006, 1012): if error.code in (1006, 1012):
self.on_open_event.clear() self.on_open_event.clear()
reconnection = True
continue
raise error async def __recover_state(self):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
break for _, subscription in self.subscriptions.items():
await self.subscribe(**subscription)
self.subscriptions.clear()
@_require_websocket_connection @_require_websocket_connection
async def subscribe(self, channel, sub_id=None, **kwargs): async def subscribe(self, channel, sub_id=None, **kwargs):

View File

@@ -61,7 +61,7 @@ class BfxWebsocketClient:
*AuthenticatedChannelsHandler.EVENTS *AuthenticatedChannelsHandler.EVENTS
] ]
def __init__(self, host, credentials = None, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"): def __init__(self, host, credentials, *, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"):
self.websocket, self.buckets, self.authentication = None, [], False self.websocket, self.buckets, self.authentication = None, [], False
self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout
@@ -97,15 +97,15 @@ class BfxWebsocketClient:
for _ in range(connections): for _ in range(connections):
self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter)] self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter)]
tasks = [ bucket.connect() for bucket in self.buckets ] + [ self.__connect() ] await self.__connect()
await asyncio.gather(*tasks)
#pylint: disable-next=too-many-statements #pylint: disable-next=too-many-statements
async def __connect(self): async def __connect(self):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection = Reconnection(status=False, attempts=0, timestamp=None) reconnection = Reconnection(status=False, attempts=0, timestamp=None)
delay, timer, on_timeout_event = None, None, asyncio.locks.Event() delay, timer, tasks = None, None, []
on_timeout_event = asyncio.locks.Event()
def _on_timeout(): def _on_timeout():
on_timeout_event.set() on_timeout_event.set()
@@ -114,7 +114,7 @@ class BfxWebsocketClient:
f"without being able to reconnect (wss_timeout is set to {self.wss_timeout}s).") f"without being able to reconnect (wss_timeout is set to {self.wss_timeout}s).")
async def _connection(): async def _connection():
nonlocal reconnection nonlocal reconnection, timer, tasks
async with websockets.connect(self.host) as websocket: async with websockets.connect(self.host) as websocket:
if reconnection.status: if reconnection.status:
@@ -128,6 +128,10 @@ class BfxWebsocketClient:
self.websocket, self.authentication = websocket, False self.websocket, self.authentication = websocket, False
coroutines = [ BfxWebsocketBucket.connect(bucket) for bucket in self.buckets ]
tasks = [ asyncio.create_task(coroutine) for coroutine in coroutines ]
if len(self.buckets) == 0 or \ if len(self.buckets) == 0 or \
(await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])): (await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])):
self.event_emitter.emit("open") self.event_emitter.emit("open")
@@ -183,10 +187,11 @@ class BfxWebsocketClient:
self.logger.info("WSS server is about to restart, reconnection " \ self.logger.info("WSS server is about to restart, reconnection " \
"required (client received 20051). Attempt in progress...") "required (client received 20051). Attempt in progress...")
for task in tasks:
task.cancel()
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_timeout) timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_timeout)
delay = _Delay(backoff_factor=1.618) delay = _Delay(backoff_factor=1.618)
elif isinstance(error, socket.gaierror) and reconnection.status: elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \ self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \