From 92f6e691f54fc035d75256974cc8c6e389e9b336 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 21 Nov 2022 18:41:05 +0100 Subject: [PATCH] Add BfxWebsocketBucket class in bfxapi/websocket/BfxWebsocketClient.py. Implement running multiple websocket client concurrently using asyncio to allow more than 25 connections to public channels. Rewrite BfxWebsocketClient to handle only websocket authenticated channels. --- bfxapi/websocket/BfxWebsocketClient.py | 163 +++++++++++++------------ 1 file changed, 87 insertions(+), 76 deletions(-) diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index f2ed76a..5a92e83 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -17,16 +17,6 @@ def _require_websocket_connection(function): return wrapper -def _require_websocket_authentication(function): - @_require_websocket_connection - async def wrapper(self, *args, **kwargs): - if self.authentication == False: - raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.") - - await function(self, *args, **kwargs) - - return wrapper - class BfxWebsocketClient(object): VERSION = 2 @@ -36,73 +26,42 @@ class BfxWebsocketClient(object): *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, API_KEY=None, API_SECRET=None): - self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter() + def __init__(self, host, buckets=5, API_KEY=None, API_SECRET=None): + self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter() - self.websocket, self.API_KEY, self.API_SECRET = None, API_KEY, API_SECRET + self.API_KEY, self.API_SECRET, self.authentication = API_KEY, API_SECRET, False - self.authentication = False + self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - self.handlers = { - "public": PublicChannelsHandler(event_emitter=self.event_emitter), - "authenticated": AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - } + self.buckets = [ _BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] - async def connect(self): - async for websocket in websockets.connect(self.host): + async def start(self): + tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ] + + if self.API_KEY != None and self.API_SECRET != None: + tasks.append(self.__connect(self.API_KEY, self.API_SECRET)) + + await asyncio.gather(*tasks) + + async def __connect(self, API_KEY, API_SECRET, filter=None): + async with websockets.connect(self.host) as websocket: self.websocket = websocket - try: - self.event_emitter.emit("open") + await self.__authenticate(API_KEY, API_SECRET, filter) - if self.API_KEY != None and self.API_SECRET != None: - await self.authenticate(self.API_KEY, self.API_SECRET) + async for message in websocket: + message = json.loads(message) - async for message in websocket: - message = json.loads(message) + if isinstance(message, dict) and message["event"] == "auth": + if message["status"] == "OK": + self.event_emitter.emit("authenticated", message); self.authentication = True + else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") + elif isinstance(message, dict) and message["event"] == "error": + self.event_emitter.emit("wss-error", message["code"], message["msg"]) + elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != HEARTBEAT: + self.handler.handle(message[1], message[2]) - if isinstance(message, dict) and message["event"] == "info" and "version" in message: - if BfxWebsocketClient.VERSION != message["version"]: - raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") - elif isinstance(message, dict) and message["event"] == "subscribed": - self.chanIds[message["chanId"]] = message - - self.event_emitter.emit("subscribed", message) - elif isinstance(message, dict) and message["event"] == "unsubscribed": - if message["status"] == "OK": - del self.chanIds[message["chanId"]] - elif isinstance(message, dict) and message["event"] == "auth": - if message["status"] == "OK": - self.event_emitter.emit("authenticated", message) - - self.authentication = True - else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") - elif isinstance(message, dict) and message["event"] == "error": - self.event_emitter.emit("wss-error", message["code"], message["msg"]) - elif isinstance(message, list) and message[1] != HEARTBEAT: - if ((chanId := message[0]) or True) and chanId == 0: - self.handlers["authenticated"].handle(message[1], message[2]) - else: self.handlers["public"].handle(self.chanIds[chanId], *message[1:]) - except websockets.ConnectionClosed: - continue - - @_require_websocket_connection - async def subscribe(self, channel, **kwargs): - await self.websocket.send(json.dumps({ - "event": "subscribe", - "channel": channel, - **kwargs - })) - - @_require_websocket_connection - async def unsubscribe(self, chanId): - await self.websocket.send(json.dumps({ - "event": "unsubscribe", - "chanId": chanId - })) - - @_require_websocket_connection - async def authenticate(self, API_KEY, API_SECRET, filter=None): + async def __authenticate(self, API_KEY, API_SECRET, filter=None): data = { "event": "auth", "filter": filter, "apiKey": API_KEY } data["authNonce"] = int(time.time()) * 1000 @@ -117,13 +76,26 @@ class BfxWebsocketClient(object): await self.websocket.send(json.dumps(data)) - @_require_websocket_authentication - async def notify(self, MESSAGE_ID, info): - await self.websocket.send(json.dumps([ 0, "n", MESSAGE_ID, { "type": "ucm-test", "info": info } ])) + async def subscribe(self, channel, **kwargs): + counters = [ bucket.count for bucket in self.buckets ] - async def clear(self): - for chanId in self.chanIds.keys(): - await self.unsubscribe(chanId) + index = counters.index(min(counters)) + + await self.buckets[index]._subscribe(channel, **kwargs) + + def __require_websocket_authentication(function): + @_require_websocket_connection + async def wrapper(self, *args, **kwargs): + if self.authentication == False: + raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.") + + await function(self, *args, **kwargs) + + return wrapper + + def __bucket_open_signal(self, index): + if all(bucket.websocket != None and bucket.websocket.open == True for bucket in self.buckets): + self.event_emitter.emit("open") def on(self, event): if event not in BfxWebsocketClient.EVENTS: @@ -143,5 +115,44 @@ class BfxWebsocketClient(object): return handler - def run(self): - asyncio.run(self.connect()) \ No newline at end of file +class _BfxWebsocketBucket(object): + def __init__(self, host, event_emitter, __bucket_open_signal): + self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal + + self.websocket, self.chanIds, self.count = None, dict(), 0 + + self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) + + async def _connect(self, index): + async with websockets.connect(self.host) as websocket: + self.websocket = websocket + + self.__bucket_open_signal(index) + + async for message in websocket: + message = json.loads(message) + + if isinstance(message, dict) and message["event"] == "info" and "version" in message: + if BfxWebsocketClient.VERSION != message["version"]: + raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") + elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): + self.chanIds[chanId] = message + + self.event_emitter.emit("subscribed", message) + elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): + if message["status"] == "OK": + del self.chanIds[chanId] + elif isinstance(message, dict) and message["event"] == "error": + self.event_emitter.emit("wss-error", message["code"], message["msg"]) + elif isinstance(message, list) and (chanId := message[0]) and message[1] != HEARTBEAT: + self.handler.handle(self.chanIds[chanId], *message[1:]) + + @_require_websocket_connection + async def _subscribe(self, channel, **kwargs): + self.count += 1 + + await self.websocket.send(json.dumps({ + "event": "subscribe", + "channel": channel, + **kwargs + })) \ No newline at end of file