From 464d942fb0d31413d73f5a1c3fa7e40b65510118 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Tue, 14 Mar 2023 19:16:12 +0100 Subject: [PATCH] Replace BfxWebsocketClient::on_open_events with BfxWebsocketBucket::on_open_event field. --- bfxapi/websocket/client/bfx_websocket_bucket.py | 6 +++--- bfxapi/websocket/client/bfx_websocket_client.py | 17 +++++------------ 2 files changed, 8 insertions(+), 15 deletions(-) diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 72c3419..2d5e248 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -1,6 +1,6 @@ from typing import Literal, TypeVar, Callable, cast -import json, uuid, websockets +import asyncio, json, uuid, websockets from ..handlers import PublicChannelsHandler @@ -24,8 +24,8 @@ class BfxWebsocketBucket: MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 - def __init__(self, host, event_emitter, on_open_event): - self.host, self.event_emitter, self.on_open_event = host, event_emitter, on_open_event + def __init__(self, host, event_emitter): + self.host, self.event_emitter, self.on_open_event = host, event_emitter, asyncio.locks.Event() self.websocket, self.subscriptions, self.pendings = None, {}, [] diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index a88d3cd..5071ab6 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -62,12 +62,10 @@ class BfxWebsocketClient: ] def __init__(self, host, credentials = None, log_filename = None, log_level = "INFO"): - self.websocket = None + self.websocket, self.buckets, self.authentication = None, [], False self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() - self.on_open_events, self.buckets, self.authentication = [], [], False - self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) @@ -95,14 +93,9 @@ class BfxWebsocketClient: "block the client with <429 Too Many Requests>.") for _ in range(connections): - self.on_open_events.append(asyncio.Event()) + self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter)] - for index in range(connections): - self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index])] - - tasks = [ bucket.connect() for bucket in self.buckets ] - - tasks.append(self.__connect()) + tasks = [ bucket.connect() for bucket in self.buckets ] + [ self.__connect() ] await asyncio.gather(*tasks) @@ -125,8 +118,8 @@ class BfxWebsocketClient: self.websocket, self.authentication = websocket, False - if len(self.on_open_events) == 0 or \ - (await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events])): + if len(self.buckets) == 0 or \ + (await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])): self.event_emitter.emit("open") if self.credentials: