Add support for new rest public endpoints (in BfxRestInterface.py, serializers.py and typings.py).

This commit is contained in:
Davide Casale
2022-12-01 17:48:50 +01:00
parent 8e8719e3d7
commit ea6044a5eb
4 changed files with 186 additions and 14 deletions

View File

@@ -1,4 +1,4 @@
from typing import Generic, TypeVar, Iterable, List, Any
from typing import Generic, TypeVar, Iterable, Optional, List, Any
from . import typings
@@ -7,19 +7,21 @@ from .exceptions import BfxRestException
T = TypeVar("T")
class _Serializer(Generic[T]):
def __init__(self, name: str, labels: List[str]):
self.name, self.__labels = name, labels
def __init__(self, name: str, labels: List[str], IGNORE: List[str] = [ "_PLACEHOLDER" ]):
self.name, self.__labels, self.__IGNORE = name, labels, IGNORE
def __serialize(self, *args: Any, IGNORE: List[str] = [ "_PLACEHOLDER" ]) -> Iterable[T]:
if len(self.__labels) != len(args):
raise BfxRestException("<self.__labels> and <*args> arguments should contain the same amount of elements.")
def __serialize(self, *args: Any, skip: Optional[List[str]]) -> Iterable[T]:
labels = list(filter(lambda label: label not in (skip or list()), self.__labels))
for index, label in enumerate(self.__labels):
if label not in IGNORE:
if len(labels) != len(args):
raise BfxRestException("<labels> and <*args> arguments should contain the same amount of elements.")
for index, label in enumerate(labels):
if label not in self.__IGNORE:
yield label, args[index]
def parse(self, *values: Any) -> T:
return dict(self.__serialize(*values))
def parse(self, *values: Any, skip: Optional[List[str]] = None) -> T:
return dict(self.__serialize(*values, skip=skip))
#region Serializers definition for Rest Public Endpoints
@@ -27,4 +29,69 @@ PlatformStatus = _Serializer[typings.PlatformStatus]("PlatformStatus", labels=[
"OPERATIVE"
])
TradingPairTicker = _Serializer[typings.TradingPairTicker]("TradingPairTicker", labels=[
"SYMBOL",
"BID",
"BID_SIZE",
"ASK",
"ASK_SIZE",
"DAILY_CHANGE",
"DAILY_CHANGE_RELATIVE",
"LAST_PRICE",
"VOLUME",
"HIGH",
"LOW"
])
FundingCurrencyTicker = _Serializer[typings.FundingCurrencyTicker]("FundingCurrencyTicker", labels=[
"SYMBOL",
"FRR",
"BID",
"BID_PERIOD",
"BID_SIZE",
"ASK",
"ASK_PERIOD",
"ASK_SIZE",
"DAILY_CHANGE",
"DAILY_CHANGE_RELATIVE",
"LAST_PRICE",
"VOLUME",
"HIGH",
"LOW",
"_PLACEHOLDER",
"_PLACEHOLDER",
"FRR_AMOUNT_AVAILABLE"
])
TickerHistory = _Serializer[typings.TickerHistory]("TickerHistory", labels=[
"SYMBOL",
"BID",
"_PLACEHOLDER",
"ASK",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"MTS"
])
TradingPairTrade = _Serializer[typings.TradingPairTrade]("TradingPairTrade", labels=[
"ID",
"MTS",
"AMOUNT",
"PRICE"
])
FundingCurrencyTrade = _Serializer[typings.FundingCurrencyTrade]("FundingCurrencyTrade", labels=[
"ID",
"MTS",
"AMOUNT",
"RATE",
"PERIOD"
])
#endregion