Replace BfxWebsocketClient::on_open_events with BfxWebsocketBucket::on_open_event field.

This commit is contained in:
Davide Casale
2023-03-14 19:16:12 +01:00
parent 249f19fe41
commit 464d942fb0
2 changed files with 8 additions and 15 deletions

View File

@@ -1,6 +1,6 @@
from typing import Literal, TypeVar, Callable, cast from typing import Literal, TypeVar, Callable, cast
import json, uuid, websockets import asyncio, json, uuid, websockets
from ..handlers import PublicChannelsHandler from ..handlers import PublicChannelsHandler
@@ -24,8 +24,8 @@ class BfxWebsocketBucket:
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
def __init__(self, host, event_emitter, on_open_event): def __init__(self, host, event_emitter):
self.host, self.event_emitter, self.on_open_event = host, event_emitter, on_open_event self.host, self.event_emitter, self.on_open_event = host, event_emitter, asyncio.locks.Event()
self.websocket, self.subscriptions, self.pendings = None, {}, [] self.websocket, self.subscriptions, self.pendings = None, {}, []

View File

@@ -62,12 +62,10 @@ class BfxWebsocketClient:
] ]
def __init__(self, host, credentials = None, log_filename = None, log_level = "INFO"): def __init__(self, host, credentials = None, log_filename = None, log_level = "INFO"):
self.websocket = None self.websocket, self.buckets, self.authentication = None, [], False
self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter()
self.on_open_events, self.buckets, self.authentication = [], [], False
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
@@ -95,14 +93,9 @@ class BfxWebsocketClient:
"block the client with <429 Too Many Requests>.") "block the client with <429 Too Many Requests>.")
for _ in range(connections): for _ in range(connections):
self.on_open_events.append(asyncio.Event()) self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter)]
for index in range(connections): tasks = [ bucket.connect() for bucket in self.buckets ] + [ self.__connect() ]
self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index])]
tasks = [ bucket.connect() for bucket in self.buckets ]
tasks.append(self.__connect())
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
@@ -125,8 +118,8 @@ class BfxWebsocketClient:
self.websocket, self.authentication = websocket, False self.websocket, self.authentication = websocket, False
if len(self.on_open_events) == 0 or \ if len(self.buckets) == 0 or \
(await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events])): (await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])):
self.event_emitter.emit("open") self.event_emitter.emit("open")
if self.credentials: if self.credentials: