mirror of
https://github.com/aljazceru/bitfinex-api-py.git
synced 2025-12-18 22:34:21 +01:00
# Description <!--- Describe your changes in detail --> PR includes some global refactoring in preparation for the v3.0.0 stable release. ## Motivation and Context <!--- Why is this change required? What problem does it solve? --> - ## Related Issue <!--- If suggesting a new feature or change, please discuss it in an issue first --> <!--- If fixing a bug, there should be an issue describing it with steps to reproduce --> <!--- Please link to the issue here: --> PR fixes the following issue: - ## Type of change <!-- Select the most suitable choice and remove the others from the checklist --> - [X] Bug fix (non-breaking change which fixes an issue); # Checklist: - [X] I've done a self-review of my code; - [X] I've made corresponding changes to the documentation; - [X] I've made sure my changes generate no warnings; - [X] mypy returns no errors when run on the root package; <!-- If you use pre-commit hooks you can always check off the following tasks --> - [X] I've run black to format my code; - [X] I've run isort to format my code's import statements; - [X] flake8 reports no errors when run on the entire code base;
130 lines
3.7 KiB
Python
130 lines
3.7 KiB
Python
import hashlib
|
|
import hmac
|
|
import json
|
|
from datetime import datetime
|
|
from enum import IntEnum
|
|
from typing import TYPE_CHECKING, Any, List, Optional
|
|
|
|
import requests
|
|
|
|
from bfxapi._utils.json_decoder import JSONDecoder
|
|
from bfxapi._utils.json_encoder import JSONEncoder
|
|
from bfxapi.exceptions import InvalidCredentialError
|
|
from bfxapi.rest.exceptions import RequestParametersError, UnknownGenericError
|
|
|
|
if TYPE_CHECKING:
|
|
from requests.sessions import _Params
|
|
|
|
|
|
class _Error(IntEnum):
|
|
ERR_UNK = 10000
|
|
ERR_GENERIC = 10001
|
|
ERR_PARAMS = 10020
|
|
ERR_AUTH_FAIL = 10100
|
|
|
|
|
|
class Middleware:
|
|
__TIMEOUT = 30
|
|
|
|
def __init__(
|
|
self, host: str, api_key: Optional[str] = None, api_secret: Optional[str] = None
|
|
):
|
|
self.__host = host
|
|
|
|
self.__api_key = api_key
|
|
|
|
self.__api_secret = api_secret
|
|
|
|
def get(self, endpoint: str, params: Optional["_Params"] = None) -> Any:
|
|
headers = {"Accept": "application/json"}
|
|
|
|
if self.__api_key and self.__api_secret:
|
|
headers = {**headers, **self.__get_authentication_headers(endpoint)}
|
|
|
|
request = requests.get(
|
|
url=f"{self.__host}/{endpoint}",
|
|
params=params,
|
|
headers=headers,
|
|
timeout=Middleware.__TIMEOUT,
|
|
)
|
|
|
|
data = request.json(cls=JSONDecoder)
|
|
|
|
if isinstance(data, list) and len(data) > 0 and data[0] == "error":
|
|
self.__handle_error(data)
|
|
|
|
return data
|
|
|
|
def post(
|
|
self,
|
|
endpoint: str,
|
|
body: Optional[Any] = None,
|
|
params: Optional["_Params"] = None,
|
|
) -> Any:
|
|
_body = body and json.dumps(body, cls=JSONEncoder) or None
|
|
|
|
headers = {"Accept": "application/json", "Content-Type": "application/json"}
|
|
|
|
if self.__api_key and self.__api_secret:
|
|
headers = {
|
|
**headers,
|
|
**self.__get_authentication_headers(endpoint, _body),
|
|
}
|
|
|
|
request = requests.post(
|
|
url=f"{self.__host}/{endpoint}",
|
|
data=_body,
|
|
params=params,
|
|
headers=headers,
|
|
timeout=Middleware.__TIMEOUT,
|
|
)
|
|
|
|
data = request.json(cls=JSONDecoder)
|
|
|
|
if isinstance(data, list) and len(data) > 0 and data[0] == "error":
|
|
self.__handle_error(data)
|
|
|
|
return data
|
|
|
|
def __handle_error(self, error: List[Any]) -> None:
|
|
if error[1] == _Error.ERR_PARAMS:
|
|
raise RequestParametersError(
|
|
"The request was rejected with the following parameter"
|
|
f"error: <{error[2]}>"
|
|
)
|
|
|
|
if error[1] == _Error.ERR_AUTH_FAIL:
|
|
raise InvalidCredentialError(
|
|
"Cannot authenticate with given API-KEY and API-SECRET."
|
|
)
|
|
|
|
if not error[1] or error[1] == _Error.ERR_UNK or error[1] == _Error.ERR_GENERIC:
|
|
raise UnknownGenericError(
|
|
"The server replied to the request with a generic error with "
|
|
f"the following message: <{error[2]}>."
|
|
)
|
|
|
|
def __get_authentication_headers(self, endpoint: str, data: Optional[str] = None):
|
|
assert (
|
|
self.__api_key and self.__api_secret
|
|
), "API-KEY and API-SECRET must be strings."
|
|
|
|
nonce = str(round(datetime.now().timestamp() * 1_000_000))
|
|
|
|
if not data:
|
|
message = f"/api/v2/{endpoint}{nonce}"
|
|
else:
|
|
message = f"/api/v2/{endpoint}{nonce}{data}"
|
|
|
|
signature = hmac.new(
|
|
key=self.__api_secret.encode("utf8"),
|
|
msg=message.encode("utf8"),
|
|
digestmod=hashlib.sha384,
|
|
)
|
|
|
|
return {
|
|
"bfx-nonce": nonce,
|
|
"bfx-signature": signature.hexdigest(),
|
|
"bfx-apikey": self.__api_key,
|
|
}
|