diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 1f4dcde..3cca1a8 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -23,48 +23,44 @@ class BfxWebSocketBucket: def __init__(self, host, event_emitter, events_per_subscription): self.host, self.event_emitter, self.events_per_subscription = host, event_emitter, events_per_subscription self.websocket, self.subscriptions, self.pendings = None, {}, [] - self.on_open_event = asyncio.locks.Event() + self.condition = asyncio.locks.Condition() self.handler = PublicChannelsHandler(event_emitter=self.event_emitter, \ events_per_subscription=self.events_per_subscription) async def connect(self): - async def _connection(): - async with websockets.connect(self.host) as websocket: - self.websocket = websocket - self.on_open_event.set() - await self.__recover_state() + async with websockets.connect(self.host) as websocket: + self.websocket = websocket - async for message in websocket: - message = json.loads(message) + await self.__recover_state() - if isinstance(message, dict): - if message["event"] == "subscribed" and (chan_id := message["chanId"]): - self.pendings = [ pending \ - for pending in self.pendings if pending["subId"] != message["subId"] ] + async with self.condition: + self.condition.notify() - self.subscriptions[chan_id] = message + async for message in websocket: + message = json.loads(message) - sub_id = message["subId"] + if isinstance(message, dict): + if message["event"] == "subscribed" and (chan_id := message["chanId"]): + self.pendings = [ pending \ + for pending in self.pendings if pending["subId"] != message["subId"] ] - if "subscribed" not in self.events_per_subscription.get(sub_id, []): - self.events_per_subscription.setdefault(sub_id, []).append("subscribed") - self.event_emitter.emit("subscribed", message) - elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]): - if message["status"] == "OK": - del self.subscriptions[chan_id] - elif message["event"] == "error": - self.event_emitter.emit("wss-error", message["code"], message["msg"]) + self.subscriptions[chan_id] = message - if isinstance(message, list): - if (chan_id := message[0]) and message[1] != _HEARTBEAT: - self.handler.handle(self.subscriptions[chan_id], message[1:]) + sub_id = message["subId"] - try: - await _connection() - except websockets.exceptions.ConnectionClosedError as error: - if error.code in (1006, 1012): - self.on_open_event.clear() + if "subscribed" not in self.events_per_subscription.get(sub_id, []): + self.events_per_subscription.setdefault(sub_id, []).append("subscribed") + self.event_emitter.emit("subscribed", message) + elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]): + if message["status"] == "OK": + del self.subscriptions[chan_id] + elif message["event"] == "error": + self.event_emitter.emit("wss-error", message["code"], message["msg"]) + + if isinstance(message, list): + if (chan_id := message[0]) and message[1] != _HEARTBEAT: + self.handler.handle(self.subscriptions[chan_id], message[1:]) async def __recover_state(self): for pending in self.pendings: @@ -107,3 +103,9 @@ class BfxWebSocketBucket: for subscription in self.subscriptions.values(): if subscription["subId"] == sub_id: return subscription["chanId"] + + async def wait(self): + async with self.condition: + await self.condition.wait_for( + lambda: self.websocket is not None and \ + self.websocket.open) diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 2a5a935..77ff235 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -138,7 +138,7 @@ class BfxWebSocketClient: tasks = [ asyncio.create_task(coroutine) for coroutine in coroutines ] if len(self.buckets) == 0 or \ - (await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])): + (await asyncio.gather(*[bucket.wait() for bucket in self.buckets])): self.event_emitter.emit("open") if self.credentials: @@ -184,34 +184,34 @@ class BfxWebSocketClient: try: await _connection() except (websockets.exceptions.ConnectionClosedError, socket.gaierror) as error: - if isinstance(error, websockets.exceptions.ConnectionClosedError): - if error.code in (1006, 1012): - if error.code == 1006: - self.logger.error("Connection lost: no close frame received " \ - "or sent (1006). Trying to reconnect...") + for task in tasks: + task.cancel() - if error.code == 1012: - self.logger.info("WSS server is about to restart, clients need " \ - "to reconnect (server sent 20051). Reconnection attempt in progress...") + if isinstance(error, websockets.exceptions.ConnectionClosedError) and error.code in (1006, 1012): + if error.code == 1006: + self.logger.error("Connection lost: no close frame received " \ + "or sent (1006). Trying to reconnect...") - for task in tasks: - task.cancel() + if error.code == 1012: + self.logger.info("WSS server is about to restart, clients need " \ + "to reconnect (server sent 20051). Reconnection attempt in progress...") - reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) + reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()) - if self.wss_timeout is not None: - timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout) + if self.wss_timeout is not None: + timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout) - delay = _Delay(backoff_factor=1.618) + delay = _Delay(backoff_factor=1.618) - self.authentication = False + self.authentication = False elif isinstance(error, socket.gaierror) and reconnection.status: self.logger.warning(f"Reconnection attempt was unsuccessful (no.{reconnection.attempts}). " \ f"Next reconnection attempt in {delay.peek():.2f} seconds. (at the moment " \ f"the client has been offline for {datetime.now() - reconnection.timestamp})") reconnection = reconnection._replace(attempts=reconnection.attempts + 1) - else: raise error + else: + raise error if not reconnection.status: self.event_emitter.emit("disconnection",