Add automatic deletion for buckets that reach zero subscriptions (e.g. after a call to BfxWebSocketClient::unsubscribe).

This commit is contained in:
Davide Casale
2023-10-16 04:45:47 +02:00
parent e5ec94b757
commit 133db74a72
3 changed files with 27 additions and 7 deletions

View File

@@ -44,6 +44,11 @@ class BfxWebSocketBucket(Connection):
return self.count == \ return self.count == \
BfxWebSocketBucket.__MAXIMUM_SUBSCRIPTIONS_AMOUNT BfxWebSocketBucket.__MAXIMUM_SUBSCRIPTIONS_AMOUNT
@property
def ids(self) -> List[str]:
return [ pending["subId"] for pending in self.__pendings ] + \
[ subscription["sub_id"] for subscription in self.__subscriptions.values() ]
async def start(self) -> None: async def start(self) -> None:
async with websockets.client.connect(self._host) as websocket: async with websockets.client.connect(self._host) as websocket:
self._websocket = websocket self._websocket = websocket
@@ -59,11 +64,6 @@ class BfxWebSocketBucket(Connection):
if isinstance(message, dict): if isinstance(message, dict):
if message["event"] == "subscribed": if message["event"] == "subscribed":
self.__on_subscribed(message) self.__on_subscribed(message)
elif message["event"] == "unsubscribed":
if message["status"] == "OK":
chan_id = cast(int, message["chan_id"])
del self.__subscriptions[chan_id]
if isinstance(message, list): if isinstance(message, list):
if (chan_id := cast(int, message[0])) and \ if (chan_id := cast(int, message[0])) and \
@@ -117,12 +117,14 @@ class BfxWebSocketBucket(Connection):
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None: async def unsubscribe(self, sub_id: str) -> None:
for chan_id, subscription in self.__subscriptions.items(): for chan_id, subscription in list(self.__subscriptions.items()):
if subscription["sub_id"] == sub_id: if subscription["sub_id"] == sub_id:
unsubscription = { unsubscription = {
"event": "unsubscribe", "event": "unsubscribe",
"chanId": chan_id } "chanId": chan_id }
del self.__subscriptions[chan_id]
await self._websocket.send(message = \ await self._websocket.send(message = \
json.dumps(unsubscription)) json.dumps(unsubscription))

View File

@@ -30,7 +30,8 @@ from bfxapi.websocket.exceptions import \
ReconnectionTimeoutError, \ ReconnectionTimeoutError, \
VersionMismatchError, \ VersionMismatchError, \
UnknownChannelError, \ UnknownChannelError, \
UnknownSubscriptionError UnknownSubscriptionError, \
SubIdError
from .bfx_websocket_bucket import BfxWebSocketBucket from .bfx_websocket_bucket import BfxWebSocketBucket
@@ -265,6 +266,11 @@ class BfxWebSocketClient(Connection):
raise UnknownChannelError("Available channels are: " + \ raise UnknownChannelError("Available channels are: " + \
"ticker, trades, book, candles and status.") "ticker, trades, book, candles and status.")
for bucket in self.__buckets:
if sub_id in bucket.ids:
raise SubIdError("sub_id must be " + \
"unique for all subscriptions.")
for bucket in self.__buckets: for bucket in self.__buckets:
if not bucket.is_full: if not bucket.is_full:
return await bucket.subscribe( \ return await bucket.subscribe( \
@@ -277,8 +283,15 @@ class BfxWebSocketClient(Connection):
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None: async def unsubscribe(self, sub_id: str) -> None:
# pylint: disable-next=consider-using-dict-items
for bucket in self.__buckets: for bucket in self.__buckets:
if bucket.has(sub_id): if bucket.has(sub_id):
if bucket.count == 1:
del self.__buckets[bucket]
return await bucket.close( \
code=1001, reason="Going Away")
return await bucket.unsubscribe(sub_id) return await bucket.unsubscribe(sub_id)
raise UnknownSubscriptionError("Unable to find " + \ raise UnknownSubscriptionError("Unable to find " + \

View File

@@ -40,3 +40,8 @@ class VersionMismatchError(BfxBaseException):
""" """
This error indicates a mismatch between the client version and the server WSS version. This error indicates a mismatch between the client version and the server WSS version.
""" """
class SubIdError(BfxBaseException):
"""
Thrown when a user attempts to open more than one subscription using the same sub_id.
"""