From 133db74a72babc069eef2e312d16f1ba9d9d6342 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 16 Oct 2023 04:45:47 +0200 Subject: [PATCH] Add automatic deletion for buckets that reach zero subscriptions (e.g. after a call to BfxWebSocketClient::unsubscribe). --- bfxapi/websocket/_client/bfx_websocket_bucket.py | 14 ++++++++------ bfxapi/websocket/_client/bfx_websocket_client.py | 15 ++++++++++++++- bfxapi/websocket/exceptions.py | 5 +++++ 3 files changed, 27 insertions(+), 7 deletions(-) diff --git a/bfxapi/websocket/_client/bfx_websocket_bucket.py b/bfxapi/websocket/_client/bfx_websocket_bucket.py index 2fb1670..753bf73 100644 --- a/bfxapi/websocket/_client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/_client/bfx_websocket_bucket.py @@ -44,6 +44,11 @@ class BfxWebSocketBucket(Connection): return self.count == \ BfxWebSocketBucket.__MAXIMUM_SUBSCRIPTIONS_AMOUNT + @property + def ids(self) -> List[str]: + return [ pending["subId"] for pending in self.__pendings ] + \ + [ subscription["sub_id"] for subscription in self.__subscriptions.values() ] + async def start(self) -> None: async with websockets.client.connect(self._host) as websocket: self._websocket = websocket @@ -59,11 +64,6 @@ class BfxWebSocketBucket(Connection): if isinstance(message, dict): if message["event"] == "subscribed": self.__on_subscribed(message) - elif message["event"] == "unsubscribed": - if message["status"] == "OK": - chan_id = cast(int, message["chan_id"]) - - del self.__subscriptions[chan_id] if isinstance(message, list): if (chan_id := cast(int, message[0])) and \ @@ -117,12 +117,14 @@ class BfxWebSocketBucket(Connection): @Connection.require_websocket_connection async def unsubscribe(self, sub_id: str) -> None: - for chan_id, subscription in self.__subscriptions.items(): + for chan_id, subscription in list(self.__subscriptions.items()): if subscription["sub_id"] == sub_id: unsubscription = { "event": "unsubscribe", "chanId": chan_id } + del self.__subscriptions[chan_id] + await self._websocket.send(message = \ json.dumps(unsubscription)) diff --git a/bfxapi/websocket/_client/bfx_websocket_client.py b/bfxapi/websocket/_client/bfx_websocket_client.py index 1250b85..82d2801 100644 --- a/bfxapi/websocket/_client/bfx_websocket_client.py +++ b/bfxapi/websocket/_client/bfx_websocket_client.py @@ -30,7 +30,8 @@ from bfxapi.websocket.exceptions import \ ReconnectionTimeoutError, \ VersionMismatchError, \ UnknownChannelError, \ - UnknownSubscriptionError + UnknownSubscriptionError, \ + SubIdError from .bfx_websocket_bucket import BfxWebSocketBucket @@ -265,6 +266,11 @@ class BfxWebSocketClient(Connection): raise UnknownChannelError("Available channels are: " + \ "ticker, trades, book, candles and status.") + for bucket in self.__buckets: + if sub_id in bucket.ids: + raise SubIdError("sub_id must be " + \ + "unique for all subscriptions.") + for bucket in self.__buckets: if not bucket.is_full: return await bucket.subscribe( \ @@ -277,8 +283,15 @@ class BfxWebSocketClient(Connection): @Connection.require_websocket_connection async def unsubscribe(self, sub_id: str) -> None: + # pylint: disable-next=consider-using-dict-items for bucket in self.__buckets: if bucket.has(sub_id): + if bucket.count == 1: + del self.__buckets[bucket] + + return await bucket.close( \ + code=1001, reason="Going Away") + return await bucket.unsubscribe(sub_id) raise UnknownSubscriptionError("Unable to find " + \ diff --git a/bfxapi/websocket/exceptions.py b/bfxapi/websocket/exceptions.py index e7019ab..e662cbf 100644 --- a/bfxapi/websocket/exceptions.py +++ b/bfxapi/websocket/exceptions.py @@ -40,3 +40,8 @@ class VersionMismatchError(BfxBaseException): """ This error indicates a mismatch between the client version and the server WSS version. """ + +class SubIdError(BfxBaseException): + """ + Thrown when a user attempts to open more than one subscription using the same sub_id. + """