mirror of
https://github.com/aljazceru/bitfinex-api-py.git
synced 2025-12-18 22:34:21 +01:00
Replace use of asyncio.locks.Event with asyncio.locks.Condition in bfx_websocket_bucket.py.
This commit is contained in:
@@ -23,48 +23,44 @@ class BfxWebSocketBucket:
|
||||
def __init__(self, host, event_emitter, events_per_subscription):
|
||||
self.host, self.event_emitter, self.events_per_subscription = host, event_emitter, events_per_subscription
|
||||
self.websocket, self.subscriptions, self.pendings = None, {}, []
|
||||
self.on_open_event = asyncio.locks.Event()
|
||||
self.condition = asyncio.locks.Condition()
|
||||
|
||||
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter, \
|
||||
events_per_subscription=self.events_per_subscription)
|
||||
|
||||
async def connect(self):
|
||||
async def _connection():
|
||||
async with websockets.connect(self.host) as websocket:
|
||||
self.websocket = websocket
|
||||
self.on_open_event.set()
|
||||
await self.__recover_state()
|
||||
async with websockets.connect(self.host) as websocket:
|
||||
self.websocket = websocket
|
||||
|
||||
async for message in websocket:
|
||||
message = json.loads(message)
|
||||
await self.__recover_state()
|
||||
|
||||
if isinstance(message, dict):
|
||||
if message["event"] == "subscribed" and (chan_id := message["chanId"]):
|
||||
self.pendings = [ pending \
|
||||
for pending in self.pendings if pending["subId"] != message["subId"] ]
|
||||
async with self.condition:
|
||||
self.condition.notify()
|
||||
|
||||
self.subscriptions[chan_id] = message
|
||||
async for message in websocket:
|
||||
message = json.loads(message)
|
||||
|
||||
sub_id = message["subId"]
|
||||
if isinstance(message, dict):
|
||||
if message["event"] == "subscribed" and (chan_id := message["chanId"]):
|
||||
self.pendings = [ pending \
|
||||
for pending in self.pendings if pending["subId"] != message["subId"] ]
|
||||
|
||||
if "subscribed" not in self.events_per_subscription.get(sub_id, []):
|
||||
self.events_per_subscription.setdefault(sub_id, []).append("subscribed")
|
||||
self.event_emitter.emit("subscribed", message)
|
||||
elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]):
|
||||
if message["status"] == "OK":
|
||||
del self.subscriptions[chan_id]
|
||||
elif message["event"] == "error":
|
||||
self.event_emitter.emit("wss-error", message["code"], message["msg"])
|
||||
self.subscriptions[chan_id] = message
|
||||
|
||||
if isinstance(message, list):
|
||||
if (chan_id := message[0]) and message[1] != _HEARTBEAT:
|
||||
self.handler.handle(self.subscriptions[chan_id], message[1:])
|
||||
sub_id = message["subId"]
|
||||
|
||||
try:
|
||||
await _connection()
|
||||
except websockets.exceptions.ConnectionClosedError as error:
|
||||
if error.code in (1006, 1012):
|
||||
self.on_open_event.clear()
|
||||
if "subscribed" not in self.events_per_subscription.get(sub_id, []):
|
||||
self.events_per_subscription.setdefault(sub_id, []).append("subscribed")
|
||||
self.event_emitter.emit("subscribed", message)
|
||||
elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]):
|
||||
if message["status"] == "OK":
|
||||
del self.subscriptions[chan_id]
|
||||
elif message["event"] == "error":
|
||||
self.event_emitter.emit("wss-error", message["code"], message["msg"])
|
||||
|
||||
if isinstance(message, list):
|
||||
if (chan_id := message[0]) and message[1] != _HEARTBEAT:
|
||||
self.handler.handle(self.subscriptions[chan_id], message[1:])
|
||||
|
||||
async def __recover_state(self):
|
||||
for pending in self.pendings:
|
||||
@@ -107,3 +103,9 @@ class BfxWebSocketBucket:
|
||||
for subscription in self.subscriptions.values():
|
||||
if subscription["subId"] == sub_id:
|
||||
return subscription["chanId"]
|
||||
|
||||
async def wait(self):
|
||||
async with self.condition:
|
||||
await self.condition.wait_for(
|
||||
lambda: self.websocket is not None and \
|
||||
self.websocket.open)
|
||||
|
||||
@@ -138,7 +138,7 @@ class BfxWebSocketClient:
|
||||
tasks = [ asyncio.create_task(coroutine) for coroutine in coroutines ]
|
||||
|
||||
if len(self.buckets) == 0 or \
|
||||
(await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])):
|
||||
(await asyncio.gather(*[bucket.wait() for bucket in self.buckets])):
|
||||
self.event_emitter.emit("open")
|
||||
|
||||
if self.credentials:
|
||||
@@ -184,34 +184,34 @@ class BfxWebSocketClient:
|
||||
try:
|
||||
await _connection()
|
||||
except (websockets.exceptions.ConnectionClosedError, socket.gaierror) as error:
|
||||
if isinstance(error, websockets.exceptions.ConnectionClosedError):
|
||||
if error.code in (1006, 1012):
|
||||
if error.code == 1006:
|
||||
self.logger.error("Connection lost: no close frame received " \
|
||||
"or sent (1006). Trying to reconnect...")
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
|
||||
if error.code == 1012:
|
||||
self.logger.info("WSS server is about to restart, clients need " \
|
||||
"to reconnect (server sent 20051). Reconnection attempt in progress...")
|
||||
if isinstance(error, websockets.exceptions.ConnectionClosedError) and error.code in (1006, 1012):
|
||||
if error.code == 1006:
|
||||
self.logger.error("Connection lost: no close frame received " \
|
||||
"or sent (1006). Trying to reconnect...")
|
||||
|
||||
for task in tasks:
|
||||
task.cancel()
|
||||
if error.code == 1012:
|
||||
self.logger.info("WSS server is about to restart, clients need " \
|
||||
"to reconnect (server sent 20051). Reconnection attempt in progress...")
|
||||
|
||||
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
|
||||
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
|
||||
|
||||
if self.wss_timeout is not None:
|
||||
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout)
|
||||
if self.wss_timeout is not None:
|
||||
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout)
|
||||
|
||||
delay = _Delay(backoff_factor=1.618)
|
||||
delay = _Delay(backoff_factor=1.618)
|
||||
|
||||
self.authentication = False
|
||||
self.authentication = False
|
||||
elif isinstance(error, socket.gaierror) and reconnection.status:
|
||||
self.logger.warning(f"Reconnection attempt was unsuccessful (no.{reconnection.attempts}). " \
|
||||
f"Next reconnection attempt in {delay.peek():.2f} seconds. (at the moment " \
|
||||
f"the client has been offline for {datetime.now() - reconnection.timestamp})")
|
||||
|
||||
reconnection = reconnection._replace(attempts=reconnection.attempts + 1)
|
||||
else: raise error
|
||||
else:
|
||||
raise error
|
||||
|
||||
if not reconnection.status:
|
||||
self.event_emitter.emit("disconnection",
|
||||
|
||||
Reference in New Issue
Block a user