diff --git a/.pylintrc b/.pylintrc index 1f1a19b..3d6d4a5 100644 --- a/.pylintrc +++ b/.pylintrc @@ -24,7 +24,7 @@ max-line-length=120 expected-line-ending-format=LF [BASIC] -good-names=t,f,id,ip,on,pl,tf,A,B,C,D,E,F +good-names=t,f,id,ip,on,pl,tf,to,A,B,C,D,E,F [TYPECHECK] generated-members=websockets diff --git a/bfxapi/client.py b/bfxapi/client.py index 3d7f3e6..21fdafe 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -3,7 +3,7 @@ from typing import \ from bfxapi._utils.logging import ColorLogger -from bfxapi.exceptions import IncompleteCredentialError +from bfxapi._exceptions import IncompleteCredentialError from bfxapi.rest import BfxRestInterface from bfxapi.websocket import BfxWebSocketClient @@ -22,7 +22,7 @@ class Client: rest_host: str = REST_HOST, wss_host: str = WSS_HOST, filters: Optional[List[str]] = None, - timeout: Optional[float] = 60 * 15, + timeout: Optional[int] = 60 * 15, log_filename: Optional[str] = None, log_level: Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"] = "INFO" ) -> None: diff --git a/bfxapi/rest/middleware/middleware.py b/bfxapi/rest/middleware/middleware.py index 323990f..cf434e5 100644 --- a/bfxapi/rest/middleware/middleware.py +++ b/bfxapi/rest/middleware/middleware.py @@ -5,7 +5,7 @@ from http import HTTPStatus import time, hmac, hashlib, json, requests from ..enums import Error -from ..exceptions import ResourceNotFound, RequestParametersError, InvalidAuthenticationCredentials, UnknownGenericError +from ..exceptions import ResourceNotFound, RequestParametersError, InvalidCredentialError, UnknownGenericError from ..._utils.json_encoder import JSONEncoder from ..._utils.json_decoder import JSONDecoder @@ -91,7 +91,7 @@ class Middleware: f"following parameter error: <{data[2]}>") if data[1] == Error.ERR_AUTH_FAIL: - raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") + raise InvalidCredentialError("Cannot authenticate with given API-KEY and API-SECRET.") if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC: raise UnknownGenericError("The server replied to the request with " \ diff --git a/bfxapi/websocket/_client/bfx_websocket_bucket.py b/bfxapi/websocket/_client/bfx_websocket_bucket.py index 8d2aed7..7e2ada1 100644 --- a/bfxapi/websocket/_client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/_client/bfx_websocket_bucket.py @@ -13,9 +13,6 @@ from bfxapi.websocket._handlers import PublicChannelsHandler from bfxapi.websocket.subscriptions import Subscription -from bfxapi.websocket.exceptions import FullBucketError - - _CHECKSUM_FLAG_VALUE = 131_072 def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]: @@ -111,10 +108,6 @@ class BfxWebSocketBucket(Connection): channel: str, sub_id: Optional[str] = None, **kwargs: Any) -> None: - if self.is_full: - raise FullBucketError("The bucket is full: " + \ - "can't subscribe to any other channel.") - subscription: Dict[str, Any] = \ { **kwargs, "event": "subscribe", "channel": channel } diff --git a/bfxapi/websocket/_client/bfx_websocket_client.py b/bfxapi/websocket/_client/bfx_websocket_client.py index 1518e35..48d525e 100644 --- a/bfxapi/websocket/_client/bfx_websocket_client.py +++ b/bfxapi/websocket/_client/bfx_websocket_client.py @@ -1,24 +1,24 @@ from typing import \ - TYPE_CHECKING, TypedDict, List, \ - Dict, Optional, Any, \ - no_type_check + TypedDict, List, Dict, \ + Optional, Any, no_type_check from logging import Logger + from datetime import datetime from socket import gaierror +from asyncio import Task import \ traceback, json, asyncio, \ hmac, hashlib, random, \ websockets +import websockets.client + from websockets.exceptions import \ ConnectionClosedError, \ InvalidStatusCode -from websockets.legacy.client import \ - connect as _websockets__connect - from bfxapi._utils.json_encoder import JSONEncoder from bfxapi.websocket._connection import Connection @@ -26,24 +26,22 @@ from bfxapi.websocket._handlers import AuthEventsHandler from bfxapi.websocket._event_emitter import BfxEventEmitter from bfxapi.websocket.exceptions import \ - InvalidAuthenticationCredentials, \ + InvalidCredentialError, \ ReconnectionTimeoutError, \ - OutdatedClientVersion, \ + VersionMismatchError, \ ZeroConnectionsError, \ - UnknownChannelError + UnknownChannelError, \ + UnknownSubscriptionError -from bfxapi.websocket._client.bfx_websocket_bucket import BfxWebSocketBucket +from .bfx_websocket_bucket import BfxWebSocketBucket -from bfxapi.websocket._client.bfx_websocket_inputs import BfxWebSocketInputs +from .bfx_websocket_inputs import BfxWebSocketInputs -if TYPE_CHECKING: - from asyncio import Task +_Credentials = TypedDict("_Credentials", \ + { "api_key": str, "api_secret": str, "filters": Optional[List[str]] }) - _Credentials = TypedDict("_Credentials", \ - { "api_key": str, "api_secret": str, "filters": Optional[List[str]] }) - - _Reconnection = TypedDict("_Reconnection", - { "attempts": int, "reason": str, "timestamp": datetime }) +_Reconnection = TypedDict("_Reconnection", + { "attempts": int, "reason": str, "timestamp": datetime }) _DEFAULT_LOGGER = Logger("bfxapi.websocket._client", level=0) @@ -72,22 +70,18 @@ class _Delay: self.__backoff_delay = _Delay.__BACKOFF_MIN class BfxWebSocketClient(Connection): - VERSION = 2 - - MAXIMUM_CONNECTIONS_AMOUNT = 20 - def __init__(self, host: str, *, - credentials: Optional["_Credentials"] = None, - timeout: Optional[float] = 60 * 15, + credentials: Optional[_Credentials] = None, + timeout: Optional[int] = 60 * 15, logger: Logger = _DEFAULT_LOGGER) -> None: super().__init__(host) self.__credentials, self.__timeout, self.__logger = \ credentials, timeout, logger - self.__buckets: Dict[BfxWebSocketBucket, Optional["Task"]] = { } + self.__buckets: Dict[BfxWebSocketBucket, Optional[Task]] = { } self.__reconnection: Optional[_Reconnection] = None @@ -113,32 +107,14 @@ class BfxWebSocketClient(Connection): def inputs(self) -> BfxWebSocketInputs: return self.__inputs - def run(self, connections: int = 5) -> None: - return asyncio.run(self.start(connections)) - - async def start(self, connections: int = 5) -> None: - if connections == 0: - self.__logger.info("With connections set to 0 it will not be possible to subscribe to any " \ - "public channel. Attempting a subscription will cause a ZeroConnectionsError to be thrown.") - - if connections > BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT: - self.__logger.warning(f"It is not safe to use more than {BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT} " \ - f"buckets from the same connection ({connections} in use), the server could momentarily " \ - "block the client with <429 Too Many Requests>.") - - for _ in range(connections): - _bucket = BfxWebSocketBucket( \ - self._host, self.__event_emitter) - - self.__buckets.update({ _bucket: None }) - - await self.__connect() + def run(self) -> None: + return asyncio.run(self.start()) #pylint: disable-next=too-many-branches - async def __connect(self) -> None: + async def start(self) -> None: _delay = _Delay(backoff_factor=1.618) - _sleep: Optional["Task"] = None + _sleep: Optional[Task] = None def _on_timeout(): if not self.open: @@ -158,19 +134,20 @@ class BfxWebSocketClient(Connection): from None try: - await self.__connection() + await self.__connect() except (ConnectionClosedError, InvalidStatusCode, gaierror) as error: - async def _cancel(task: "Task") -> None: + async def _cancel(task: Task) -> None: task.cancel() try: await task - except (ConnectionClosedError, gaierror) as _e: + except (ConnectionClosedError, InvalidStatusCode, gaierror) as _e: if type(error) is not type(_e) or error.args != _e.args: raise _e except asyncio.CancelledError: pass + # pylint: disable-next=consider-using-dict-items for bucket in self.__buckets: if task := self.__buckets[bucket]: self.__buckets[bucket] = None @@ -186,7 +163,7 @@ class BfxWebSocketClient(Connection): self.__logger.info("WSS server is about to restart, clients need " \ "to reconnect (server sent 20051). Reconnection attempt in progress...") - if self.__timeout is not None: + if self.__timeout: asyncio.get_event_loop().call_later( self.__timeout, _on_timeout) @@ -214,8 +191,8 @@ class BfxWebSocketClient(Connection): break - async def __connection(self) -> None: - async with _websockets__connect(self._host) as websocket: + async def __connect(self) -> None: + async with websockets.client.connect(self._host) as websocket: if self.__reconnection: self.__logger.info(f"_Reconnection attempt successful (no.{self.__reconnection['attempts']}): The " \ f"client has been offline for a total of {datetime.now() - self.__reconnection['timestamp']} " \ @@ -225,11 +202,9 @@ class BfxWebSocketClient(Connection): self._websocket = websocket - self.__buckets = { - bucket: asyncio.create_task(_c) - for bucket in self.__buckets - if (_c := bucket.start()) - } + for bucket in self.__buckets: + self.__buckets[bucket] = \ + asyncio.create_task(bucket.start()) if len(self.__buckets) == 0 or \ (await asyncio.gather(*[bucket.wait() for bucket in self.__buckets])): @@ -241,29 +216,31 @@ class BfxWebSocketClient(Connection): await self._websocket.send(authentication) - async for message in self._websocket: - message = json.loads(message) + async for _message in self._websocket: + message = json.loads(_message) if isinstance(message, dict): if message["event"] == "info" and "version" in message: - if BfxWebSocketClient.VERSION != message["version"]: - raise OutdatedClientVersion("Mismatch between the client version and the server " \ - "version. Update the library to the latest version to continue (client version: " \ - f"{BfxWebSocketClient.VERSION}, server version: {message['version']}).") + if message["version"] != 2: + raise VersionMismatchError("Mismatch between the client and the server version: " + \ + "please update bitfinex-api-py to the latest version to resolve this error " + \ + f"(client version: 2, server version: {message['version']}).") elif message["event"] == "info" and message["code"] == 20051: - code, reason = 1012, "Stop/Restart WebSocket Server (please reconnect)." - rcvd = websockets.frames.Close(code=code, reason=reason) + rcvd = websockets.frames.Close( \ + 1012, "Stop/Restart WebSocket Server (please reconnect).") + raise ConnectionClosedError(rcvd=rcvd, sent=None) elif message["event"] == "auth": if message["status"] != "OK": - raise InvalidAuthenticationCredentials( - "Cannot authenticate with given API-KEY and API-SECRET.") + raise InvalidCredentialError("Cannot authenticate " + \ + "with given API-KEY and API-SECRET.") self.__event_emitter.emit("authenticated", message) self._authentication = True elif message["event"] == "error": - self.__event_emitter.emit("wss-error", message["code"], message["msg"]) + self.__event_emitter.emit("wss-error", \ + message["code"], message["msg"]) if isinstance(message, list) and \ message[0] == 0 and message[1] != Connection._HEARTBEAT: @@ -294,14 +271,20 @@ class BfxWebSocketClient(Connection): @Connection.require_websocket_connection async def unsubscribe(self, sub_id: str) -> None: for bucket in self.__buckets: - if bucket.has(sub_id=sub_id): - await bucket.unsubscribe(sub_id=sub_id) + if bucket.has(sub_id): + return await bucket.unsubscribe(sub_id) + + raise UnknownSubscriptionError("Unable to find " + \ + f"a subscription with sub_id <{sub_id}>.") @Connection.require_websocket_connection async def resubscribe(self, sub_id: str) -> None: for bucket in self.__buckets: - if bucket.has(sub_id=sub_id): - await bucket.resubscribe(sub_id=sub_id) + if bucket.has(sub_id): + return await bucket.resubscribe(sub_id) + + raise UnknownSubscriptionError("Unable to find " + \ + f"a subscription with sub_id <{sub_id}>.") @Connection.require_websocket_connection async def close(self, code: int = 1000, reason: str = str()) -> None: @@ -323,7 +306,7 @@ class BfxWebSocketClient(Connection): @Connection.require_websocket_authentication async def __handle_websocket_input(self, event: str, data: Any) -> None: - await self._websocket.send(json.dumps(\ + await self._websocket.send(json.dumps( \ [ 0, event, None, data], cls=JSONEncoder)) @no_type_check