Fix bugs and rewrite regions of new reconnection system.

This commit is contained in:
Davide Casale
2023-02-16 20:08:05 +01:00
parent fa9bdfc333
commit b8a5bcb515
3 changed files with 83 additions and 33 deletions

View File

@@ -1,7 +1,7 @@
from .rest import BfxRestInterface
from .websocket import BfxWebsocketClient
from typing import Optional
from typing import List, Optional
from enum import Enum
@@ -21,8 +21,15 @@ class Client(object):
WSS_HOST: str = Constants.WSS_HOST,
API_KEY: Optional[str] = None,
API_SECRET: Optional[str] = None,
filter: Optional[List[str]] = None,
log_level: str = "WARNING"
):
credentials = {
"API_KEY": API_KEY,
"API_SECRET": API_SECRET,
"filter": filter
}
self.rest = BfxRestInterface(
host=REST_HOST,
API_KEY=API_KEY,
@@ -31,7 +38,6 @@ class Client(object):
self.wss = BfxWebsocketClient(
host=WSS_HOST,
API_KEY=API_KEY,
API_SECRET=API_SECRET,
credentials=credentials,
log_level=log_level
)

View File

@@ -63,9 +63,12 @@ class BfxWebsocketBucket(object):
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chanId], *message[1:])
except websockets.ConnectionClosedError: self.on_open_event.clear(); reconnection = True; continue
except websockets.ConnectionClosedError:
self.on_open_event.clear()
reconnection = True
continue
await self.websocket.wait_closed(); break
break
@_require_websocket_connection
async def _subscribe(self, channel, subId=None, **kwargs):

View File

@@ -1,4 +1,4 @@
import traceback, json, asyncio, hmac, hashlib, time, websockets
import traceback, json, asyncio, hmac, hashlib, time, websockets, socket, random
from typing import cast
@@ -38,10 +38,10 @@ class BfxWebsocketClient(object):
*AuthenticatedChannelsHandler.EVENTS
]
def __init__(self, host, API_KEY = None, API_SECRET = None, filter = None, log_level = "WARNING"):
self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter()
def __init__(self, host, credentials = None, log_level = "WARNING"):
self.websocket = None
self.API_KEY, self.API_SECRET, self.filter = API_KEY, API_SECRET, filter
self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter()
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
@@ -71,33 +71,35 @@ class BfxWebsocketClient(object):
tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ]
tasks.append(self.__connect(self.API_KEY, self.API_SECRET, self.filter))
tasks.append(self.__connect(self.credentials))
await asyncio.gather(*tasks)
async def __connect(self, API_KEY, API_SECRET, filter=None):
Reconnection = namedtuple("Reconnection", ["status", "code", "timestamp"])
async def __connect(self, credentials = None):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection = Reconnection(status=False, code=0, timestamp=None)
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
async for websocket in websockets.connect(self.host):
self.websocket, self.authentication = websocket, False
async def _connection():
nonlocal reconnection
if (await asyncio.gather(*[ on_open_event.wait() for on_open_event in self.on_open_events ])):
self.event_emitter.emit("open")
async with websockets.connect(self.host) as websocket:
if reconnection.status == True:
self.logger.info(f"Reconnect attempt successful (attempt N°{reconnection.attempts}): The " +
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " +
f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
if self.API_KEY != None and self.API_SECRET != None:
await self.__authenticate(API_KEY=API_KEY, API_SECRET=API_SECRET, filter=filter)
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
self.websocket, self.authentication = websocket, False
if await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events]):
self.event_emitter.emit("open")
if self.credentials != None:
await self.__authenticate(**self.credentials)
try:
async for message in websocket:
if reconnection.status == True:
self.logger.warning(f"Reconnect Attempt Successful (error <{reconnection.code}>): The " +
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " +
f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
reconnection = Reconnection(status=False, code=0, timestamp=None)
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "info" and "version" in message:
@@ -113,13 +115,52 @@ class BfxWebsocketClient(object):
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != _HEARTBEAT:
self.handler.handle(message[1], message[2])
except websockets.ConnectionClosedError as error:
self.logger.error(f"Connection terminated due to an error (status code: <{error.code}>) -> {str(error)}. Attempting to reconnect...")
reconnection = Reconnection(status=True, code=error.code, timestamp=datetime.now());
continue
class _Delay:
BACKOFF_MIN, BACKOFF_MAX = 1.92, 60.0
BACKOFF_INITIAL = 5.0
def __init__(self, backoff_factor):
self.__backoff_factor = backoff_factor
self.__backoff_delay = _Delay.BACKOFF_MIN
self.__initial_delay = random.random() * _Delay.BACKOFF_INITIAL
def next(self):
backoff_delay = self.peek()
__backoff_delay = self.__backoff_delay * self.__backoff_factor
self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX)
return backoff_delay
def peek(self):
return (self.__backoff_delay == _Delay.BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
delay = _Delay(backoff_factor=1.618)
while True:
if reconnection.status == True:
await asyncio.sleep(delay.next())
try:
await _connection()
except (websockets.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.ConnectionClosedError) and error.code == 1006:
self.logger.error("Connection lost: no close frame received "
+ "or sent (1006). Attempting to reconnect...")
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now());
elif isinstance(error, socket.gaierror) and reconnection.status == True:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. "
+ f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds."
+ f"(at the moment the client has been offline for {datetime.now() - reconnection.timestamp})")
reconnection = reconnection._replace(attempts=reconnection.attempts + 1)
else: raise error
if reconnection.status == False:
await self.websocket.wait_closed(); break
break
async def __authenticate(self, API_KEY, API_SECRET, filter=None):
data = { "event": "auth", "filter": filter, "apiKey": API_KEY }