diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 01a117c..035abf2 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -32,23 +32,12 @@ class BfxWebsocketBucket: self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) async def connect(self): - reconnection = False + async def _connection(): + async with websockets.connect(self.host) as websocket: + self.websocket = websocket + self.on_open_event.set() + await self.__recover_state() - async for websocket in websockets.connect(self.host): - self.websocket = websocket - - self.on_open_event.set() - - if reconnection or (reconnection := False): - for pending in self.pendings: - await self.websocket.send(json.dumps(pending)) - - for _, subscription in self.subscriptions.items(): - await self.subscribe(**subscription) - - self.subscriptions.clear() - - try: async for message in websocket: message = json.loads(message) @@ -69,15 +58,21 @@ class BfxWebsocketBucket: if isinstance(message, list): if (chan_id := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chan_id], *message[1:]) - except websockets.ConnectionClosedError as error: - if error.code in (1006, 1012): - self.on_open_event.clear() - reconnection = True - continue - raise error + try: + await _connection() + except websockets.ConnectionClosedError as error: + if error.code in (1006, 1012): + self.on_open_event.clear() - break + async def __recover_state(self): + for pending in self.pendings: + await self.websocket.send(json.dumps(pending)) + + for _, subscription in self.subscriptions.items(): + await self.subscribe(**subscription) + + self.subscriptions.clear() @_require_websocket_connection async def subscribe(self, channel, sub_id=None, **kwargs): diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 18747d5..58cb817 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -61,7 +61,7 @@ class BfxWebsocketClient: *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, credentials = None, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"): + def __init__(self, host, credentials, *, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"): self.websocket, self.buckets, self.authentication = None, [], False self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout @@ -97,15 +97,15 @@ class BfxWebsocketClient: for _ in range(connections): self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter)] - tasks = [ bucket.connect() for bucket in self.buckets ] + [ self.__connect() ] - - await asyncio.gather(*tasks) + await self.__connect() #pylint: disable-next=too-many-statements async def __connect(self): Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) reconnection = Reconnection(status=False, attempts=0, timestamp=None) - delay, timer, on_timeout_event = None, None, asyncio.locks.Event() + delay, timer, tasks = None, None, [] + + on_timeout_event = asyncio.locks.Event() def _on_timeout(): on_timeout_event.set() @@ -114,7 +114,7 @@ class BfxWebsocketClient: f"without being able to reconnect (wss_timeout is set to {self.wss_timeout}s).") async def _connection(): - nonlocal reconnection + nonlocal reconnection, timer, tasks async with websockets.connect(self.host) as websocket: if reconnection.status: @@ -128,6 +128,10 @@ class BfxWebsocketClient: self.websocket, self.authentication = websocket, False + coroutines = [ BfxWebsocketBucket.connect(bucket) for bucket in self.buckets ] + + tasks = [ asyncio.create_task(coroutine) for coroutine in coroutines ] + if len(self.buckets) == 0 or \ (await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])): self.event_emitter.emit("open") @@ -183,10 +187,11 @@ class BfxWebsocketClient: self.logger.info("WSS server is about to restart, reconnection " \ "required (client received 20051). Attempt in progress...") + for task in tasks: + task.cancel() + reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) - timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_timeout) - delay = _Delay(backoff_factor=1.618) elif isinstance(error, socket.gaierror) and reconnection.status: self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \