Fix and rewrite some logic in class BfxWebSocketClient.

This commit is contained in:
Davide Casale
2023-10-09 16:25:46 +02:00
parent de0ee54900
commit 25881e77c8
5 changed files with 62 additions and 86 deletions

View File

@@ -24,7 +24,7 @@ max-line-length=120
expected-line-ending-format=LF expected-line-ending-format=LF
[BASIC] [BASIC]
good-names=t,f,id,ip,on,pl,tf,A,B,C,D,E,F good-names=t,f,id,ip,on,pl,tf,to,A,B,C,D,E,F
[TYPECHECK] [TYPECHECK]
generated-members=websockets generated-members=websockets

View File

@@ -3,7 +3,7 @@ from typing import \
from bfxapi._utils.logging import ColorLogger from bfxapi._utils.logging import ColorLogger
from bfxapi.exceptions import IncompleteCredentialError from bfxapi._exceptions import IncompleteCredentialError
from bfxapi.rest import BfxRestInterface from bfxapi.rest import BfxRestInterface
from bfxapi.websocket import BfxWebSocketClient from bfxapi.websocket import BfxWebSocketClient
@@ -22,7 +22,7 @@ class Client:
rest_host: str = REST_HOST, rest_host: str = REST_HOST,
wss_host: str = WSS_HOST, wss_host: str = WSS_HOST,
filters: Optional[List[str]] = None, filters: Optional[List[str]] = None,
timeout: Optional[float] = 60 * 15, timeout: Optional[int] = 60 * 15,
log_filename: Optional[str] = None, log_filename: Optional[str] = None,
log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO"
) -> None: ) -> None:

View File

@@ -5,7 +5,7 @@ from http import HTTPStatus
import time, hmac, hashlib, json, requests import time, hmac, hashlib, json, requests
from ..enums import Error from ..enums import Error
from ..exceptions import ResourceNotFound, RequestParametersError, InvalidAuthenticationCredentials, UnknownGenericError from ..exceptions import ResourceNotFound, RequestParametersError, InvalidCredentialError, UnknownGenericError
from ..._utils.json_encoder import JSONEncoder from ..._utils.json_encoder import JSONEncoder
from ..._utils.json_decoder import JSONDecoder from ..._utils.json_decoder import JSONDecoder
@@ -91,7 +91,7 @@ class Middleware:
f"following parameter error: <{data[2]}>") f"following parameter error: <{data[2]}>")
if data[1] == Error.ERR_AUTH_FAIL: if data[1] == Error.ERR_AUTH_FAIL:
raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") raise InvalidCredentialError("Cannot authenticate with given API-KEY and API-SECRET.")
if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC: if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
raise UnknownGenericError("The server replied to the request with " \ raise UnknownGenericError("The server replied to the request with " \

View File

@@ -13,9 +13,6 @@ from bfxapi.websocket._handlers import PublicChannelsHandler
from bfxapi.websocket.subscriptions import Subscription from bfxapi.websocket.subscriptions import Subscription
from bfxapi.websocket.exceptions import FullBucketError
_CHECKSUM_FLAG_VALUE = 131_072 _CHECKSUM_FLAG_VALUE = 131_072
def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]: def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]:
@@ -111,10 +108,6 @@ class BfxWebSocketBucket(Connection):
channel: str, channel: str,
sub_id: Optional[str] = None, sub_id: Optional[str] = None,
**kwargs: Any) -> None: **kwargs: Any) -> None:
if self.is_full:
raise FullBucketError("The bucket is full: " + \
"can't subscribe to any other channel.")
subscription: Dict[str, Any] = \ subscription: Dict[str, Any] = \
{ **kwargs, "event": "subscribe", "channel": channel } { **kwargs, "event": "subscribe", "channel": channel }

View File

