diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index 5a92e83..07cc8b4 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -1,10 +1,10 @@ -import json, asyncio, hmac, hashlib, time, websockets +import json, asyncio, hmac, hashlib, time, uuid, websockets from pyee.asyncio import AsyncIOEventEmitter from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler -from .errors import ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion +from .errors import ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion HEARTBEAT = "hb" @@ -77,12 +77,17 @@ class BfxWebsocketClient(object): await self.websocket.send(json.dumps(data)) async def subscribe(self, channel, **kwargs): - counters = [ bucket.count for bucket in self.buckets ] + counters = [ len(bucket.pendings) + len(bucket.chanIds) for bucket in self.buckets ] index = counters.index(min(counters)) await self.buckets[index]._subscribe(channel, **kwargs) + async def unsubscribe(self, chanId): + for bucket in self.buckets: + if chanId in bucket.chanIds.keys(): + await bucket._unsubscribe(chanId=chanId) + def __require_websocket_authentication(function): @_require_websocket_connection async def wrapper(self, *args, **kwargs): @@ -116,10 +121,12 @@ class BfxWebsocketClient(object): return handler class _BfxWebsocketBucket(object): + MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 + def __init__(self, host, event_emitter, __bucket_open_signal): self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal - self.websocket, self.chanIds, self.count = None, dict(), 0 + self.websocket, self.chanIds, self.pendings = None, dict(), list() self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) @@ -136,8 +143,8 @@ class _BfxWebsocketBucket(object): if BfxWebsocketClient.VERSION != message["version"]: raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): + self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] self.chanIds[chanId] = message - self.event_emitter.emit("subscribed", message) elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): if message["status"] == "OK": @@ -148,11 +155,25 @@ class _BfxWebsocketBucket(object): self.handler.handle(self.chanIds[chanId], *message[1:]) @_require_websocket_connection - async def _subscribe(self, channel, **kwargs): - self.count += 1 + async def _subscribe(self, channel, subId=None, **kwargs): + if len(self.chanIds) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: + raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") - await self.websocket.send(json.dumps({ + subscription = { "event": "subscribe", "channel": channel, + "subId": subId or str(uuid.uuid4()), + **kwargs + } + + self.pendings.append(subscription) + + await self.websocket.send(json.dumps(subscription)) + + @_require_websocket_connection + async def _unsubscribe(self, chanId): + await self.websocket.send(json.dumps({ + "event": "unsubscribe", + "chanId": chanId })) \ No newline at end of file diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index f36a1b0..dee1570 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1,3 +1,3 @@ from .BfxWebsocketClient import BfxWebsocketClient from .handlers import Channels -from .errors import BfxWebsocketException, ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion \ No newline at end of file +from .errors import BfxWebsocketException, ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion \ No newline at end of file diff --git a/bfxapi/websocket/errors.py b/bfxapi/websocket/errors.py index c631cca..3a1f900 100644 --- a/bfxapi/websocket/errors.py +++ b/bfxapi/websocket/errors.py @@ -1,5 +1,6 @@ __all__ = [ "ConnectionNotOpen", + "TooManySubscriptions", "WebsocketAuthenticationRequired", "InvalidAuthenticationCredentials", "EventNotSupported", @@ -20,6 +21,13 @@ class ConnectionNotOpen(BfxWebsocketException): pass +class TooManySubscriptions(BfxWebsocketException): + """ + This error indicates an attempt to subscribe to a public channel after reaching the limit of simultaneous connections. + """ + + pass + class WebsocketAuthenticationRequired(BfxWebsocketException): """ This error indicates an attempt to access a protected resource without logging in first.