Rename .chanIds field in _BfxWebsocketBucket to .subscriptions.

This commit is contained in:
Davide Casale
2022-11-28 16:50:07 +01:00
parent a5083c36cf
commit 9e8bea905a

View File

@@ -86,7 +86,7 @@ class BfxWebsocketClient(object):
await self.websocket.send(json.dumps(data)) await self.websocket.send(json.dumps(data))
async def subscribe(self, channel, **kwargs): async def subscribe(self, channel, **kwargs):
counters = [ len(bucket.pendings) + len(bucket.chanIds) for bucket in self.buckets ] counters = [ len(bucket.pendings) + len(bucket.subscriptions) for bucket in self.buckets ]
index = counters.index(min(counters)) index = counters.index(min(counters))
@@ -94,7 +94,7 @@ class BfxWebsocketClient(object):
async def unsubscribe(self, chanId): async def unsubscribe(self, chanId):
for bucket in self.buckets: for bucket in self.buckets:
if chanId in bucket.chanIds.keys(): if chanId in bucket.subscriptions.keys():
await bucket._unsubscribe(chanId=chanId) await bucket._unsubscribe(chanId=chanId)
async def close(self, code=1000, reason=str()): async def close(self, code=1000, reason=str()):
@@ -149,7 +149,7 @@ class _BfxWebsocketBucket(object):
def __init__(self, host, event_emitter, __bucket_open_signal): def __init__(self, host, event_emitter, __bucket_open_signal):
self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal
self.websocket, self.chanIds, self.pendings = None, dict(), list() self.websocket, self.subscriptions, self.pendings = None, dict(), list()
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
@@ -168,21 +168,21 @@ class _BfxWebsocketBucket(object):
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]):
self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.chanIds[chanId] = message self.subscriptions[chanId] = message
self.event_emitter.emit("subscribed", message) self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]):
if message["status"] == "OK": if message["status"] == "OK":
del self.chanIds[chanId] del self.subscriptions[chanId]
elif isinstance(message, dict) and message["event"] == "error": elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"]) self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.chanIds[chanId], *message[1:]) self.handler.handle(self.subscriptions[chanId], *message[1:])
except websockets.ConnectionClosedError: continue except websockets.ConnectionClosedError: continue
finally: await self.websocket.wait_closed(); break finally: await self.websocket.wait_closed(); break
@_require_websocket_connection @_require_websocket_connection
async def _subscribe(self, channel, subId=None, **kwargs): async def _subscribe(self, channel, subId=None, **kwargs):
if len(self.chanIds) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: if len(self.subscriptions) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
subscription = { subscription = {