Add new module bfxapi._utils.json_decoder.

This commit is contained in:
Davide Casale
2023-10-08 21:49:36 +02:00
parent 9872adf60f
commit de0ee54900
6 changed files with 107 additions and 101 deletions

View File

@@ -0,0 +1,13 @@
from typing import Dict, Any
import re, json
def _to_snake_case(string: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
def _object_hook(data: Dict[str, Any]) -> Any:
return { _to_snake_case(key): value for key, value in data.items() }
class JSONDecoder(json.JSONDecoder):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(object_hook=_object_hook, *args, **kwargs)

View File

@@ -1,11 +1,11 @@
import json
from decimal import Decimal
from typing import \
Union, List, Dict, \
Any
import json
from decimal import Decimal
_ExtJSON = Union[Dict[str, "_ExtJSON"], List["_ExtJSON"], \
bool, int, float, str, Decimal, None]

View File

@@ -1,46 +1,32 @@
import re
from typing import Callable, TypeVar, cast, \
TypedDict, Dict, List, Union, Literal, Optional, Any
from typing import \
TypedDict, Dict, List, \
Union, Literal, Optional, \
Any
from decimal import Decimal
from ..middleware import Middleware
from bfxapi.rest.middleware import Middleware
from ..enums import MerchantSettingsKey
from bfxapi.rest.enums import MerchantSettingsKey
from ...types import \
InvoiceSubmission, InvoicePage, InvoiceStats, \
CurrencyConversion, MerchantDeposit, MerchantUnlinkedDeposit
#region Defining methods to convert dictionary keys to snake_case and camelCase.
T = TypeVar("T")
_to_snake_case: Callable[[str], str] = lambda string: re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
_to_camel_case: Callable[[str], str] = lambda string: \
(components := string.split("_"))[0] + str().join(c.title() for c in components[1:])
def _scheme(data: T, adapter: Callable[[str], str]) -> T:
if isinstance(data, list):
return cast(T, [ _scheme(sub_data, adapter) for sub_data in data ])
if isinstance(data, dict):
return cast(T, { adapter(key): _scheme(value, adapter) for key, value in data.items() })
return data
def _to_snake_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_snake_case)
def _to_camel_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_camel_case)
#endregion
from bfxapi.types import \
InvoiceSubmission, \
InvoicePage, \
InvoiceStats, \
CurrencyConversion, \
MerchantDeposit, \
MerchantUnlinkedDeposit
_CustomerInfo = TypedDict("_CustomerInfo", {
"nationality": str, "resid_country": str, "resid_city": str,
"resid_zip_code": str, "resid_street": str, "resid_building_no": str,
"full_name": str, "email": str, "tos_accepted": bool
"nationality": str,
"resid_country": str,
"resid_city": str,
"resid_zip_code": str,
"resid_street": str,
"resid_building_no": str,
"full_name": str,
"email": str,
"tos_accepted": bool
})
class RestMerchantEndpoints(Middleware):
@@ -55,13 +41,13 @@ class RestMerchantEndpoints(Middleware):
duration: Optional[int] = None,
webhook: Optional[str] = None,
redirect_url: Optional[str] = None) -> InvoiceSubmission:
body = _to_camel_case_keys({
"amount": amount, "currency": currency, "order_id": order_id,
"customer_info": customer_info, "pay_currencies": pay_currencies, "duration": duration,
"webhook": webhook, "redirect_url": redirect_url
})
body = {
"amount": amount, "currency": currency, "orderId": order_id,
"customerInfo": customer_info, "payCurrencies": pay_currencies, "duration": duration,
"webhook": webhook, "redirectUrl": redirect_url
}
data = _to_snake_case_keys(self._post("auth/w/ext/pay/invoice/create", body=body))
data = self._post("auth/w/ext/pay/invoice/create", body=body)
return InvoiceSubmission.parse(data)
@@ -76,9 +62,9 @@ class RestMerchantEndpoints(Middleware):
"limit": limit
}
response = self._post("auth/r/ext/pay/invoices", body=body)
data = self._post("auth/r/ext/pay/invoices", body=body)
return [ InvoiceSubmission.parse(sub_data) for sub_data in _to_snake_case_keys(response) ]
return [ InvoiceSubmission.parse(sub_data) for sub_data in data ]
def get_invoices_paginated(self,
page: int = 1,
@@ -91,13 +77,13 @@ class RestMerchantEndpoints(Middleware):
crypto: Optional[List[str]] = None,
id: Optional[str] = None,
order_id: Optional[str] = None) -> InvoicePage:
body = _to_camel_case_keys({
"page": page, "page_size": page_size, "sort": sort,
"sort_field": sort_field, "status": status, "fiat": fiat,
"crypto": crypto, "id": id, "order_id": order_id
})
body = {
"page": page, "pageSize": page_size, "sort": sort,
"sortField": sort_field, "status": status, "fiat": fiat,
"crypto": crypto, "id": id, "orderId": order_id
}
data = _to_snake_case_keys(self._post("auth/r/ext/pay/invoices/paginated", body=body))
data = self._post("auth/r/ext/pay/invoices/paginated", body=body)
return InvoicePage.parse(data)
@@ -105,13 +91,15 @@ class RestMerchantEndpoints(Middleware):
status: Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"],
format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in \
self._post("auth/r/ext/pay/invoice/stats/count", body={ "status": status, "format": format }) ]
self._post("auth/r/ext/pay/invoice/stats/count", \
body={ "status": status, "format": format }) ]
def get_invoice_earning_stats(self,
currency: str,
format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in \
self._post("auth/r/ext/pay/invoice/stats/earning", body={ "currency": currency, "format": format }) ]
self._post("auth/r/ext/pay/invoice/stats/earning", \
body={ "currency": currency, "format": format }) ]
def complete_invoice(self,
id: str,
@@ -119,45 +107,43 @@ class RestMerchantEndpoints(Middleware):
*,
deposit_id: Optional[int] = None,
ledger_id: Optional[int] = None) -> InvoiceSubmission:
return InvoiceSubmission.parse(_to_snake_case_keys(self._post("auth/w/ext/pay/invoice/complete", body={
body = {
"id": id, "payCcy": pay_currency, "depositId": deposit_id,
"ledgerId": ledger_id
})))
}
data = self._post("auth/w/ext/pay/invoice/complete", body=body)
return InvoiceSubmission.parse(data)
def expire_invoice(self, id: str) -> InvoiceSubmission:
body = { "id": id }
response = self._post("auth/w/ext/pay/invoice/expire", body=body)
return InvoiceSubmission.parse(_to_snake_case_keys(response))
data = self._post("auth/w/ext/pay/invoice/expire", body=body)
return InvoiceSubmission.parse(data)
def get_currency_conversion_list(self) -> List[CurrencyConversion]:
return [
CurrencyConversion(
base_currency=sub_data["baseCcy"],
convert_currency=sub_data["convertCcy"],
created=sub_data["created"]
) for sub_data in self._post("auth/r/ext/pay/settings/convert/list")
]
return [ CurrencyConversion(**sub_data) \
for sub_data in self._post("auth/r/ext/pay/settings/convert/list") ]
def add_currency_conversion(self,
base_currency: str,
convert_currency: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/create", body={
"baseCcy": base_currency,
"convertCcy": convert_currency
}))
base_ccy: str,
convert_ccy: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/create", \
body={ "baseCcy": base_ccy, "convertCcy": convert_ccy }))
def remove_currency_conversion(self,
base_currency: str,
convert_currency: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/remove", body={
"baseCcy": base_currency,
"convertCcy": convert_currency
}))
base_ccy: str,
convert_ccy: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/remove", \
body={ "baseCcy": base_ccy, "convertCcy": convert_ccy }))
def set_merchant_settings(self,
key: MerchantSettingsKey,
val: Any) -> bool:
return bool(self._post("auth/w/ext/pay/settings/set", body={ "key": key, "val": val }))
return bool(self._post("auth/w/ext/pay/settings/set", \
body={ "key": key, "val": val }))
def get_merchant_settings(self, key: MerchantSettingsKey) -> Any:
return self._post("auth/r/ext/pay/settings/get", body={ "key": key })
@@ -167,19 +153,28 @@ class RestMerchantEndpoints(Middleware):
def get_deposits(self,
start: int,
end: int,
to: int,
*,
ccy: Optional[str] = None,
unlinked: Optional[bool] = None) -> List[MerchantDeposit]:
body = { "from": start, "to": end, "ccy": ccy, "unlinked": unlinked }
response = self._post("auth/r/ext/pay/deposits", body=body)
return [ MerchantDeposit(**sub_data) for sub_data in _to_snake_case_keys(response) ]
body = {
"from": start, "to": to, "ccy": ccy,
"unlinked": unlinked
}
data = self._post("auth/r/ext/pay/deposits", body=body)
return [ MerchantDeposit(**sub_data) for sub_data in data ]
def get_unlinked_deposits(self,
ccy: str,
*,
start: Optional[int] = None,
end: Optional[int] = None) -> List[MerchantUnlinkedDeposit]:
body = { "ccy": ccy, "start": start, "end": end }
response = self._post("/auth/r/ext/pay/deposits/unlinked", body=body)
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in _to_snake_case_keys(response) ]
body = {
"ccy": ccy, "start": start, "end": end
}
data = self._post("/auth/r/ext/pay/deposits/unlinked", body=body)
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in data ]

View File

@@ -7,6 +7,7 @@ import time, hmac, hashlib, json, requests
from ..enums import Error
from ..exceptions import ResourceNotFound, RequestParametersError, InvalidAuthenticationCredentials, UnknownGenericError
from ..._utils.json_encoder import JSONEncoder
from ..._utils.json_decoder import JSONDecoder
if TYPE_CHECKING:
from requests.sessions import _Params
@@ -49,7 +50,7 @@ class Middleware:
if response.status_code == HTTPStatus.NOT_FOUND:
raise ResourceNotFound(f"No resources found at endpoint <{endpoint}>.")
data = response.json()
data = response.json(cls=JSONDecoder)
if len(data) and data[0] == "error":
if data[1] == Error.ERR_PARAMS:
@@ -82,7 +83,7 @@ class Middleware:
if response.status_code == HTTPStatus.NOT_FOUND:
raise ResourceNotFound(f"No resources found at endpoint <{endpoint}>.")
data = response.json()
data = response.json(cls=JSONDecoder)
if isinstance(data, list) and len(data) and data[0] == "error":
if data[1] == Error.ERR_PARAMS:

View File

@@ -657,8 +657,8 @@ class InvoiceStats(_Type):
@dataclass
class CurrencyConversion(_Type):
base_currency: str
convert_currency: str
base_ccy: str
convert_ccy: str
created: int
@dataclass

View File

@@ -7,16 +7,20 @@ import asyncio, json, uuid
import websockets.client
from pyee import EventEmitter
from bfxapi._utils.json_decoder import JSONDecoder
from bfxapi.websocket._connection import Connection
from bfxapi.websocket._handlers import PublicChannelsHandler
from bfxapi.websocket.subscriptions import Subscription
from bfxapi.websocket.exceptions import FullBucketError
_CHECKSUM_FLAG_VALUE = 131_072
def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]:
return { key: message[key] for key in message if not key in keys }
return { key: value for key, value in message.items() \
if not key in keys }
class BfxWebSocketBucket(Connection):
__MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
@@ -53,16 +57,9 @@ class BfxWebSocketBucket(Connection):
self.__condition.notify(1)
async for _message in self._websocket:
message = json.loads(_message)
message = json.loads(_message, cls=JSONDecoder)
if isinstance(message, dict):
# I think there's a better way to do it...
if "chanId" in message:
message["chan_id"] = message.pop("chanId")
if "subId" in message:
message["sub_id"] = message.pop("subId")
if message["event"] == "subscribed":
self.__on_subscribed(message)
elif message["event"] == "unsubscribed":
@@ -83,7 +80,7 @@ class BfxWebSocketBucket(Connection):
chan_id = cast(int, message["chan_id"])
subscription = cast(Subscription, _strip(message, \
keys=["event", "chan_id", "pair", "currency"]))
keys=["chan_id", "event", "pair", "currency"]))
self.__pendings = [ pending \
for pending in self.__pendings \