Rewrite bfxapi/rest/_Requests.py with type hinting. Add None values erasement in bfxapi/utils/JSONEncoder.py. Update code with new improvements.

This commit is contained in:
Davide Casale
2023-02-06 19:15:58 +01:00
parent 929ae62d2f
commit c588d9f20c
6 changed files with 86 additions and 96 deletions

View File

@@ -1,21 +1,29 @@
import time, hmac, hashlib, json, requests
from typing import TYPE_CHECKING, Optional, Any
from http import HTTPStatus
from .enums import Error
from .exceptions import ResourceNotFound, RequestParametersError, InvalidAuthenticationCredentials, UnknownGenericError
from ..utils.JSONEncoder import JSONEncoder
if TYPE_CHECKING:
from requests.sessions import _Params
class _Requests(object):
def __init__(self, host, API_KEY = None, API_SECRET = None):
def __init__(self, host: str, API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None):
self.host, self.API_KEY, self.API_SECRET = host, API_KEY, API_SECRET
def __build_authentication_headers(self, endpoint, data):
def __build_authentication_headers(self, endpoint: str, data: str):
assert isinstance(self.API_KEY, str) and isinstance(self.API_SECRET, str), \
"API_KEY and API_SECRET must be both str to call __build_authentication_headers"
nonce = str(int(time.time()) * 1000)
path = f"/api/v2/{endpoint}{nonce}"
if data != None: path += data
if data == None:
path = f"/api/v2/{endpoint}{nonce}"
else: path = f"/api/v2/{endpoint}{nonce}{data}"
signature = hmac.new(
self.API_SECRET.encode("utf8"),
@@ -29,7 +37,7 @@ class _Requests(object):
"bfx-apikey": self.API_KEY
}
def _GET(self, endpoint, params = None):
def _GET(self, endpoint: str, params: Optional["_Params"] = None) -> Any:
response = requests.get(f"{self.host}/{endpoint}", params=params)
if response.status_code == HTTPStatus.NOT_FOUND:
@@ -46,11 +54,10 @@ class _Requests(object):
return data
def _POST(self, endpoint, params = None, data = None, _ignore_authentication_headers = False):
headers = { "Content-Type": "application/json" }
def _POST(self, endpoint: str, params: Optional["_Params"] = None, body: Optional[Any] = None, _ignore_authentication_headers: bool = False) -> Any:
data = json.dumps(body, cls=JSONEncoder)
if isinstance(data, dict):
data = json.dumps({ key: value for key, value in data.items() if value != None}, cls=JSONEncoder)
headers = { "Content-Type": "application/json" }
if self.API_KEY and self.API_SECRET and _ignore_authentication_headers == False:
headers = { **headers, **self.__build_authentication_headers(endpoint, data) }