@@ -1,24 +1,24 @@
from typing import \ from typing import \
TYPE_CHECKING, TypedDict, List, \ TypedDict, List, Dict, \
Dict, Optional, Any, \ Optional, Any, no_type_check
no_type_check
from logging import Logger from logging import Logger
from datetime import datetime from datetime import datetime
from socket import gaierror from socket import gaierror
from asyncio import Task
import \ import \
traceback, json, asyncio, \ traceback, json, asyncio, \
hmac, hashlib, random, \ hmac, hashlib, random, \
websockets websockets
import websockets.client
from websockets.exceptions import \ from websockets.exceptions import \
ConnectionClosedError, \ ConnectionClosedError, \
InvalidStatusCode InvalidStatusCode
from websockets.legacy.client import \
connect as _websockets__connect
from bfxapi._utils.json_encoder import JSONEncoder from bfxapi._utils.json_encoder import JSONEncoder
from bfxapi.websocket._connection import Connection from bfxapi.websocket._connection import Connection
@@ -26,24 +26,22 @@ from bfxapi.websocket._handlers import AuthEventsHandler
from bfxapi.websocket._event_emitter import BfxEventEmitter from bfxapi.websocket._event_emitter import BfxEventEmitter
from bfxapi.websocket.exceptions import \ from bfxapi.websocket.exceptions import \
InvalidAuthenticationCredentials, \ InvalidCredentialError, \
ReconnectionTimeoutError, \ ReconnectionTimeoutError, \
OutdatedClientVersion, \ VersionMismatchError, \
ZeroConnectionsError, \ ZeroConnectionsError, \
UnknownChannelError UnknownChannelError, \
UnknownSubscriptionError
from bfxapi.websocket._client.bfx_websocket_bucket import BfxWebSocketBucket from .bfx_websocket_bucket import BfxWebSocketBucket
from bfxapi.websocket._client.bfx_websocket_inputs import BfxWebSocketInputs from .bfx_websocket_inputs import BfxWebSocketInputs
if TYPE_CHECKING: _Credentials = TypedDict("_Credentials", \
from asyncio import Task { "api_key": str, "api_secret": str, "filters": Optional[List[str]] })
_Credentials = TypedDict("_Credentials", \ _Reconnection = TypedDict("_Reconnection",
{ "api_key": str, "api_secret": str, "filters": Optional[List[str]] }) { "attempts": int, "reason": str, "timestamp": datetime })
_Reconnection = TypedDict("_Reconnection",
{ "attempts": int, "reason": str, "timestamp": datetime })
_DEFAULT_LOGGER = Logger("bfxapi.websocket._client", level=0) _DEFAULT_LOGGER = Logger("bfxapi.websocket._client", level=0)
@@ -72,22 +70,18 @@ class _Delay:
self.__backoff_delay = _Delay.__BACKOFF_MIN self.__backoff_delay = _Delay.__BACKOFF_MIN
class BfxWebSocketClient(Connection): class BfxWebSocketClient(Connection):
VERSION = 2
MAXIMUM_CONNECTIONS_AMOUNT = 20
def __init__(self, def __init__(self,
host: str, host: str,
*, *,
credentials: Optional["_Credentials"] = None, credentials: Optional[_Credentials] = None,
timeout: Optional[float] = 60 * 15, timeout: Optional[int] = 60 * 15,
logger: Logger = _DEFAULT_LOGGER) -> None: logger: Logger = _DEFAULT_LOGGER) -> None:
super().__init__(host) super().__init__(host)
self.__credentials, self.__timeout, self.__logger = \ self.__credentials, self.__timeout, self.__logger = \
credentials, timeout, logger credentials, timeout, logger
self.__buckets: Dict[BfxWebSocketBucket, Optional["Task"]] = { } self.__buckets: Dict[BfxWebSocketBucket, Optional[Task]] = { }
self.__reconnection: Optional[_Reconnection] = None self.__reconnection: Optional[_Reconnection] = None
@@ -113,32 +107,14 @@ class BfxWebSocketClient(Connection):
def inputs(self) -> BfxWebSocketInputs: def inputs(self) -> BfxWebSocketInputs:
return self.__inputs return self.__inputs
def run(self, connections: int = 5) -> None: def run(self) -> None:
return asyncio.run(self.start(connections)) return asyncio.run(self.start())
async def start(self, connections: int = 5) -> None:
if connections == 0:
self.__logger.info("With connections set to 0 it will not be possible to subscribe to any " \
"public channel. Attempting a subscription will cause a ZeroConnectionsError to be thrown.")
if connections > BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT:
self.__logger.warning(f"It is not safe to use more than {BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT} " \
f"buckets from the same connection ({connections} in use), the server could momentarily " \
"block the client with <429 Too Many Requests>.")
for _ in range(connections):
_bucket = BfxWebSocketBucket( \
self._host, self.__event_emitter)
self.__buckets.update({ _bucket: None })
await self.__connect()
#pylint: disable-next=too-many-branches #pylint: disable-next=too-many-branches
async def __connect(self) -> None: async def start(self) -> None:
_delay = _Delay(backoff_factor=1.618) _delay = _Delay(backoff_factor=1.618)
_sleep: Optional["Task"] = None _sleep: Optional[Task] = None
def _on_timeout(): def _on_timeout():
if not self.open: if not self.open:
@@ -158,19 +134,20 @@ class BfxWebSocketClient(Connection):
from None from None
try: try:
await self.__connection() await self.__connect()
except (ConnectionClosedError, InvalidStatusCode, gaierror) as error: except (ConnectionClosedError, InvalidStatusCode, gaierror) as error:
async def _cancel(task: "Task") -> None: async def _cancel(task: Task) -> None:
task.cancel() task.cancel()
try: try:
await task await task
except (ConnectionClosedError, gaierror) as _e: except (ConnectionClosedError, InvalidStatusCode, gaierror) as _e:
if type(error) is not type(_e) or error.args != _e.args: if type(error) is not type(_e) or error.args != _e.args:
raise _e raise _e
except asyncio.CancelledError: except asyncio.CancelledError:
pass pass
# pylint: disable-next=consider-using-dict-items
for bucket in self.__buckets: for bucket in self.__buckets:
if task := self.__buckets[bucket]: if task := self.__buckets[bucket]:
self.__buckets[bucket] = None self.__buckets[bucket] = None
@@ -186,7 +163,7 @@ class BfxWebSocketClient(Connection):
self.__logger.info("WSS server is about to restart, clients need " \ self.__logger.info("WSS server is about to restart, clients need " \
"to reconnect (server sent 20051). Reconnection attempt in progress...") "to reconnect (server sent 20051). Reconnection attempt in progress...")
if self.__timeout is not None: if self.__timeout:
asyncio.get_event_loop().call_later( asyncio.get_event_loop().call_later(
self.__timeout, _on_timeout) self.__timeout, _on_timeout)
@@ -214,8 +191,8 @@ class BfxWebSocketClient(Connection):
break break
async def __connection(self) -> None: async def __connect(self) -> None:
async with _websockets__connect(self._host) as websocket: async with websockets.client.connect(self._host) as websocket:
if self.__reconnection: if self.__reconnection:
self.__logger.info(f"_Reconnection attempt successful (no.{self.__reconnection['attempts']}): The " \ self.__logger.info(f"_Reconnection attempt successful (no.{self.__reconnection['attempts']}): The " \
f"client has been offline for a total of {datetime.now() - self.__reconnection['timestamp']} " \ f"client has been offline for a total of {datetime.now() - self.__reconnection['timestamp']} " \
@@ -225,11 +202,9 @@ class BfxWebSocketClient(Connection):
self._websocket = websocket self._websocket = websocket
self.__buckets = { for bucket in self.__buckets:
bucket: asyncio.create_task(_c) self.__buckets[bucket] = \
for bucket in self.__buckets asyncio.create_task(bucket.start())
if (_c := bucket.start())
}
if len(self.__buckets) == 0 or \ if len(self.__buckets) == 0 or \
(await asyncio.gather(*[bucket.wait() for bucket in self.__buckets])): (await asyncio.gather(*[bucket.wait() for bucket in self.__buckets])):
@@ -241,29 +216,31 @@ class BfxWebSocketClient(Connection):
await self._websocket.send(authentication) await self._websocket.send(authentication)
async for message in self._websocket: async for _message in self._websocket:
message = json.loads(message) message = json.loads(_message)
if isinstance(message, dict): if isinstance(message, dict):
if message["event"] == "info" and "version" in message: if message["event"] == "info" and "version" in message:
if BfxWebSocketClient.VERSION != message["version"]: if message["version"] != 2:
raise OutdatedClientVersion("Mismatch between the client version and the server " \ raise VersionMismatchError("Mismatch between the client and the server version: " + \
"version. Update the library to the latest version to continue (client version: " \ "please update bitfinex-api-py to the latest version to resolve this error " + \
f"{BfxWebSocketClient.VERSION}, server version: {message['version']}).") f"(client version: 2, server version: {message['version']}).")
elif message["event"] == "info" and message["code"] == 20051: elif message["event"] == "info" and message["code"] == 20051:
code, reason = 1012, "Stop/Restart WebSocket Server (please reconnect)." rcvd = websockets.frames.Close( \
rcvd = websockets.frames.Close(code=code, reason=reason) 1012, "Stop/Restart WebSocket Server (please reconnect).")
raise ConnectionClosedError(rcvd=rcvd, sent=None) raise ConnectionClosedError(rcvd=rcvd, sent=None)
elif message["event"] == "auth": elif message["event"] == "auth":
if message["status"] != "OK": if message["status"] != "OK":
raise InvalidAuthenticationCredentials( raise InvalidCredentialError("Cannot authenticate " + \
"Cannot authenticate with given API-KEY and API-SECRET.") "with given API-KEY and API-SECRET.")
self.__event_emitter.emit("authenticated", message) self.__event_emitter.emit("authenticated", message)
self._authentication = True self._authentication = True
elif message["event"] == "error": elif message["event"] == "error":
self.__event_emitter.emit("wss-error", message["code"], message["msg"]) self.__event_emitter.emit("wss-error", \
message["code"], message["msg"])
if isinstance(message, list) and \ if isinstance(message, list) and \
message[0] == 0 and message[1] != Connection._HEARTBEAT: message[0] == 0 and message[1] != Connection._HEARTBEAT:
@@ -294,14 +271,20 @@ class BfxWebSocketClient(Connection):
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None: async def unsubscribe(self, sub_id: str) -> None:
for bucket in self.__buckets: for bucket in self.__buckets:
if bucket.has(sub_id=sub_id): if bucket.has(sub_id):
await bucket.unsubscribe(sub_id=sub_id) return await bucket.unsubscribe(sub_id)
raise UnknownSubscriptionError("Unable to find " + \
f"a subscription with sub_id <{sub_id}>.")
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def resubscribe(self, sub_id: str) -> None: async def resubscribe(self, sub_id: str) -> None:
for bucket in self.__buckets: for bucket in self.__buckets:
if bucket.has(sub_id=sub_id): if bucket.has(sub_id):
await bucket.resubscribe(sub_id=sub_id) return await bucket.resubscribe(sub_id)
raise UnknownSubscriptionError("Unable to find " + \
f"a subscription with sub_id <{sub_id}>.")
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def close(self, code: int = 1000, reason: str = str()) -> None: async def close(self, code: int = 1000, reason: str = str()) -> None:
@@ -323,7 +306,7 @@ class BfxWebSocketClient(Connection):
@Connection.require_websocket_authentication @Connection.require_websocket_authentication
async def __handle_websocket_input(self, event: str, data: Any) -> None: async def __handle_websocket_input(self, event: str, data: Any) -> None:
await self._websocket.send(json.dumps(\ await self._websocket.send(json.dumps( \
[ 0, event, None, data], cls=JSONEncoder)) [ 0, event, None, data], cls=JSONEncoder))
@no_type_check @no_type_check