diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index b4c573b..90c8d21 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -37,6 +37,8 @@ class BfxWebsocketBucket(object): async for websocket in websockets.connect(self.host): self.websocket = websocket + self.on_open_event.set() + if reconnection == True or (reconnection := False): for pending in self.pendings: await self.websocket.send(json.dumps(pending)) @@ -46,8 +48,6 @@ class BfxWebsocketBucket(object): self.subscriptions.clear() - self.on_open_event.set() - try: async for message in websocket: message = json.loads(message) @@ -63,11 +63,14 @@ class BfxWebsocketBucket(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chanId], *message[1:]) - except websockets.ConnectionClosedError: - self.on_open_event.clear() - reconnection = True - continue - + except websockets.ConnectionClosedError as error: + if error.code == 1006: + self.on_open_event.clear() + reconnection = True + continue + + raise error + break @_require_websocket_connection @@ -96,4 +99,9 @@ class BfxWebsocketBucket(object): @_require_websocket_connection async def _close(self, code=1000, reason=str()): - await self.websocket.close(code=code, reason=reason) \ No newline at end of file + await self.websocket.close(code=code, reason=reason) + + def _get_chan_id(self, subId): + for subscription in self.subscriptions.values(): + if subscription["subId"] == subId: + return subscription["chanId"] \ No newline at end of file diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 15faf89..64deb0b 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -78,7 +78,7 @@ class BfxWebsocketClient(object): async def __connect(self, credentials = None): Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) - reconnection = Reconnection(status=False, attempts=0, timestamp=None) + reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None async def _connection(): nonlocal reconnection @@ -137,8 +137,6 @@ class BfxWebsocketClient(object): return (self.__backoff_delay == _Delay.BACKOFF_MIN) \ and self.__initial_delay or self.__backoff_delay - delay = _Delay(backoff_factor=1.618) - while True: if reconnection.status == True: await asyncio.sleep(delay.next()) @@ -151,6 +149,8 @@ class BfxWebsocketClient(object): + "or sent (1006). Attempting to reconnect...") reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()); + + delay = _Delay(backoff_factor=1.618) elif isinstance(error, socket.gaierror) and reconnection.status == True: self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " + f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds." @@ -184,9 +184,9 @@ class BfxWebsocketClient(object): await self.buckets[index]._subscribe(channel, **kwargs) - async def unsubscribe(self, chanId): + async def unsubscribe(self, subId): for bucket in self.buckets: - if chanId in bucket.subscriptions.keys(): + if (chanId := bucket._get_chan_id(subId)): await bucket._unsubscribe(chanId=chanId) async def close(self, code=1000, reason=str()):