Add pendings in _BfxWebsocketBucket. Add new logic for selecting the bucket with less connections. Add ._unsubscribe coroutine.

This commit is contained in:
Davide Casale
2022-11-22 17:21:21 +01:00
parent 2f561a4fba
commit 721e82b86d
3 changed files with 38 additions and 9 deletions

View File

@@ -1,10 +1,10 @@
import json, asyncio, hmac, hashlib, time, websockets import json, asyncio, hmac, hashlib, time, uuid, websockets
from pyee.asyncio import AsyncIOEventEmitter from pyee.asyncio import AsyncIOEventEmitter
from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler
from .errors import ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion from .errors import ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
HEARTBEAT = "hb" HEARTBEAT = "hb"
@@ -77,12 +77,17 @@ class BfxWebsocketClient(object):
await self.websocket.send(json.dumps(data)) await self.websocket.send(json.dumps(data))
async def subscribe(self, channel, **kwargs): async def subscribe(self, channel, **kwargs):
counters = [ bucket.count for bucket in self.buckets ] counters = [ len(bucket.pendings) + len(bucket.chanIds) for bucket in self.buckets ]
index = counters.index(min(counters)) index = counters.index(min(counters))
await self.buckets[index]._subscribe(channel, **kwargs) await self.buckets[index]._subscribe(channel, **kwargs)
async def unsubscribe(self, chanId):
for bucket in self.buckets:
if chanId in bucket.chanIds.keys():
await bucket._unsubscribe(chanId=chanId)
def __require_websocket_authentication(function): def __require_websocket_authentication(function):
@_require_websocket_connection @_require_websocket_connection
async def wrapper(self, *args, **kwargs): async def wrapper(self, *args, **kwargs):
@@ -116,10 +121,12 @@ class BfxWebsocketClient(object):
return handler return handler
class _BfxWebsocketBucket(object): class _BfxWebsocketBucket(object):
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
def __init__(self, host, event_emitter, __bucket_open_signal): def __init__(self, host, event_emitter, __bucket_open_signal):
self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal
self.websocket, self.chanIds, self.count = None, dict(), 0 self.websocket, self.chanIds, self.pendings = None, dict(), list()
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
@@ -136,8 +143,8 @@ class _BfxWebsocketBucket(object):
if BfxWebsocketClient.VERSION != message["version"]: if BfxWebsocketClient.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]):
self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.chanIds[chanId] = message self.chanIds[chanId] = message
self.event_emitter.emit("subscribed", message) self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]):
if message["status"] == "OK": if message["status"] == "OK":
@@ -148,11 +155,25 @@ class _BfxWebsocketBucket(object):
self.handler.handle(self.chanIds[chanId], *message[1:]) self.handler.handle(self.chanIds[chanId], *message[1:])
@_require_websocket_connection @_require_websocket_connection
async def _subscribe(self, channel, **kwargs): async def _subscribe(self, channel, subId=None, **kwargs):
self.count += 1 if len(self.chanIds) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
await self.websocket.send(json.dumps({ subscription = {
"event": "subscribe", "event": "subscribe",
"channel": channel, "channel": channel,
"subId": subId or str(uuid.uuid4()),
**kwargs **kwargs
}
self.pendings.append(subscription)
await self.websocket.send(json.dumps(subscription))
@_require_websocket_connection
async def _unsubscribe(self, chanId):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chanId
})) }))

View File

@@ -1,3 +1,3 @@
from .BfxWebsocketClient import BfxWebsocketClient from .BfxWebsocketClient import BfxWebsocketClient
from .handlers import Channels from .handlers import Channels
from .errors import BfxWebsocketException, ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion from .errors import BfxWebsocketException, ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion

View File

@@ -1,5 +1,6 @@
__all__ = [ __all__ = [
"ConnectionNotOpen", "ConnectionNotOpen",
"TooManySubscriptions",
"WebsocketAuthenticationRequired", "WebsocketAuthenticationRequired",
"InvalidAuthenticationCredentials", "InvalidAuthenticationCredentials",
"EventNotSupported", "EventNotSupported",
@@ -20,6 +21,13 @@ class ConnectionNotOpen(BfxWebsocketException):
pass pass
class TooManySubscriptions(BfxWebsocketException):
"""
This error indicates an attempt to subscribe to a public channel after reaching the limit of simultaneous connections.
"""
pass
class WebsocketAuthenticationRequired(BfxWebsocketException): class WebsocketAuthenticationRequired(BfxWebsocketException):
""" """
This error indicates an attempt to access a protected resource without logging in first. This error indicates an attempt to access a protected resource without logging in first.