Merge pull request #3 from Davi0kProgramsThings/fix/pylint

Merge branch `fix/pylint` in branch `master`.
This commit is contained in:
Davide Casale
2023-03-08 22:08:43 +01:00
committed by GitHub
65 changed files with 2472 additions and 1764 deletions

4
.gitignore vendored
View File

@@ -2,8 +2,10 @@
*.pyc
*.log
bitfinex_api_py.egg-info
__pycache__
dist
venv
!.gitkeep
!.gitkeep

View File

@@ -1,11 +1,34 @@
[MAIN]
py-version = 3.8.0
py-version=3.8.0
ignore=examples
[MESSAGES CONTROL]
disable=
multiple-imports,
missing-docstring,
too-few-public-methods
logging-not-lazy,
logging-fstring-interpolation,
too-few-public-methods,
too-many-public-methods,
too-many-instance-attributes,
dangerous-default-value,
inconsistent-return-statements,
[SIMILARITIES]
min-similarity-lines=6
[VARIABLES]
allowed-redefined-builtins=type,dir,id,all,format,len
[FORMAT]
max-line-length=120
expected-line-ending-format=LF
[BASIC]
good-names=id,on,pl,t,ip,tf,A,B,C,D,E,F
[TYPECHECK]
generated-members=websockets
[STRING]
check-quote-consistency=yes

View File

@@ -3,4 +3,4 @@ from .client import Client
from .urls import REST_HOST, PUB_REST_HOST, \
WSS_HOST, PUB_WSS_HOST
NAME = "bfxapi"
NAME = "bfxapi"

View File

@@ -19,7 +19,7 @@ class Client:
credentials = None
if api_key and api_secret:
credentials = { "API_KEY": api_key, "API_SECRET": api_secret, "filters": filters }
credentials = { "api_key": api_key, "api_secret": api_secret, "filters": filters }
self.rest = BfxRestInterface(
host=rest_host,

View File

@@ -47,4 +47,4 @@ class Error(int, Enum):
ERR_SUB_LIMIT = 10305
ERR_UNSUB_FAIL = 10400
ERR_UNSUB_NOT = 10401
ERR_READY = 11000
ERR_READY = 11000

View File

@@ -9,11 +9,8 @@ class BfxBaseException(Exception):
Base class for every custom exception in bfxapi/rest/exceptions.py and bfxapi/websocket/exceptions.py.
"""
pass
class LabelerSerializerException(BfxBaseException):
"""
This exception indicates an error thrown by the _Serializer class in bfxapi/labeler.py.
"""
pass

View File

@@ -1,12 +1,12 @@
from .exceptions import LabelerSerializerException
from typing import Type, Generic, TypeVar, Iterable, Dict, List, Tuple, Any, cast
from typing import Type, Generic, TypeVar, Iterable, Optional, Dict, List, Tuple, Any, cast
from .exceptions import LabelerSerializerException
T = TypeVar("T", bound="_Type")
def compose(*decorators):
def wrapper(function):
for decorator in reversed(decorators):
for decorator in reversed(decorators):
function = decorator(function)
return function
@@ -28,50 +28,70 @@ def partial(cls):
return cls
class _Type(object):
class _Type:
"""
Base class for any dataclass serializable by the _Serializer generic class.
"""
pass
class _Serializer(Generic[T]):
def __init__(self, name: str, klass: Type[_Type], labels: List[str], IGNORE: List[str] = [ "_PLACEHOLDER" ]):
self.name, self.klass, self.__labels, self.__IGNORE = name, klass, labels, IGNORE
def __init__(self, name: str, klass: Type[_Type], labels: List[str],
*, flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]):
self.name, self.klass, self.__labels, self.__flat, self.__ignore = name, klass, labels, flat, ignore
def _serialize(self, *args: Any, skip: Optional[List[str]] = None) -> Iterable[Tuple[str, Any]]:
labels = list(filter(lambda label: label not in (skip or list()), self.__labels))
def _serialize(self, *args: Any) -> Iterable[Tuple[str, Any]]:
if self.__flat:
args = tuple(_Serializer.__flatten(list(args)))
if len(labels) > len(args):
raise LabelerSerializerException(f"{self.name} -> <labels> and <*args> arguments should contain the same amount of elements.")
if len(self.__labels) > len(args):
raise LabelerSerializerException(f"{self.name} -> <labels> and <*args> " \
"arguments should contain the same amount of elements.")
for index, label in enumerate(labels):
if label not in self.__IGNORE:
for index, label in enumerate(self.__labels):
if label not in self.__ignore:
yield label, args[index]
def parse(self, *values: Any, skip: Optional[List[str]] = None) -> T:
return cast(T, self.klass(**dict(self._serialize(*values, skip=skip))))
def parse(self, *values: Any) -> T:
return cast(T, self.klass(**dict(self._serialize(*values))))
def get_labels(self) -> List[str]:
return [ label for label in self.__labels if label not in self.__IGNORE ]
return [ label for label in self.__labels if label not in self.__ignore ]
@classmethod
def __flatten(cls, array: List[Any]) -> List[Any]:
if len(array) == 0:
return array
if isinstance(array[0], list):
return cls.__flatten(array[0]) + cls.__flatten(array[1:])
return array[:1] + cls.__flatten(array[1:])
class _RecursiveSerializer(_Serializer, Generic[T]):
def __init__(self, name: str, klass: Type[_Type], labels: List[str], serializers: Dict[str, _Serializer[Any]], IGNORE: List[str] = ["_PLACEHOLDER"]):
super().__init__(name, klass, labels, IGNORE)
def __init__(self, name: str, klass: Type[_Type], labels: List[str],
*, serializers: Dict[str, _Serializer[Any]],
flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]):
super().__init__(name, klass, labels, flat=flat, ignore=ignore)
self.serializers = serializers
def parse(self, *values: Any, skip: Optional[List[str]] = None) -> T:
serialization = dict(self._serialize(*values, skip=skip))
def parse(self, *values: Any) -> T:
serialization = dict(self._serialize(*values))
for key in serialization:
if key in self.serializers.keys():
serialization[key] = self.serializers[key].parse(*serialization[key], skip=skip)
serialization[key] = self.serializers[key].parse(*serialization[key])
return cast(T, self.klass(**serialization))
def generate_labeler_serializer(name: str, klass: Type[T], labels: List[str], IGNORE: List[str] = [ "_PLACEHOLDER" ]) -> _Serializer[T]:
return _Serializer[T](name, klass, labels, IGNORE)
def generate_labeler_serializer(name: str, klass: Type[T], labels: List[str],
*, flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]
) -> _Serializer[T]:
return _Serializer[T](name, klass, labels, \
flat=flat, ignore=ignore)
def generate_recursive_serializer(name: str, klass: Type[T], labels: List[str], serializers: Dict[str, _Serializer[Any]], IGNORE: List[str] = [ "_PLACEHOLDER" ]) -> _RecursiveSerializer[T]:
return _RecursiveSerializer[T](name, klass, labels, serializers, IGNORE)
def generate_recursive_serializer(name: str, klass: Type[T], labels: List[str],
*, serializers: Dict[str, _Serializer[Any]],
flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]
) -> _RecursiveSerializer[T]:
return _RecursiveSerializer[T](name, klass, labels, \
serializers=serializers, flat=flat, ignore=ignore)

View File

@@ -1,4 +1,4 @@
from typing import List, Dict, Union, Optional, Any, TypedDict, Generic, TypeVar, cast
from typing import List, Optional, Any, Generic, TypeVar, cast
from dataclasses import dataclass
from .labeler import _Type, _Serializer
@@ -7,7 +7,7 @@ T = TypeVar("T")
@dataclass
class Notification(_Type, Generic[T]):
mts: int
type: str
type: str
message_id: Optional[int]
data: T
code: Optional[int]
@@ -18,21 +18,21 @@ class _Notification(_Serializer, Generic[T]):
__LABELS = [ "mts", "type", "message_id", "_PLACEHOLDER", "data", "code", "status", "text" ]
def __init__(self, serializer: Optional[_Serializer] = None, is_iterable: bool = False):
super().__init__("Notification", Notification, _Notification.__LABELS, IGNORE = [ "_PLACEHOLDER" ])
super().__init__("Notification", Notification, _Notification.__LABELS, ignore = [ "_PLACEHOLDER" ])
self.serializer, self.is_iterable = serializer, is_iterable
def parse(self, *values: Any, skip: Optional[List[str]] = None) -> Notification[T]:
def parse(self, *values: Any) -> Notification[T]:
notification = cast(Notification[T], Notification(**dict(self._serialize(*values))))
if isinstance(self.serializer, _Serializer):
data = cast(List[Any], notification.data)
if self.is_iterable == False:
if not self.is_iterable:
if len(data) == 1 and isinstance(data[0], list):
data = data[0]
notification.data = cast(T, self.serializer.klass(**dict(self.serializer._serialize(*data, skip=skip))))
else: notification.data = cast(T, [ self.serializer.klass(**dict(self.serializer._serialize(*sub_data, skip=skip))) for sub_data in data ])
notification.data = self.serializer.parse(*data)
else: notification.data = cast(T, [ self.serializer.parse(*sub_data) for sub_data in data ])
return notification
return notification

View File

@@ -1,4 +1,4 @@
from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints, \
RestMerchantEndpoints
NAME = "rest"
NAME = "rest"

View File

@@ -4,4 +4,4 @@ from .rest_public_endpoints import RestPublicEndpoints
from .rest_authenticated_endpoints import RestAuthenticatedEndpoints
from .rest_merchant_endpoints import RestMerchantEndpoints
NAME = "endpoints"
NAME = "endpoints"

View File

@@ -1,16 +1,13 @@
from typing import Optional
from .rest_public_endpoints import RestPublicEndpoints
from .rest_authenticated_endpoints import RestAuthenticatedEndpoints
from .rest_merchant_endpoints import RestMerchantEndpoints
class BfxRestInterface(object):
class BfxRestInterface:
VERSION = 2
def __init__(self, host, credentials = None):
API_KEY, API_SECRET = credentials and \
(credentials["API_KEY"], credentials["API_SECRET"]) or (None, None)
api_key, api_secret = (credentials['api_key'], credentials['api_secret']) if credentials else (None, None)
self.public = RestPublicEndpoints(host=host)
self.auth = RestAuthenticatedEndpoints(host=host, API_KEY=API_KEY, API_SECRET=API_SECRET)
self.merchant = RestMerchantEndpoints(host=host, API_KEY=API_KEY, API_SECRET=API_SECRET)
self.public = RestPublicEndpoints(host=host)
self.auth = RestAuthenticatedEndpoints(host=host, api_key=api_key, api_secret=api_secret)
self.merchant = RestMerchantEndpoints(host=host, api_key=api_key, api_secret=api_secret)

View File

@@ -1,321 +1,474 @@
from typing import List, Tuple, Union, Literal, Optional
from typing import Dict, List, Tuple, Union, Literal, Optional
from decimal import Decimal
from datetime import datetime
from .. types import *
from .. types import Notification, \
UserInfo, LoginHistory, BalanceAvailable, \
Order, Position, Trade, \
FundingTrade, OrderTrade, Ledger, \
FundingOffer, FundingCredit, FundingLoan, \
FundingAutoRenew, FundingInfo, Wallet, \
Transfer, Withdrawal, DepositAddress, \
LightningNetworkInvoice, Movement, SymbolMarginInfo, \
BaseMarginInfo, PositionClaim, PositionIncreaseInfo, \
PositionIncrease, PositionHistory, PositionSnapshot, \
PositionAudit, DerivativePositionCollateral, DerivativePositionCollateralLimits
from .. import serializers
from .. serializers import _Notification
from .. enums import Sort, OrderType, FundingOfferType
from .. middleware import Middleware
from ...utils.json_encoder import JSON
class RestAuthenticatedEndpoints(Middleware):
def get_user_info(self) -> UserInfo:
return serializers.UserInfo.parse(*self._POST(f"auth/r/info/user"))
return serializers.UserInfo \
.parse(*self._post("auth/r/info/user"))
def get_login_history(self) -> List[LoginHistory]:
return [ serializers.LoginHistory.parse(*sub_data) for sub_data in self._POST("auth/r/logins/hist") ]
return [ serializers.LoginHistory.parse(*sub_data)
for sub_data in self._post("auth/r/logins/hist") ]
def get_balance_available_for_orders_or_offers(self, symbol: str, type: str, dir: Optional[int] = None, rate: Optional[str] = None, lev: Optional[str] = None) -> BalanceAvailable:
return serializers.BalanceAvailable.parse(*self._POST("auth/calc/order/avail", body={
def get_balance_available_for_orders_or_offers(self,
symbol: str,
type: str,
*,
dir: Optional[int] = None,
rate: Optional[str] = None,
lev: Optional[str] = None) -> BalanceAvailable:
body = {
"symbol": symbol, "type": type, "dir": dir,
"rate": rate, "lev": lev
}))
}
return serializers.BalanceAvailable \
.parse(*self._post("auth/calc/order/avail", body=body))
def get_wallets(self) -> List[Wallet]:
return [ serializers.Wallet.parse(*sub_data) for sub_data in self._POST("auth/r/wallets") ]
return [ serializers.Wallet.parse(*sub_data) \
for sub_data in self._post("auth/r/wallets") ]
def get_orders(self, symbol: Optional[str] = None, ids: Optional[List[str]] = None) -> List[Order]:
endpoint = "auth/r/orders"
def get_orders(self,
*,
symbol: Optional[str] = None,
ids: Optional[List[str]] = None) -> List[Order]:
if symbol is None:
endpoint = "auth/r/orders"
else: endpoint = f"auth/r/orders/{symbol}"
if symbol != None:
endpoint += f"/{symbol}"
return [ serializers.Order.parse(*sub_data) \
for sub_data in self._post(endpoint, body={ "id": ids }) ]
return [ serializers.Order.parse(*sub_data) for sub_data in self._POST(endpoint, body={ "id": ids }) ]
def submit_order(self, type: OrderType, symbol: str, amount: Union[Decimal, float, str],
price: Optional[Union[Decimal, float, str]] = None, lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None, price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None, cid: Optional[int] = None,
flags: Optional[int] = 0, tif: Optional[Union[datetime, str]] = None, meta: Optional[JSON] = None) -> Notification[Order]:
def submit_order(self,
type: OrderType,
symbol: str,
amount: Union[Decimal, float, str],
*,
price: Optional[Union[Decimal, float, str]] = None,
lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None,
cid: Optional[int] = None,
flags: Optional[int] = 0,
tif: Optional[Union[datetime, str]] = None,
meta: Optional[JSON] = None) -> Notification[Order]:
body = {
"type": type, "symbol": symbol, "amount": amount,
"price": price, "lev": lev,
"price_trailing": price_trailing, "price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop,
"gid": gid, "cid": cid,
"flags": flags, "tif": tif, "meta": meta
"price": price, "lev": lev, "price_trailing": price_trailing,
"price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, "gid": gid,
"cid": cid, "flags": flags, "tif": tif,
"meta": meta
}
return serializers._Notification[Order](serializers.Order).parse(*self._POST("auth/w/order/submit", body=body))
def update_order(self, id: int, amount: Optional[Union[Decimal, float, str]] = None, price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None, cid_date: Optional[str] = None, gid: Optional[int] = None,
flags: Optional[int] = 0, lev: Optional[int] = None, delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_trailing: Optional[Union[Decimal, float, str]] = None, tif: Optional[Union[datetime, str]] = None) -> Notification[Order]:
return _Notification[Order](serializers.Order) \
.parse(*self._post("auth/w/order/submit", body=body))
def update_order(self,
id: int,
*,
amount: Optional[Union[Decimal, float, str]] = None,
price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None,
gid: Optional[int] = None,
flags: Optional[int] = 0,
lev: Optional[int] = None,
delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
tif: Optional[Union[datetime, str]] = None) -> Notification[Order]:
body = {
"id": id, "amount": amount, "price": price,
"cid": cid, "cid_date": cid_date, "gid": gid,
"flags": flags, "lev": lev, "delta": delta,
"price_aux_limit": price_aux_limit, "price_trailing": price_trailing, "tif": tif
}
return serializers._Notification[Order](serializers.Order).parse(*self._POST("auth/w/order/update", body=body))
def cancel_order(self, id: Optional[int] = None, cid: Optional[int] = None, cid_date: Optional[str] = None) -> Notification[Order]:
body = {
"id": id,
"cid": cid,
"cid_date": cid_date
}
return _Notification[Order](serializers.Order) \
.parse(*self._post("auth/w/order/update", body=body))
return serializers._Notification[Order](serializers.Order).parse(*self._POST("auth/w/order/cancel", body=body))
def cancel_order(self,
*,
id: Optional[int] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None) -> Notification[Order]:
return _Notification[Order](serializers.Order) \
.parse(*self._post("auth/w/order/cancel", \
body={ "id": id, "cid": cid, "cid_date": cid_date }))
def cancel_order_multi(self, ids: Optional[List[int]] = None, cids: Optional[List[Tuple[int, str]]] = None, gids: Optional[List[int]] = None, all: bool = False) -> Notification[List[Order]]:
def cancel_order_multi(self,
*,
ids: Optional[List[int]] = None,
cids: Optional[List[Tuple[int, str]]] = None,
gids: Optional[List[int]] = None,
all: bool = False) -> Notification[List[Order]]:
body = {
"ids": ids,
"cids": cids,
"gids": gids,
"ids": ids, "cids": cids, "gids": gids,
"all": int(all)
}
return serializers._Notification[List[Order]](serializers.Order, is_iterable=True).parse(*self._POST("auth/w/order/cancel/multi", body=body))
return _Notification[List[Order]](serializers.Order, is_iterable=True) \
.parse(*self._post("auth/w/order/cancel/multi", body=body))
def get_orders_history(self, symbol: Optional[str] = None, ids: Optional[List[int]] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[Order]:
if symbol == None:
def get_orders_history(self,
*,
symbol: Optional[str] = None,
ids: Optional[List[int]] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Order]:
if symbol is None:
endpoint = "auth/r/orders/hist"
else: endpoint = f"auth/r/orders/{symbol}/hist"
body = {
"id": ids,
"start": start, "end": end,
"limit": limit
"id": ids, "start": start, "end": end,
"limit": limit
}
return [ serializers.Order.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
return [ serializers.Order.parse(*sub_data) \
for sub_data in self._post(endpoint, body=body) ]
def get_order_trades(self, symbol: str, id: int) -> List[OrderTrade]:
return [ serializers.OrderTrade.parse(*sub_data) for sub_data in self._POST(f"auth/r/order/{symbol}:{id}/trades") ]
def get_order_trades(self,
symbol: str,
id: int) -> List[OrderTrade]:
return [ serializers.OrderTrade.parse(*sub_data) \
for sub_data in self._post(f"auth/r/order/{symbol}:{id}/trades") ]
def get_trades_history(self, symbol: Optional[str] = None, sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[Trade]:
if symbol == None:
def get_trades_history(self,
*,
symbol: Optional[str] = None,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Trade]:
if symbol is None:
endpoint = "auth/r/trades/hist"
else: endpoint = f"auth/r/trades/{symbol}/hist"
body = {
"sort": sort,
"start": start, "end": end,
"sort": sort, "start": start, "end": end,
"limit": limit
}
return [ serializers.Trade.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
return [ serializers.Trade.parse(*sub_data) \
for sub_data in self._post(endpoint, body=body) ]
def get_ledgers(self, currency: str, category: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[Ledger]:
def get_ledgers(self,
currency: str,
*,
category: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Ledger]:
body = {
"category": category,
"start": start, "end": end,
"limit": limit
"category": category, "start": start, "end": end,
"limit": limit
}
return [ serializers.Ledger.parse(*sub_data) for sub_data in self._POST(f"auth/r/ledgers/{currency}/hist", body=body) ]
return [ serializers.Ledger.parse(*sub_data) \
for sub_data in self._post(f"auth/r/ledgers/{currency}/hist", body=body) ]
def get_base_margin_info(self) -> BaseMarginInfo:
return serializers.BaseMarginInfo.parse(*(self._POST(f"auth/r/info/margin/base")[1]))
return serializers.BaseMarginInfo \
.parse(*(self._post("auth/r/info/margin/base")[1]))
def get_symbol_margin_info(self, symbol: str) -> SymbolMarginInfo:
response = self._POST(f"auth/r/info/margin/{symbol}")
data = [response[1]] + response[2]
return serializers.SymbolMarginInfo.parse(*data)
return serializers.SymbolMarginInfo \
.parse(*self._post(f"auth/r/info/margin/{symbol}"))
def get_all_symbols_margin_info(self) -> List[SymbolMarginInfo]:
return [ serializers.SymbolMarginInfo.parse(*([sub_data[1]] + sub_data[2])) for sub_data in self._POST(f"auth/r/info/margin/sym_all") ]
return [ serializers.SymbolMarginInfo.parse(*sub_data) \
for sub_data in self._post("auth/r/info/margin/sym_all") ]
def get_positions(self) -> List[Position]:
return [ serializers.Position.parse(*sub_data) for sub_data in self._POST("auth/r/positions") ]
return [ serializers.Position.parse(*sub_data) \
for sub_data in self._post("auth/r/positions") ]
def claim_position(self, id: int, amount: Optional[Union[Decimal, float, str]] = None) -> Notification[PositionClaim]:
return serializers._Notification[PositionClaim](serializers.PositionClaim).parse(
*self._POST("auth/w/position/claim", body={ "id": id, "amount": amount })
)
def claim_position(self,
id: int,
*,
amount: Optional[Union[Decimal, float, str]] = None) -> Notification[PositionClaim]:
return _Notification[PositionClaim](serializers.PositionClaim) \
.parse(*self._post("auth/w/position/claim", \
body={ "id": id, "amount": amount }))
def increase_position(self, symbol: str, amount: Union[Decimal, float, str]) -> Notification[PositionIncrease]:
return serializers._Notification[PositionIncrease](serializers.PositionIncrease).parse(
*self._POST("auth/w/position/increase", body={ "symbol": symbol, "amount": amount })
)
def increase_position(self,
symbol: str,
amount: Union[Decimal, float, str]) -> Notification[PositionIncrease]:
return _Notification[PositionIncrease](serializers.PositionIncrease) \
.parse(*self._post("auth/w/position/increase", \
body={ "symbol": symbol, "amount": amount }))
def get_increase_position_info(self, symbol: str, amount: Union[Decimal, float, str]) -> PositionIncreaseInfo:
response = self._POST(f"auth/r/position/increase/info", body={ "symbol": symbol, "amount": amount })
data = response[0] + [response[1][0]] + response[1][1] + [response[1][2]] + response[4] + response[5]
return serializers.PositionIncreaseInfo.parse(*data)
def get_increase_position_info(self,
symbol: str,
amount: Union[Decimal, float, str]) -> PositionIncreaseInfo:
return serializers.PositionIncreaseInfo \
.parse(*self._post("auth/r/position/increase/info", \
body={ "symbol": symbol, "amount": amount }))
def get_positions_history(self, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[PositionHistory]:
return [ serializers.PositionHistory.parse(*sub_data) for sub_data in self._POST("auth/r/positions/hist", body={ "start": start, "end": end, "limit": limit }) ]
def get_positions_history(self,
*,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[PositionHistory]:
return [ serializers.PositionHistory.parse(*sub_data) \
for sub_data in self._post("auth/r/positions/hist", \
body={ "start": start, "end": end, "limit": limit }) ]
def get_positions_snapshot(self, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[PositionSnapshot]:
return [ serializers.PositionSnapshot.parse(*sub_data) for sub_data in self._POST("auth/r/positions/snap", body={ "start": start, "end": end, "limit": limit }) ]
def get_positions_snapshot(self,
*,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[PositionSnapshot]:
return [ serializers.PositionSnapshot.parse(*sub_data) \
for sub_data in self._post("auth/r/positions/snap", \
body={ "start": start, "end": end, "limit": limit }) ]
def get_positions_audit(self, ids: Optional[List[int]] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[PositionAudit]:
return [ serializers.PositionAudit.parse(*sub_data) for sub_data in self._POST("auth/r/positions/audit", body={ "ids": ids, "start": start, "end": end, "limit": limit }) ]
def get_positions_audit(self,
*,
ids: Optional[List[int]] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[PositionAudit]:
body = {
"ids": ids, "start": start, "end": end,
"limit": limit
}
def set_derivative_position_collateral(self, symbol: str, collateral: Union[Decimal, float, str]) -> DerivativePositionCollateral:
return serializers.DerivativePositionCollateral.parse(*(self._POST("auth/w/deriv/collateral/set", body={ "symbol": symbol, "collateral": collateral })[0]))
return [ serializers.PositionAudit.parse(*sub_data) \
for sub_data in self._post("auth/r/positions/audit", body=body) ]
def set_derivative_position_collateral(self,
symbol: str,
collateral: Union[Decimal, float, str]) -> DerivativePositionCollateral:
return serializers.DerivativePositionCollateral \
.parse(*(self._post("auth/w/deriv/collateral/set", \
body={ "symbol": symbol, "collateral": collateral })[0]))
def get_derivative_position_collateral_limits(self, symbol: str) -> DerivativePositionCollateralLimits:
return serializers.DerivativePositionCollateralLimits.parse(*self._POST("auth/calc/deriv/collateral/limits", body={ "symbol": symbol }))
return serializers.DerivativePositionCollateralLimits \
.parse(*self._post("auth/calc/deriv/collateral/limit", body={ "symbol": symbol }))
def get_funding_offers(self, symbol: Optional[str] = None) -> List[FundingOffer]:
endpoint = "auth/r/funding/offers"
def get_funding_offers(self, *, symbol: Optional[str] = None) -> List[FundingOffer]:
if symbol is None:
endpoint = "auth/r/funding/offers"
else: endpoint = f"auth/r/funding/offers/{symbol}"
if symbol != None:
endpoint += f"/{symbol}"
return [ serializers.FundingOffer.parse(*sub_data) \
for sub_data in self._post(endpoint) ]
return [ serializers.FundingOffer.parse(*sub_data) for sub_data in self._POST(endpoint) ]
def submit_funding_offer(self, type: FundingOfferType, symbol: str, amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str], period: int,
#pylint: disable-next=too-many-arguments
def submit_funding_offer(self,
type: FundingOfferType,
symbol: str,
amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str],
period: int,
*,
flags: Optional[int] = 0) -> Notification[FundingOffer]:
body = {
"type": type, "symbol": symbol, "amount": amount,
"rate": rate, "period": period,
"flags": flags
"rate": rate, "period": period, "flags": flags
}
return serializers._Notification[FundingOffer](serializers.FundingOffer).parse(*self._POST("auth/w/funding/offer/submit", body=body))
return _Notification[FundingOffer](serializers.FundingOffer) \
.parse(*self._post("auth/w/funding/offer/submit", body=body))
def cancel_funding_offer(self, id: int) -> Notification[FundingOffer]:
return serializers._Notification[FundingOffer](serializers.FundingOffer).parse(*self._POST("auth/w/funding/offer/cancel", body={ "id": id }))
return _Notification[FundingOffer](serializers.FundingOffer) \
.parse(*self._post("auth/w/funding/offer/cancel", body={ "id": id }))
def cancel_all_funding_offers(self, currency: str) -> Notification[Literal[None]]:
return serializers._Notification[Literal[None]](None).parse(
*self._POST("auth/w/funding/offer/cancel/all", body={ "currency": currency })
)
return _Notification[Literal[None]](None) \
.parse(*self._post("auth/w/funding/offer/cancel/all", body={ "currency": currency }))
def submit_funding_close(self, id: int) -> Notification[Literal[None]]:
return serializers._Notification[Literal[None]](None).parse(
*self._POST("auth/w/funding/close", body={ "id": id })
)
return _Notification[Literal[None]](None) \
.parse(*self._post("auth/w/funding/close", body={ "id": id }))
def toggle_auto_renew(self, status: bool, currency: str, amount: Optional[str] = None, rate: Optional[int] = None, period: Optional[int] = None) -> Notification[FundingAutoRenew]:
return serializers._Notification[FundingAutoRenew](serializers.FundingAutoRenew).parse(*self._POST("auth/w/funding/auto", body={
"status": int(status),
"currency": currency, "amount": amount,
def toggle_auto_renew(self,
status: bool,
currency: str,
*,
amount: Optional[str] = None,
rate: Optional[int] = None,
period: Optional[int] = None) -> Notification[FundingAutoRenew]:
body = {
"status": int(status), "currency": currency, "amount": amount,
"rate": rate, "period": period
}))
}
def toggle_keep_funding(self, type: Literal["credit", "loan"], ids: Optional[List[int]] = None, changes: Optional[Dict[int, Literal[1, 2]]] = None) -> Notification[Literal[None]]:
return serializers._Notification[Literal[None]](None).parse(*self._POST("auth/w/funding/keep", body={
"type": type,
"id": ids,
"changes": changes
}))
return _Notification[FundingAutoRenew](serializers.FundingAutoRenew) \
.parse(*self._post("auth/w/funding/auto", body=body))
def get_funding_offers_history(self, symbol: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[FundingOffer]:
if symbol == None:
def toggle_keep_funding(self,
type: Literal["credit", "loan"],
*,
ids: Optional[List[int]] = None,
changes: Optional[Dict[int, Literal[1, 2]]] = None) -> Notification[Literal[None]]:
return _Notification[Literal[None]](None) \
.parse(*self._post("auth/w/funding/keep", \
body={ "type": type, "id": ids, "changes": changes }))
def get_funding_offers_history(self,
*,
symbol: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[FundingOffer]:
if symbol is None:
endpoint = "auth/r/funding/offers/hist"
else: endpoint = f"auth/r/funding/offers/{symbol}/hist"
body = {
"start": start, "end": end,
"limit": limit
}
return [ serializers.FundingOffer.parse(*sub_data) \
for sub_data in self._post(endpoint, \
body={ "start": start, "end": end, "limit": limit }) ]
return [ serializers.FundingOffer.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
def get_funding_loans(self, symbol: Optional[str] = None) -> List[FundingLoan]:
if symbol == None:
def get_funding_loans(self, *, symbol: Optional[str] = None) -> List[FundingLoan]:
if symbol is None:
endpoint = "auth/r/funding/loans"
else: endpoint = f"auth/r/funding/loans/{symbol}"
return [ serializers.FundingLoan.parse(*sub_data) for sub_data in self._POST(endpoint) ]
return [ serializers.FundingLoan.parse(*sub_data) \
for sub_data in self._post(endpoint) ]
def get_funding_loans_history(self, symbol: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[FundingLoan]:
if symbol == None:
def get_funding_loans_history(self,
*,
symbol: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[FundingLoan]:
if symbol is None:
endpoint = "auth/r/funding/loans/hist"
else: endpoint = f"auth/r/funding/loans/{symbol}/hist"
body = {
"start": start, "end": end,
"limit": limit
}
return [ serializers.FundingLoan.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
return [ serializers.FundingLoan.parse(*sub_data) \
for sub_data in self._post(endpoint, \
body={ "start": start, "end": end, "limit": limit }) ]
def get_funding_credits(self, symbol: Optional[str] = None) -> List[FundingCredit]:
if symbol == None:
def get_funding_credits(self, *, symbol: Optional[str] = None) -> List[FundingCredit]:
if symbol is None:
endpoint = "auth/r/funding/credits"
else: endpoint = f"auth/r/funding/credits/{symbol}"
return [ serializers.FundingCredit.parse(*sub_data) for sub_data in self._POST(endpoint) ]
return [ serializers.FundingCredit.parse(*sub_data) \
for sub_data in self._post(endpoint) ]
def get_funding_credits_history(self, symbol: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[FundingCredit]:
if symbol == None:
def get_funding_credits_history(self,
*,
symbol: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[FundingCredit]:
if symbol is None:
endpoint = "auth/r/funding/credits/hist"
else: endpoint = f"auth/r/funding/credits/{symbol}/hist"
body = {
"start": start, "end": end,
"limit": limit
}
return [ serializers.FundingCredit.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
return [ serializers.FundingCredit.parse(*sub_data) \
for sub_data in self._post(endpoint, \
body={ "start": start, "end": end, "limit": limit }) ]
def get_funding_trades_history(self, symbol: Optional[str] = None, sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[FundingTrade]:
if symbol == None:
def get_funding_trades_history(self,
*,
symbol: Optional[str] = None,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[FundingTrade]:
if symbol is None:
endpoint = "auth/r/funding/trades/hist"
else: endpoint = f"auth/r/funding/trades/{symbol}/hist"
body = {
"sort": sort,
"start": start, "end": end,
"limit": limit
}
"sort": sort, "start": start, "end": end,
"limit": limit }
return [ serializers.FundingTrade.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
return [ serializers.FundingTrade.parse(*sub_data) \
for sub_data in self._post(endpoint, body=body) ]
def get_funding_info(self, key: str) -> FundingInfo:
response = self._POST(f"auth/r/info/funding/{key}")
data = [response[1]] + response[2]
return serializers.FundingInfo.parse(*data)
return serializers.FundingInfo \
.parse(*(self._post(f"auth/r/info/funding/{key}")[2]))
def transfer_between_wallets(self, from_wallet: str, to_wallet: str, currency: str, currency_to: str, amount: Union[Decimal, float, str]) -> Notification[Transfer]:
#pylint: disable-next=too-many-arguments
def transfer_between_wallets(self,
from_wallet: str,
to_wallet: str,
currency: str,
currency_to: str,
amount: Union[Decimal, float, str]) -> Notification[Transfer]:
body = {
"from": from_wallet, "to": to_wallet,
"currency": currency, "currency_to": currency_to,
"from": from_wallet, "to": to_wallet, "currency": currency,
"currency_to": currency_to, "amount": amount
}
return _Notification[Transfer](serializers.Transfer) \
.parse(*self._post("auth/w/transfer", body=body))
def submit_wallet_withdrawal(self,
wallet: str,
method: str,
address: str,
amount: Union[Decimal, float, str]) -> Notification[Withdrawal]:
body = {
"wallet": wallet, "method": method, "address": address,
"amount": amount
}
return serializers._Notification[Transfer](serializers.Transfer).parse(*self._POST("auth/w/transfer", body=body))
return _Notification[Withdrawal](serializers.Withdrawal) \
.parse(*self._post("auth/w/withdraw", body=body))
def submit_wallet_withdrawal(self, wallet: str, method: str, address: str, amount: Union[Decimal, float, str]) -> Notification[Withdrawal]:
return serializers._Notification[Withdrawal](serializers.Withdrawal).parse(*self._POST("auth/w/withdraw", body={
"wallet": wallet, "method": method,
"address": address, "amount": amount,
}))
def get_deposit_address(self,
wallet: str,
method: str,
renew: bool = False) -> Notification[DepositAddress]:
return _Notification[DepositAddress](serializers.DepositAddress) \
.parse(*self._post("auth/w/deposit/address", \
body={ "wallet": wallet, "method": method, "renew": int(renew) }))
def get_deposit_address(self, wallet: str, method: str, renew: bool = False) -> Notification[DepositAddress]:
body = {
"wallet": wallet,
"method": method,
"renew": int(renew)
}
def generate_deposit_invoice(self,
wallet: str,
currency: str,
amount: Union[Decimal, float, str]) -> LightningNetworkInvoice:
return serializers.LightningNetworkInvoice \
.parse(*self._post("auth/w/deposit/invoice", \
body={ "wallet": wallet, "currency": currency, "amount": amount }))
return serializers._Notification[DepositAddress](serializers.DepositAddress).parse(*self._POST("auth/w/deposit/address", body=body))
def generate_deposit_invoice(self, wallet: str, currency: str, amount: Union[Decimal, float, str]) -> LightningNetworkInvoice:
body = {
"wallet": wallet, "currency": currency,
"amount": amount
}
return serializers.LightningNetworkInvoice.parse(*self._POST("auth/w/deposit/invoice", body=body))
def get_movements(self, currency: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[Movement]:
if currency == None:
def get_movements(self,
*,
currency: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Movement]:
if currency is None:
endpoint = "auth/r/movements/hist"
else: endpoint = f"auth/r/movements/{currency}/hist"
body = {
"start": start, "end": end,
"limit": limit
}
return [ serializers.Movement.parse(*sub_data) for sub_data in self._POST(endpoint, body=body) ]
return [ serializers.Movement.parse(*sub_data) \
for sub_data in self._post(endpoint, \
body={ "start": start, "end": end, "limit": limit }) ]

View File

@@ -1,9 +1,13 @@
from typing import TypedDict, List, Union, Literal, Optional, Any
from typing import TypedDict, Dict, List, Union, Literal, Optional, Any
from decimal import Decimal
from .. types import *
from .. types import \
InvoiceSubmission, InvoicePage, InvoiceStats, \
CurrencyConversion, MerchantDeposit, MerchantUnlinkedDeposit
from .. enums import MerchantSettingsKey
from .. middleware import Middleware
from ...utils.camel_and_snake_case_helpers import to_snake_case_keys, to_camel_case_keys
@@ -15,89 +19,142 @@ _CustomerInfo = TypedDict("_CustomerInfo", {
})
class RestMerchantEndpoints(Middleware):
def submit_invoice(self, amount: Union[Decimal, float, str], currency: str, order_id: str,
customer_info: _CustomerInfo, pay_currencies: List[str], duration: Optional[int] = None,
webhook: Optional[str] = None, redirect_url: Optional[str] = None) -> InvoiceSubmission:
#pylint: disable-next=too-many-arguments
def submit_invoice(self,
amount: Union[Decimal, float, str],
currency: str,
order_id: str,
customer_info: _CustomerInfo,
pay_currencies: List[str],
*,
duration: Optional[int] = None,
webhook: Optional[str] = None,
redirect_url: Optional[str] = None) -> InvoiceSubmission:
body = to_camel_case_keys({
"amount": amount, "currency": currency, "order_id": order_id,
"customer_info": customer_info, "pay_currencies": pay_currencies, "duration": duration,
"webhook": webhook, "redirect_url": redirect_url
})
data = to_snake_case_keys(self._POST("auth/w/ext/pay/invoice/create", body=body))
data = to_snake_case_keys(self._post("auth/w/ext/pay/invoice/create", body=body))
return InvoiceSubmission.parse(data)
def get_invoices(self, id: Optional[str] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[InvoiceSubmission]:
return [ InvoiceSubmission.parse(sub_data) for sub_data in to_snake_case_keys(self._POST("auth/r/ext/pay/invoices", body={
def get_invoices(self,
*,
id: Optional[str] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[InvoiceSubmission]:
body = {
"id": id, "start": start, "end": end,
"limit": limit
})) ]
def get_invoices_paginated(self, page: int = 1, page_size: int = 10, sort: Literal["asc", "desc"] = "asc",
sort_field: Literal["t", "amount", "status"] = "t", status: Optional[List[Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"]]] = None, fiat: Optional[List[str]] = None,
crypto: Optional[List[str]] = None, id: Optional[str] = None, order_id: Optional[str] = None) -> InvoicePage:
}
response = self._post("auth/r/ext/pay/invoices", body=body)
return [ InvoiceSubmission.parse(sub_data) for sub_data in to_snake_case_keys(response) ]
def get_invoices_paginated(self,
page: int = 1,
page_size: int = 10,
sort: Literal["asc", "desc"] = "asc",
sort_field: Literal["t", "amount", "status"] = "t",
*,
status: Optional[List[Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"]]] = None,
fiat: Optional[List[str]] = None,
crypto: Optional[List[str]] = None,
id: Optional[str] = None,
order_id: Optional[str] = None) -> InvoicePage:
body = to_camel_case_keys({
"page": page, "page_size": page_size, "sort": sort,
"sort_field": sort_field, "status": status, "fiat": fiat,
"crypto": crypto, "id": id, "order_id": order_id
})
data = to_snake_case_keys(self._POST("auth/r/ext/pay/invoices/paginated", body=body))
data = to_snake_case_keys(self._post("auth/r/ext/pay/invoices/paginated", body=body))
return InvoicePage.parse(data)
def get_invoice_count_stats(self, status: Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"], format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in self._POST("auth/r/ext/pay/invoice/stats/count", body={ "status": status, "format": format }) ]
def get_invoice_count_stats(self,
status: Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"],
format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in \
self._post("auth/r/ext/pay/invoice/stats/count", body={ "status": status, "format": format }) ]
def get_invoice_earning_stats(self, currency: str, format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in self._POST("auth/r/ext/pay/invoice/stats/earning", body={ "currency": currency, "format": format }) ]
def get_invoice_earning_stats(self,
currency: str,
format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in \
self._post("auth/r/ext/pay/invoice/stats/earning", body={ "currency": currency, "format": format }) ]
def complete_invoice(self, id: str, pay_currency: str, deposit_id: Optional[int] = None, ledger_id: Optional[int] = None) -> InvoiceSubmission:
return InvoiceSubmission.parse(to_snake_case_keys(self._POST("auth/w/ext/pay/invoice/complete", body={
def complete_invoice(self,
id: str,
pay_currency: str,
*,
deposit_id: Optional[int] = None,
ledger_id: Optional[int] = None) -> InvoiceSubmission:
return InvoiceSubmission.parse(to_snake_case_keys(self._post("auth/w/ext/pay/invoice/complete", body={
"id": id, "payCcy": pay_currency, "depositId": deposit_id,
"ledgerId": ledger_id
})))
def expire_invoice(self, id: str) -> InvoiceSubmission:
return InvoiceSubmission.parse(to_snake_case_keys(self._POST("auth/w/ext/pay/invoice/expire", body={ "id": id })))
body = { "id": id }
response = self._post("auth/w/ext/pay/invoice/expire", body=body)
return InvoiceSubmission.parse(to_snake_case_keys(response))
def get_currency_conversion_list(self) -> List[CurrencyConversion]:
return [
CurrencyConversion(
base_currency=sub_data["baseCcy"],
convert_currency=sub_data["convertCcy"],
base_currency=sub_data["baseCcy"],
convert_currency=sub_data["convertCcy"],
created=sub_data["created"]
) for sub_data in self._POST("auth/r/ext/pay/settings/convert/list")
) for sub_data in self._post("auth/r/ext/pay/settings/convert/list")
]
def add_currency_conversion(self, base_currency: str, convert_currency: str) -> bool:
return bool(self._POST("auth/w/ext/pay/settings/convert/create", body={
def add_currency_conversion(self,
base_currency: str,
convert_currency: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/create", body={
"baseCcy": base_currency,
"convertCcy": convert_currency
}))
def remove_currency_conversion(self, base_currency: str, convert_currency: str) -> bool:
return bool(self._POST("auth/w/ext/pay/settings/convert/remove", body={
def remove_currency_conversion(self,
base_currency: str,
convert_currency: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/remove", body={
"baseCcy": base_currency,
"convertCcy": convert_currency
}))
def set_merchant_settings(self, key: MerchantSettingsKey, val: Any) -> bool:
return bool(self._POST("auth/w/ext/pay/settings/set", body={ "key": key, "val": val }))
def set_merchant_settings(self,
key: MerchantSettingsKey,
val: Any) -> bool:
return bool(self._post("auth/w/ext/pay/settings/set", body={ "key": key, "val": val }))
def get_merchant_settings(self, key: MerchantSettingsKey) -> Any:
return self._POST("auth/r/ext/pay/settings/get", body={ "key": key })
def list_merchant_settings(self, keys: List[MerchantSettingsKey] = list()) -> Dict[MerchantSettingsKey, Any]:
return self._POST("auth/r/ext/pay/settings/list", body={ "keys": keys })
def get_deposits(self, start: int, end: int, ccy: Optional[str] = None, unlinked: Optional[bool] = None) -> List[MerchantDeposit]:
return [ MerchantDeposit(**sub_data) for sub_data in to_snake_case_keys(self._POST("auth/r/ext/pay/deposits", body={
"from": start, "to": end, "ccy": ccy, "unlinked": unlinked
})) ]
def get_unlinked_deposits(self, ccy: str, start: Optional[int] = None, end: Optional[int] = None) -> List[MerchantUnlinkedDeposit]:
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in to_snake_case_keys(self._POST("/auth/r/ext/pay/deposits/unlinked", body={
"ccy": ccy, "start": start, "end": end
})) ]
return self._post("auth/r/ext/pay/settings/get", body={ "key": key })
def list_merchant_settings(self, keys: List[MerchantSettingsKey] = []) -> Dict[MerchantSettingsKey, Any]:
return self._post("auth/r/ext/pay/settings/list", body={ "keys": keys })
def get_deposits(self,
start: int,
end: int,
*,
ccy: Optional[str] = None,
unlinked: Optional[bool] = None) -> List[MerchantDeposit]:
body = { "from": start, "to": end, "ccy": ccy, "unlinked": unlinked }
response = self._post("auth/r/ext/pay/deposits", body=body)
return [ MerchantDeposit(**sub_data) for sub_data in to_snake_case_keys(response) ]
def get_unlinked_deposits(self,
ccy: str,
*,
start: Optional[int] = None,
end: Optional[int] = None) -> List[MerchantUnlinkedDeposit]:
body = { "ccy": ccy, "start": start, "end": end }
response = self._post("/auth/r/ext/pay/deposits/unlinked", body=body)
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in to_snake_case_keys(response) ]

View File

@@ -1,7 +1,15 @@
from typing import List, Union, Literal, Optional, Any, cast
from decimal import Decimal
from .. types import *
from .. types import \
PlatformStatus, TradingPairTicker, FundingCurrencyTicker, \
TickersHistory, TradingPairTrade, FundingCurrencyTrade, \
TradingPairBook, FundingCurrencyBook, TradingPairRawBook, \
FundingCurrencyRawBook, Statistic, Candle, \
DerivativesStatus, Liquidation, Leaderboard, \
FundingStatistic, PulseProfile, PulseMessage, \
TradingMarketAveragePrice, FundingMarketAveragePrice, FxRate
from .. import serializers
from .. enums import Config, Sort
@@ -9,103 +17,151 @@ from .. middleware import Middleware
class RestPublicEndpoints(Middleware):
def conf(self, config: Config) -> Any:
return self._GET(f"conf/{config}")[0]
return self._get(f"conf/{config}")[0]
def get_platform_status(self) -> PlatformStatus:
return serializers.PlatformStatus.parse(*self._GET("platform/status"))
return serializers.PlatformStatus.parse(*self._get("platform/status"))
def get_tickers(self, symbols: List[str]) -> List[Union[TradingPairTicker, FundingCurrencyTicker]]:
data = self._GET("tickers", params={ "symbols": ",".join(symbols) })
data = self._get("tickers", params={ "symbols": ",".join(symbols) })
parsers = { "t": serializers.TradingPairTicker.parse, "f": serializers.FundingCurrencyTicker.parse }
return [ cast(Union[TradingPairTicker, FundingCurrencyTicker], parsers[sub_data[0][0]](*sub_data)) for sub_data in data ]
return [ cast(Union[TradingPairTicker, FundingCurrencyTicker], \
parsers[sub_data[0][0]](*sub_data)) for sub_data in data ]
def get_t_tickers(self, pairs: Union[List[str], Literal["ALL"]]) -> List[TradingPairTicker]:
if isinstance(pairs, str) and pairs == "ALL":
return [ cast(TradingPairTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) if cast(str, sub_data.symbol).startswith("t") ]
return [ cast(TradingPairTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) \
if cast(str, sub_data.symbol).startswith("t") ]
data = self.get_tickers([ pair for pair in pairs ])
data = self.get_tickers(list(pairs))
return cast(List[TradingPairTicker], data)
def get_f_tickers(self, currencies: Union[List[str], Literal["ALL"]]) -> List[FundingCurrencyTicker]:
if isinstance(currencies, str) and currencies == "ALL":
return [ cast(FundingCurrencyTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) if cast(str, sub_data.symbol).startswith("f") ]
return [ cast(FundingCurrencyTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) \
if cast(str, sub_data.symbol).startswith("f") ]
data = self.get_tickers([ currency for currency in currencies ])
data = self.get_tickers(list(currencies))
return cast(List[FundingCurrencyTicker], data)
def get_t_ticker(self, pair: str) -> TradingPairTicker:
return serializers.TradingPairTicker.parse(*self._GET(f"ticker/t{pair}"), skip=["SYMBOL"])
return serializers.TradingPairTicker.parse(*([pair] + self._get(f"ticker/{pair}")))
def get_f_ticker(self, currency: str) -> FundingCurrencyTicker:
return serializers.FundingCurrencyTicker.parse(*self._GET(f"ticker/f{currency}"), skip=["SYMBOL"])
return serializers.FundingCurrencyTicker.parse(*([currency] + self._get(f"ticker/{currency}")))
def get_tickers_history(self, symbols: List[str], start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[TickersHistory]:
return [ serializers.TickersHistory.parse(*sub_data) for sub_data in self._GET("tickers/hist", params={
def get_tickers_history(self,
symbols: List[str],
*,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[TickersHistory]:
return [ serializers.TickersHistory.parse(*sub_data) for sub_data in self._get("tickers/hist", params={
"symbols": ",".join(symbols),
"start": start, "end": end,
"limit": limit
}) ]
def get_t_trades(self, pair: str, limit: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, sort: Optional[Sort] = None) -> List[TradingPairTrade]:
def get_t_trades(self,
pair: str,
*,
limit: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
sort: Optional[Sort] = None) -> List[TradingPairTrade]:
params = { "limit": limit, "start": start, "end": end, "sort": sort }
data = self._GET(f"trades/{pair}/hist", params=params)
data = self._get(f"trades/{pair}/hist", params=params)
return [ serializers.TradingPairTrade.parse(*sub_data) for sub_data in data ]
def get_f_trades(self, currency: str, limit: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, sort: Optional[Sort] = None) -> List[FundingCurrencyTrade]:
def get_f_trades(self,
currency: str,
*,
limit: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
sort: Optional[Sort] = None) -> List[FundingCurrencyTrade]:
params = { "limit": limit, "start": start, "end": end, "sort": sort }
data = self._GET(f"trades/{currency}/hist", params=params)
data = self._get(f"trades/{currency}/hist", params=params)
return [ serializers.FundingCurrencyTrade.parse(*sub_data) for sub_data in data ]
def get_t_book(self, pair: str, precision: Literal["P0", "P1", "P2", "P3", "P4"], len: Optional[Literal[1, 25, 100]] = None) -> List[TradingPairBook]:
return [ serializers.TradingPairBook.parse(*sub_data) for sub_data in self._GET(f"book/{pair}/{precision}", params={ "len": len }) ]
def get_t_book(self,
pair: str,
precision: Literal["P0", "P1", "P2", "P3", "P4"],
*,
len: Optional[Literal[1, 25, 100]] = None) -> List[TradingPairBook]:
return [ serializers.TradingPairBook.parse(*sub_data) \
for sub_data in self._get(f"book/{pair}/{precision}", params={ "len": len }) ]
def get_f_book(self, currency: str, precision: Literal["P0", "P1", "P2", "P3", "P4"], len: Optional[Literal[1, 25, 100]] = None) -> List[FundingCurrencyBook]:
return [ serializers.FundingCurrencyBook.parse(*sub_data) for sub_data in self._GET(f"book/{currency}/{precision}", params={ "len": len }) ]
def get_f_book(self,
currency: str,
precision: Literal["P0", "P1", "P2", "P3", "P4"],
*,
len: Optional[Literal[1, 25, 100]] = None) -> List[FundingCurrencyBook]:
return [ serializers.FundingCurrencyBook.parse(*sub_data) \
for sub_data in self._get(f"book/{currency}/{precision}", params={ "len": len }) ]
def get_t_raw_book(self, pair: str, len: Optional[Literal[1, 25, 100]] = None) -> List[TradingPairRawBook]:
return [ serializers.TradingPairRawBook.parse(*sub_data) for sub_data in self._GET(f"book/{pair}/R0", params={ "len": len }) ]
def get_t_raw_book(self,
pair: str,
*,
len: Optional[Literal[1, 25, 100]] = None) -> List[TradingPairRawBook]:
return [ serializers.TradingPairRawBook.parse(*sub_data) \
for sub_data in self._get(f"book/{pair}/R0", params={ "len": len }) ]
def get_f_raw_book(self, currency: str, len: Optional[Literal[1, 25, 100]] = None) -> List[FundingCurrencyRawBook]:
return [ serializers.FundingCurrencyRawBook.parse(*sub_data) for sub_data in self._GET(f"book/{currency}/R0", params={ "len": len }) ]
def get_f_raw_book(self,
currency: str,
*,
len: Optional[Literal[1, 25, 100]] = None) -> List[FundingCurrencyRawBook]:
return [ serializers.FundingCurrencyRawBook.parse(*sub_data) \
for sub_data in self._get(f"book/{currency}/R0", params={ "len": len }) ]
def get_stats_hist(
self,
resource: str,
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> List[Statistic]:
def get_stats_hist(self,
resource: str,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Statistic]:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"stats1/{resource}/hist", params=params)
data = self._get(f"stats1/{resource}/hist", params=params)
return [ serializers.Statistic.parse(*sub_data) for sub_data in data ]
def get_stats_last(
self,
resource: str,
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> Statistic:
def get_stats_last(self,
resource: str,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> Statistic:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"stats1/{resource}/last", params=params)
data = self._get(f"stats1/{resource}/last", params=params)
return serializers.Statistic.parse(*data)
def get_candles_hist(
self,
symbol: str, tf: str = "1m",
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> List[Candle]:
def get_candles_hist(self,
symbol: str,
tf: str = "1m",
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Candle]:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"candles/trade:{tf}:{symbol}/hist", params=params)
data = self._get(f"candles/trade:{tf}:{symbol}/hist", params=params)
return [ serializers.Candle.parse(*sub_data) for sub_data in data ]
def get_candles_last(
self,
symbol: str, tf: str = "1m",
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> Candle:
def get_candles_last(self,
symbol: str,
tf: str = "1m",
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> Candle:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"candles/trade:{tf}:{symbol}/last", params=params)
data = self._get(f"candles/trade:{tf}:{symbol}/last", params=params)
return serializers.Candle.parse(*data)
def get_derivatives_status(self, keys: Union[List[str], Literal["ALL"]]) -> List[DerivativesStatus]:
@@ -113,74 +169,109 @@ class RestPublicEndpoints(Middleware):
params = { "keys": "ALL" }
else: params = { "keys": ",".join(keys) }
data = self._GET(f"status/deriv", params=params)
data = self._get("status/deriv", params=params)
return [ serializers.DerivativesStatus.parse(*sub_data) for sub_data in data ]
def get_derivatives_status_history(
self,
type: str, symbol: str,
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> List[DerivativesStatus]:
def get_derivatives_status_history(self,
type: str,
symbol: str,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[DerivativesStatus]:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"status/{type}/{symbol}/hist", params=params)
return [ serializers.DerivativesStatus.parse(*sub_data, skip=[ "KEY" ]) for sub_data in data ]
data = self._get(f"status/{type}/{symbol}/hist", params=params)
return [ serializers.DerivativesStatus.parse(*([symbol] + sub_data)) for sub_data in data ]
def get_liquidations(self, sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[Liquidation]:
def get_liquidations(self,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Liquidation]:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET("liquidations/hist", params=params)
data = self._get("liquidations/hist", params=params)
return [ serializers.Liquidation.parse(*sub_data[0]) for sub_data in data ]
def get_seed_candles(self, symbol: str, tf: str = '1m', sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[Candle]:
def get_seed_candles(self,
symbol: str,
tf: str = "1m",
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Candle]:
params = {"sort": sort, "start": start, "end": end, "limit": limit}
data = self._GET(f"candles/trade:{tf}:{symbol}/hist?limit={limit}&start={start}&end={end}&sort={sort}", params=params)
data = self._get(f"candles/trade:{tf}:{symbol}/hist", params=params)
return [ serializers.Candle.parse(*sub_data) for sub_data in data ]
def get_leaderboards_hist(
self,
resource: str,
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> List[Leaderboard]:
def get_leaderboards_hist(self,
resource: str,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Leaderboard]:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"rankings/{resource}/hist", params=params)
data = self._get(f"rankings/{resource}/hist", params=params)
return [ serializers.Leaderboard.parse(*sub_data) for sub_data in data ]
def get_leaderboards_last(
self,
resource: str,
sort: Optional[Sort] = None, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None
) -> Leaderboard:
def get_leaderboards_last(self,
resource: str,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> Leaderboard:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._GET(f"rankings/{resource}/last", params=params)
data = self._get(f"rankings/{resource}/last", params=params)
return serializers.Leaderboard.parse(*data)
def get_funding_stats(self, symbol: str, start: Optional[str] = None, end: Optional[str] = None, limit: Optional[int] = None) -> List[FundingStatistic]:
def get_funding_stats(self,
symbol: str,
*,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[FundingStatistic]:
params = { "start": start, "end": end, "limit": limit }
data = self._GET(f"funding/stats/{symbol}/hist", params=params)
data = self._get(f"funding/stats/{symbol}/hist", params=params)
return [ serializers.FundingStatistic.parse(*sub_data) for sub_data in data ]
def get_pulse_profile(self, nickname: str) -> PulseProfile:
return serializers.PulseProfile.parse(*self._GET(f"pulse/profile/{nickname}"))
return serializers.PulseProfile.parse(*self._get(f"pulse/profile/{nickname}"))
def get_pulse_history(self, end: Optional[str] = None, limit: Optional[int] = None) -> List[PulseMessage]:
messages = list()
def get_pulse_history(self, *, end: Optional[str] = None, limit: Optional[int] = None) -> List[PulseMessage]:
messages = []
for subdata in self._GET("pulse/hist", params={ "end": end, "limit": limit }):
for subdata in self._get("pulse/hist", params={ "end": end, "limit": limit }):
subdata[18] = subdata[18][0]
message = serializers.PulseMessage.parse(*subdata)
messages.append(message)
return messages
def get_trading_market_average_price(self, symbol: str, amount: Union[Decimal, float, str], price_limit: Optional[Union[Decimal, float, str]] = None) -> TradingMarketAveragePrice:
return serializers.TradingMarketAveragePrice.parse(*self._POST("calc/trade/avg", body={
def get_trading_market_average_price(self,
symbol: str,
amount: Union[Decimal, float, str],
*,
price_limit: Optional[Union[Decimal, float, str]] = None
) -> TradingMarketAveragePrice:
return serializers.TradingMarketAveragePrice.parse(*self._post("calc/trade/avg", body={
"symbol": symbol, "amount": amount, "price_limit": price_limit
}))
def get_funding_market_average_price(self, symbol: str, amount: Union[Decimal, float, str], period: int, rate_limit: Optional[Union[Decimal, float, str]] = None) -> FundingMarketAveragePrice:
return serializers.FundingMarketAveragePrice.parse(*self._POST("calc/trade/avg", body={
def get_funding_market_average_price(self,
symbol: str,
amount: Union[Decimal, float, str],
period: int,
*,
rate_limit: Optional[Union[Decimal, float, str]] = None
) -> FundingMarketAveragePrice:
return serializers.FundingMarketAveragePrice.parse(*self._post("calc/trade/avg", body={
"symbol": symbol, "amount": amount, "period": period, "rate_limit": rate_limit
}))
def get_fx_rate(self, ccy1: str, ccy2: str) -> FxRate:
return serializers.FxRate.parse(*self._POST("calc/fx", body={ "ccy1": ccy1, "ccy2": ccy2 }))
return serializers.FxRate.parse(*self._post("calc/fx", body={ "ccy1": ccy1, "ccy2": ccy2 }))

View File

@@ -1,3 +1,4 @@
#pylint: disable-next=wildcard-import,unused-wildcard-import
from ..enums import *
class Config(str, Enum):
@@ -43,4 +44,4 @@ class MerchantSettingsKey(str, Enum):
NOTIFY_AUTOCONVERT_EXECUTED = "bfx_pay_notify_autoconvert_executed"
DUST_BALANCE_UI = "bfx_pay_dust_balance_ui"
MERCHANT_CUSTOMER_SUPPORT_URL = "bfx_pay_merchant_customer_support_url"
MERCHANT_UNDERPAID_THRESHOLD = "bfx_pay_merchant_underpaid_threshold"
MERCHANT_UNDERPAID_THRESHOLD = "bfx_pay_merchant_underpaid_threshold"

View File

@@ -14,32 +14,22 @@ class BfxRestException(BfxBaseException):
Base class for all custom exceptions in bfxapi/rest/exceptions.py.
"""
pass
class ResourceNotFound(BfxRestException):
"""
This error indicates a failed HTTP request to a non-existent resource.
"""
pass
class RequestParametersError(BfxRestException):
"""
This error indicates that there are some invalid parameters sent along with an HTTP request.
"""
pass
class InvalidAuthenticationCredentials(BfxRestException):
"""
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
"""
pass
class UnknownGenericError(BfxRestException):
"""
This error indicates an undefined problem processing an HTTP request sent to the APIs.
"""
pass

View File

@@ -1,3 +1,3 @@
from .middleware import Middleware
NAME = "middleware"
NAME = "middleware"

View File

@@ -1,45 +1,51 @@
import time, hmac, hashlib, json, requests
from typing import TYPE_CHECKING, Optional, Any, cast
from typing import TYPE_CHECKING, Optional, Any
from http import HTTPStatus
import time, hmac, hashlib, json, requests
from ..enums import Error
from ..exceptions import ResourceNotFound, RequestParametersError, InvalidAuthenticationCredentials, UnknownGenericError
from ...utils.JSONEncoder import JSONEncoder
from ...utils.json_encoder import JSONEncoder
if TYPE_CHECKING:
from requests.sessions import _Params
class Middleware(object):
def __init__(self, host: str, API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None):
self.host, self.API_KEY, self.API_SECRET = host, API_KEY, API_SECRET
class Middleware:
TIMEOUT = 30
def __init__(self, host: str, api_key: Optional[str] = None, api_secret: Optional[str] = None):
self.host, self.api_key, self.api_secret = host, api_key, api_secret
def __build_authentication_headers(self, endpoint: str, data: Optional[str] = None):
assert isinstance(self.API_KEY, str) and isinstance(self.API_SECRET, str), \
assert isinstance(self.api_key, str) and isinstance(self.api_secret, str), \
"API_KEY and API_SECRET must be both str to call __build_authentication_headers"
nonce = str(round(time.time() * 1_000_000))
if data == None:
if data is None:
path = f"/api/v2/{endpoint}{nonce}"
else: path = f"/api/v2/{endpoint}{nonce}{data}"
signature = hmac.new(
self.API_SECRET.encode("utf8"),
self.api_secret.encode("utf8"),
path.encode("utf8"),
hashlib.sha384
hashlib.sha384
).hexdigest()
return {
"bfx-nonce": nonce,
"bfx-signature": signature,
"bfx-apikey": self.API_KEY
"bfx-apikey": self.api_key
}
def _GET(self, endpoint: str, params: Optional["_Params"] = None) -> Any:
response = requests.get(f"{self.host}/{endpoint}", params=params)
def _get(self, endpoint: str, params: Optional["_Params"] = None) -> Any:
response = requests.get(
url=f"{self.host}/{endpoint}",
params=params,
timeout=Middleware.TIMEOUT
)
if response.status_code == HTTPStatus.NOT_FOUND:
raise ResourceNotFound(f"No resources found at endpoint <{endpoint}>.")
@@ -47,23 +53,32 @@ class Middleware(object):
if len(data) and data[0] == "error":
if data[1] == Error.ERR_PARAMS:
raise RequestParametersError(f"The request was rejected with the following parameter error: <{data[2]}>")
raise RequestParametersError("The request was rejected with the " \
f"following parameter error: <{data[2]}>")
if data[1] == None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
raise UnknownGenericError(f"The server replied to the request with a generic error with message: <{data[2]}>.")
if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
raise UnknownGenericError("The server replied to the request with " \
f"a generic error with message: <{data[2]}>.")
return data
def _POST(self, endpoint: str, params: Optional["_Params"] = None, body: Optional[Any] = None, _ignore_authentication_headers: bool = False) -> Any:
def _post(self, endpoint: str, params: Optional["_Params"] = None,
body: Optional[Any] = None, _ignore_authentication_headers: bool = False) -> Any:
data = body and json.dumps(body, cls=JSONEncoder) or None
headers = { "Content-Type": "application/json" }
if self.API_KEY and self.API_SECRET and _ignore_authentication_headers == False:
if self.api_key and self.api_secret and not _ignore_authentication_headers:
headers = { **headers, **self.__build_authentication_headers(endpoint, data) }
response = requests.post(f"{self.host}/{endpoint}", params=params, data=data, headers=headers)
response = requests.post(
url=f"{self.host}/{endpoint}",
params=params,
data=data,
headers=headers,
timeout=Middleware.TIMEOUT
)
if response.status_code == HTTPStatus.NOT_FOUND:
raise ResourceNotFound(f"No resources found at endpoint <{endpoint}>.")
@@ -71,12 +86,14 @@ class Middleware(object):
if isinstance(data, list) and len(data) and data[0] == "error":
if data[1] == Error.ERR_PARAMS:
raise RequestParametersError(f"The request was rejected with the following parameter error: <{data[2]}>")
raise RequestParametersError("The request was rejected with the " \
f"following parameter error: <{data[2]}>")
if data[1] == Error.ERR_AUTH_FAIL:
raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.")
if data[1] == None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
raise UnknownGenericError(f"The server replied to the request with a generic error with message: <{data[2]}>.")
if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
raise UnknownGenericError("The server replied to the request with " \
f"a generic error with message: <{data[2]}>.")
return data

File diff suppressed because it is too large Load Diff

View File

@@ -1,20 +1,26 @@
#pylint: disable=duplicate-code
#pylint: disable-next=wildcard-import,unused-wildcard-import
from typing import *
from dataclasses import dataclass
from .. labeler import _Type, partial, compose
#pylint: disable-next=unused-import
from .. notification import Notification
from .. utils.JSONEncoder import JSON
from ..utils.json_encoder import JSON
#region Type hinting for Rest Public Endpoints
@dataclass
class PlatformStatus(_Type):
status: int
@dataclass
class TradingPairTicker(_Type):
symbol: Optional[str]
symbol: str
bid: float
bid_size: float
ask: float
@@ -28,7 +34,7 @@ class TradingPairTicker(_Type):
@dataclass
class FundingCurrencyTicker(_Type):
symbol: Optional[str]
symbol: str
frr: float
bid: float
bid_period: int
@@ -53,43 +59,43 @@ class TickersHistory(_Type):
@dataclass
class TradingPairTrade(_Type):
id: int
mts: int
amount: float
id: int
mts: int
amount: float
price: float
@dataclass
class FundingCurrencyTrade(_Type):
id: int
mts: int
amount: float
rate: float
id: int
mts: int
amount: float
rate: float
period: int
@dataclass
class TradingPairBook(_Type):
price: float
count: int
price: float
count: int
amount: float
@dataclass
class FundingCurrencyBook(_Type):
rate: float
period: int
count: int
rate: float
period: int
count: int
amount: float
@dataclass
@dataclass
class TradingPairRawBook(_Type):
order_id: int
price: float
price: float
amount: float
@dataclass
@dataclass
class FundingCurrencyRawBook(_Type):
offer_id: int
period: int
rate: float
offer_id: int
period: int
rate: float
amount: float
@dataclass
@@ -108,7 +114,7 @@ class Candle(_Type):
@dataclass
class DerivativesStatus(_Type):
key: Optional[str]
key: str
mts: int
deriv_price: float
spot_price: float
@@ -142,7 +148,7 @@ class Leaderboard(_Type):
twitter_handle: Optional[str]
@dataclass
class FundingStatistic(_Type):
class FundingStatistic(_Type):
timestamp: int
frr: float
avg_period: float
@@ -286,14 +292,14 @@ class Position(_Type):
@dataclass
class Trade(_Type):
id: int
symbol: str
id: int
symbol: str
mts_create: int
order_id: int
exec_amount: float
exec_price: float
order_type: str
order_price: float
order_id: int
exec_amount: float
exec_price: float
order_type: str
order_price: float
maker:int
fee: float
fee_currency: str
@@ -311,12 +317,12 @@ class FundingTrade(_Type):
@dataclass
class OrderTrade(_Type):
id: int
symbol: str
id: int
symbol: str
mts_create: int
order_id: int
exec_amount: float
exec_price: float
order_id: int
exec_amount: float
exec_price: float
maker:int
fee: float
fee_currency: str
@@ -325,7 +331,7 @@ class OrderTrade(_Type):
@dataclass
class Ledger(_Type):
id: int
currency: str
currency: str
mts: int
amount: float
balance: float
@@ -398,7 +404,6 @@ class FundingAutoRenew(_Type):
@dataclass()
class FundingInfo(_Type):
symbol: str
yield_loan: float
yield_lend: float
duration_loan: float
@@ -413,7 +418,7 @@ class Wallet(_Type):
available_balance: float
last_change: str
trade_details: JSON
@dataclass
class Transfer(_Type):
mts: int
@@ -458,7 +463,7 @@ class Movement(_Type):
destination_address: str
transaction_id: str
withdraw_transaction_note: str
@dataclass
class SymbolMarginInfo(_Type):
symbol: str
@@ -586,16 +591,16 @@ class InvoiceSubmission(_Type):
@classmethod
def parse(cls, data: Dict[str, Any]) -> "InvoiceSubmission":
if "customer_info" in data and data["customer_info"] != None:
if "customer_info" in data and data["customer_info"] is not None:
data["customer_info"] = InvoiceSubmission.CustomerInfo(**data["customer_info"])
for index, invoice in enumerate(data["invoices"]):
data["invoices"][index] = InvoiceSubmission.Invoice(**invoice)
if "payment" in data and data["payment"] != None:
if "payment" in data and data["payment"] is not None:
data["payment"] = InvoiceSubmission.Payment(**data["payment"])
if "additional_payments" in data and data["additional_payments"] != None:
if "additional_payments" in data and data["additional_payments"] is not None:
for index, additional_payment in enumerate(data["additional_payments"]):
data["additional_payments"][index] = InvoiceSubmission.Payment(**additional_payment)
@@ -694,4 +699,4 @@ class MerchantUnlinkedDeposit(_Type):
status: str
note: Optional[str]
#endregion
#endregion

View File

@@ -13,6 +13,6 @@ def suite():
unittest.makeSuite(TestLabeler),
unittest.makeSuite(TestNotification),
])
if __name__ == "__main__":
unittest.TextTestRunner().run(suite())
unittest.TextTestRunner().run(suite())

View File

@@ -1,5 +1,7 @@
import unittest
from typing import Optional
from dataclasses import dataclass
from ..exceptions import LabelerSerializerException
from ..labeler import _Type, generate_labeler_serializer, generate_recursive_serializer
@@ -8,7 +10,7 @@ class TestLabeler(unittest.TestCase):
def test_generate_labeler_serializer(self):
@dataclass
class Test(_Type):
A: int
A: Optional[int]
B: float
C: str
@@ -19,38 +21,36 @@ class TestLabeler(unittest.TestCase):
self.assertEqual(serializer.parse(5, None, 65.0, None, "X"), Test(5, 65.0, "X"),
msg="_Serializer should produce the right result.")
self.assertEqual(serializer.parse(5, 65.0, "X", skip=[ "_PLACEHOLDER" ]), Test(5, 65.0, "X"),
msg="_Serializer should produce the right result when skip parameter is given.")
self.assertListEqual(serializer.get_labels(), [ "A", "B", "C" ],
msg="_Serializer::get_labels() should return the right list of labels.")
with self.assertRaises(LabelerSerializerException,
msg="_Serializer should raise LabelerSerializerException if given fewer arguments than the serializer labels."):
serializer.parse(5, 65.0, "X")
with self.assertRaises(LabelerSerializerException,
msg="_Serializer should raise LabelerSerializerException if given " \
"fewer arguments than the serializer labels."):
serializer.parse(5, 65.0, "X")
def test_generate_recursive_serializer(self):
@dataclass
class Outer(_Type):
class Outer(_Type):
A: int
B: float
C: "Middle"
@dataclass
class Middle(_Type):
class Middle(_Type):
D: str
E: "Inner"
@dataclass
class Inner(_Type):
class Inner(_Type):
F: bool
inner = generate_labeler_serializer("Inner", Inner, ["F"])
middle = generate_recursive_serializer("Middle", Middle, ["D", "E"], { "E": inner })
outer = generate_recursive_serializer("Outer", Outer, ["A", "B", "C"], { "C": middle })
middle = generate_recursive_serializer("Middle", Middle, ["D", "E"], serializers={ "E": inner })
outer = generate_recursive_serializer("Outer", Outer, ["A", "B", "C"], serializers={ "C": middle })
self.assertEqual(outer.parse(10, 45.5, [ "Y", [ True ] ]), Outer(10, 45.5, Middle("Y", Inner(True))),
msg="_RecursiveSerializer should produce the right result.")
if __name__ == "__main__":
unittest.main()
unittest.main()

View File

@@ -11,15 +11,19 @@ class TestNotification(unittest.TestCase):
A: int
B: float
C: str
test = generate_labeler_serializer("Test", Test,
test = generate_labeler_serializer("Test", Test,
[ "A", "_PLACEHOLDER", "B", "_PLACEHOLDER", "C" ])
notification = _Notification[Test](test)
self.assertEqual(notification.parse(*[1675787861506, "test", None, None, [ 5, None, 65.0, None, "X" ], 0, "SUCCESS", "This is just a test notification."]),
Notification[Test](1675787861506, "test", None, Test(5, 65.0, "X"), 0, "SUCCESS", "This is just a test notification."),
msg="_Notification should produce the right notification.")
actual = notification.parse(*[ 1675787861506, "test", None, None, [ 5, None, 65.0, None, "X" ], \
0, "SUCCESS", "This is just a test notification." ])
expected = Notification[Test](1675787861506, "test", None, Test(5, 65.0, "X"),
0, "SUCCESS", "This is just a test notification.")
self.assertEqual(actual, expected, msg="_Notification should produce the right notification.")
if __name__ == "__main__":
unittest.main()
unittest.main()

View File

@@ -1,3 +1,5 @@
#pylint: disable=duplicate-code
import unittest
from ..labeler import _Type
@@ -7,11 +9,13 @@ from ..rest import serializers
class TestRestSerializers(unittest.TestCase):
def test_rest_serializers(self):
for serializer in map(serializers.__dict__.get, serializers.__serializers__):
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass of _Type (got {serializer.klass}).")
self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__),
f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> must have matching labels and fields.")
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass " \
f"of _Type (got {serializer.klass}).")
self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__),
f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> " \
"must have matching labels and fields.")
if __name__ == "__main__":
unittest.main()
unittest.main()

View File

@@ -1,3 +1,5 @@
#pylint: disable=duplicate-code
import unittest
from ..labeler import _Type
@@ -7,11 +9,13 @@ from ..websocket import serializers
class TestWebsocketSerializers(unittest.TestCase):
def test_websocket_serializers(self):
for serializer in map(serializers.__dict__.get, serializers.__serializers__):
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass of _Type (got {serializer.klass}).")
self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__),
f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> must have matching labels and fields.")
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass " \
f"of _Type (got {serializer.klass}).")
self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__),
f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> " \
"must have matching labels and fields.")
if __name__ == "__main__":
unittest.main()
unittest.main()

View File

@@ -2,4 +2,4 @@ REST_HOST = "https://api.bitfinex.com/v2"
PUB_REST_HOST = "https://api-pub.bitfinex.com/v2"
WSS_HOST = "wss://api.bitfinex.com/ws/2"
PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2"
PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2"

View File

@@ -1 +1 @@
NAME = "utils"
NAME = "utils"

View File

@@ -1,22 +1,23 @@
import re
from typing import TypeVar, Callable, Dict, Any, cast
from typing import TypeVar, Callable, cast
T = TypeVar("T")
_to_snake_case: Callable[[str], str] = lambda string: re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
_to_camel_case: Callable[[str], str] = lambda string: (components := string.split("_"))[0] + str().join(c.title() for c in components[1:])
_to_camel_case: Callable[[str], str] = lambda string: \
(components := string.split("_"))[0] + str().join(c.title() for c in components[1:])
def _scheme(data: T, adapter: Callable[[str], str]) -> T:
if isinstance(data, list):
return cast(T, [ _scheme(sub_data, adapter) for sub_data in data ])
elif isinstance(data, dict):
if isinstance(data, dict):
return cast(T, { adapter(key): _scheme(value, adapter) for key, value in data.items() })
else: return data
return data
def to_snake_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_snake_case)
def to_camel_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_camel_case)
return _scheme(dictionary, _to_camel_case)

View File

@@ -7,23 +7,25 @@ from typing import Type, List, Dict, Union, Any
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
def _strip(dictionary: Dict) -> Dict:
return { key: value for key, value in dictionary.items() if value != None}
return { key: value for key, value in dictionary.items() if value is not None }
def _convert_float_to_str(data: JSON) -> JSON:
if isinstance(data, float):
return format(Decimal(repr(data)), "f")
elif isinstance(data, list):
if isinstance(data, list):
return [ _convert_float_to_str(sub_data) for sub_data in data ]
elif isinstance(data, dict):
if isinstance(data, dict):
return _strip({ key: _convert_float_to_str(value) for key, value in data.items() })
else: return data
return data
class JSONEncoder(json.JSONEncoder):
def encode(self, obj: JSON) -> str:
return json.JSONEncoder.encode(self, _convert_float_to_str(obj))
def encode(self, o: JSON) -> str:
return json.JSONEncoder.encode(self, _convert_float_to_str(o))
def default(self, obj: Any) -> Any:
if isinstance(obj, Decimal): return format(obj, "f")
elif isinstance(obj, datetime): return str(obj)
def default(self, o: Any) -> Any:
if isinstance(o, Decimal):
return format(o, "f")
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, obj)
return json.JSONEncoder.default(self, o)

View File

@@ -28,24 +28,24 @@ class _ColorFormatter(logging.Formatter):
class ColorLogger(logging.Logger):
FORMAT = "[%(name)s] [%(levelname)s] [%(asctime)s] %(message)s"
def __init__(self, name, level):
logging.Logger.__init__(self, name, level)
logging.Logger.__init__(self, name, level)
colored_formatter = _ColorFormatter(self.FORMAT, use_color=True)
console = logging.StreamHandler(stream=sys.stderr)
console.setFormatter(fmt=colored_formatter)
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(fmt=colored_formatter)
self.addHandler(hdlr=console)
self.addHandler(hdlr=handler)
class FileLogger(logging.Logger):
FORMAT = "[%(name)s] [%(levelname)s] [%(asctime)s] %(message)s"
def __init__(self, name, level, filename):
logging.Logger.__init__(self, name, level)
logging.Logger.__init__(self, name, level)
formatter = logging.Formatter(self.FORMAT)
fh = logging.FileHandler(filename=filename)
fh.setFormatter(fmt=formatter)
handler = logging.FileHandler(filename=filename)
handler.setFormatter(fmt=formatter)
self.addHandler(hdlr=fh)
self.addHandler(hdlr=handler)

View File

@@ -1,3 +1,3 @@
from .client import BfxWebsocketClient, BfxWebsocketBucket, BfxWebsocketInputs
NAME = "websocket"
NAME = "websocket"

View File

@@ -2,4 +2,4 @@ from .bfx_websocket_client import BfxWebsocketClient
from .bfx_websocket_bucket import BfxWebsocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
NAME = "client"
NAME = "client"

View File

@@ -1,10 +1,10 @@
import json, uuid, websockets
from typing import Literal, TypeVar, Callable, cast
import json, uuid, websockets
from ..handlers import PublicChannelsHandler
from ..exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion
from ..exceptions import ConnectionNotOpen, TooManySubscriptions
_HEARTBEAT = "hb"
@@ -12,14 +12,14 @@ F = TypeVar("F", bound=Callable[..., Literal[None]])
def _require_websocket_connection(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if self.websocket == None or self.websocket.open == False:
if self.websocket is None or not self.websocket.open:
raise ConnectionNotOpen("No open connection with the server.")
await function(self, *args, **kwargs)
return cast(F, wrapper)
class BfxWebsocketBucket(object):
class BfxWebsocketBucket:
VERSION = 2
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
@@ -27,11 +27,11 @@ class BfxWebsocketBucket(object):
def __init__(self, host, event_emitter, on_open_event):
self.host, self.event_emitter, self.on_open_event = host, event_emitter, on_open_event
self.websocket, self.subscriptions, self.pendings = None, dict(), list()
self.websocket, self.subscriptions, self.pendings = None, {}, []
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
async def _connect(self, index):
async def connect(self):
reconnection = False
async for websocket in websockets.connect(self.host):
@@ -39,12 +39,12 @@ class BfxWebsocketBucket(object):
self.on_open_event.set()
if reconnection == True or (reconnection := False):
if reconnection or (reconnection := False):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
for _, subscription in self.subscriptions.items():
await self._subscribe(**subscription)
await self.subscribe(**subscription)
self.subscriptions.clear()
@@ -52,21 +52,27 @@ class BfxWebsocketBucket(object):
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]):
self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.subscriptions[chanId] = message
self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]):
if message["status"] == "OK":
del self.subscriptions[chanId]
elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chanId], *message[1:])
except websockets.ConnectionClosedError as error:
if isinstance(message, dict):
if message["event"] == "subscribed" and (chan_id := message["chanId"]):
self.pendings = \
[ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.subscriptions[chan_id] = message
self.event_emitter.emit("subscribed", message)
elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]):
if message["status"] == "OK":
del self.subscriptions[chan_id]
elif message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
if isinstance(message, list):
if (chan_id := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chan_id], *message[1:])
except websockets.ConnectionClosedError as error:
if error.code == 1006:
self.on_open_event.clear()
reconnection = True
reconnection = True
continue
raise error
@@ -74,7 +80,7 @@ class BfxWebsocketBucket(object):
break
@_require_websocket_connection
async def _subscribe(self, channel, subId=None, **kwargs):
async def subscribe(self, channel, sub_id=None, **kwargs):
if len(self.subscriptions) + len(self.pendings) == BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
@@ -83,7 +89,7 @@ class BfxWebsocketBucket(object):
"event": "subscribe",
"channel": channel,
"subId": subId or str(uuid.uuid4()),
"subId": sub_id or str(uuid.uuid4()),
}
self.pendings.append(subscription)
@@ -91,17 +97,17 @@ class BfxWebsocketBucket(object):
await self.websocket.send(json.dumps(subscription))
@_require_websocket_connection
async def _unsubscribe(self, chanId):
async def unsubscribe(self, chan_id):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chanId
"chanId": chan_id
}))
@_require_websocket_connection
async def _close(self, code=1000, reason=str()):
async def close(self, code=1000, reason=str()):
await self.websocket.close(code=code, reason=reason)
def _get_chan_id(self, subId):
def get_chan_id(self, sub_id):
for subscription in self.subscriptions.values():
if subscription["subId"] == subId:
return subscription["chanId"]
if subscription["subId"] == sub_id:
return subscription["chanId"]

View File

@@ -1,33 +1,56 @@
import traceback, json, asyncio, hmac, hashlib, time, websockets, socket, random
from typing import cast
from collections import namedtuple
from datetime import datetime
import traceback, json, asyncio, hmac, hashlib, time, socket, random, websockets
from pyee.asyncio import AsyncIOEventEmitter
from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \
OutdatedClientVersion
from ...utils.JSONEncoder import JSONEncoder
from ...utils.json_encoder import JSONEncoder
from ...utils.logger import ColorLogger, FileLogger
def _require_websocket_authentication(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if hasattr(self, "authentication") and self.authentication == False:
raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.")
if hasattr(self, "authentication") and not self.authentication:
raise WebsocketAuthenticationRequired("To perform this action you need to " \
"authenticate using your API_KEY and API_SECRET.")
await _require_websocket_connection(function)(self, *args, **kwargs)
return cast(F, wrapper)
class BfxWebsocketClient(object):
class _Delay:
BACKOFF_MIN, BACKOFF_MAX = 1.92, 60.0
BACKOFF_INITIAL = 5.0
def __init__(self, backoff_factor):
self.__backoff_factor = backoff_factor
self.__backoff_delay = _Delay.BACKOFF_MIN
self.__initial_delay = random.random() * _Delay.BACKOFF_INITIAL
def next(self):
backoff_delay = self.peek()
__backoff_delay = self.__backoff_delay * self.__backoff_factor
self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX)
return backoff_delay
def peek(self):
return (self.__backoff_delay == _Delay.BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
class BfxWebsocketClient:
VERSION = BfxWebsocketBucket.VERSION
MAXIMUM_CONNECTIONS_AMOUNT = 20
@@ -43,16 +66,18 @@ class BfxWebsocketClient(object):
self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter()
self.on_open_events, self.buckets, self.authentication = [], [], False
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
if log_filename == None:
if log_filename is None:
self.logger = ColorLogger("BfxWebsocketClient", level=log_level)
else: self.logger = FileLogger("BfxWebsocketClient", level=log_level, filename=log_filename)
self.event_emitter.add_listener("error",
lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" +
self.event_emitter.add_listener("error",
lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" +
str().join(traceback.format_exception(type(exception), exception, exception.__traceback__))[:-1])
)
@@ -61,23 +86,24 @@ class BfxWebsocketClient(object):
async def start(self, connections = 5):
if connections > BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT:
self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT} buckets from the same " +
f"connection ({connections} in use), the server could momentarily block the client with <429 Too Many Requests>.")
self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT} " \
f"buckets from the same connection ({connections} in use), the server could momentarily " \
"block the client with <429 Too Many Requests>.")
self.on_open_events = [ asyncio.Event() for _ in range(connections) ]
for _ in range(connections):
self.on_open_events.append(asyncio.Event())
self.buckets = [
BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index])
for index in range(connections)
]
for index in range(connections):
self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index])]
tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ]
tasks.append(self.__connect(self.credentials))
tasks = [ bucket.connect() for bucket in self.buckets ]
tasks.append(self.__connect())
await asyncio.gather(*tasks)
async def __connect(self, credentials = None):
#pylint: disable-next=too-many-statements
async def __connect(self):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None
@@ -86,9 +112,9 @@ class BfxWebsocketClient(object):
nonlocal reconnection
async with websockets.connect(self.host) as websocket:
if reconnection.status == True:
self.logger.info(f"Reconnect attempt successful (attempt no.{reconnection.attempts}): The " +
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " +
if reconnection.status:
self.logger.info(f"Reconnect attempt successful (attempt no.{reconnection.attempts}): The " \
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " \
f"(connection lost at: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
@@ -104,86 +130,73 @@ class BfxWebsocketClient(object):
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "info" and "version" in message:
if BfxWebsocketClient.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. " +
f"Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, " +
f"server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "info" and message["code"] == 20051:
rcvd = websockets.frames.Close(code=1012, reason="Stop/Restart Websocket Server (please reconnect).")
if isinstance(message, dict):
if message["event"] == "info" and "version" in message:
if BfxWebsocketClient.VERSION != message["version"]:
raise OutdatedClientVersion("Mismatch between the client version and the server " \
"version. Update the library to the latest version to continue (client version: " \
f"{BfxWebsocketClient.VERSION}, server version: {message['version']}).")
elif message["event"] == "info" and message["code"] == 20051:
rcvd = websockets.frames.Close(code=1012,
reason="Stop/Restart Websocket Server (please reconnect).")
raise websockets.ConnectionClosedError(rcvd=rcvd, sent=None)
elif isinstance(message, dict) and message["event"] == "auth":
if message["status"] == "OK":
self.event_emitter.emit("authenticated", message); self.authentication = True
else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.")
elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != _HEARTBEAT:
self.handler.handle(message[1], message[2])
raise websockets.ConnectionClosedError(rcvd=rcvd, sent=None)
elif message["event"] == "auth":
if message["status"] != "OK":
raise InvalidAuthenticationCredentials(
"Cannot authenticate with given API-KEY and API-SECRET.")
class _Delay:
BACKOFF_MIN, BACKOFF_MAX = 1.92, 60.0
self.event_emitter.emit("authenticated", message)
BACKOFF_INITIAL = 5.0
self.authentication = True
elif message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
def __init__(self, backoff_factor):
self.__backoff_factor = backoff_factor
self.__backoff_delay = _Delay.BACKOFF_MIN
self.__initial_delay = random.random() * _Delay.BACKOFF_INITIAL
def next(self):
backoff_delay = self.peek()
__backoff_delay = self.__backoff_delay * self.__backoff_factor
self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX)
return backoff_delay
def peek(self):
return (self.__backoff_delay == _Delay.BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
if isinstance(message, list):
if message[0] == 0 and message[1] != _HEARTBEAT:
self.handler.handle(message[1], message[2])
while True:
if reconnection.status == True:
if reconnection.status:
await asyncio.sleep(delay.next())
try:
await _connection()
except (websockets.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.ConnectionClosedError) and (error.code == 1006 or error.code == 1012):
if isinstance(error, websockets.ConnectionClosedError) and error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received "
+ "or sent (1006). Attempting to reconnect...")
self.logger.error("Connection lost: no close frame received " \
"or sent (1006). Attempting to reconnect...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, reconnection "
+ "required (client received 20051). Attempt in progress...")
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now());
self.logger.info("WSS server is about to restart, reconnection " \
"required (client received 20051). Attempt in progress...")
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
delay = _Delay(backoff_factor=1.618)
elif isinstance(error, socket.gaierror) and reconnection.status == True:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. "
+ f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds."
+ f"(at the moment the client has been offline for {datetime.now() - reconnection.timestamp})")
elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \
f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds. (at the moment " \
f"the client has been offline for {datetime.now() - reconnection.timestamp})")
reconnection = reconnection._replace(attempts=reconnection.attempts + 1)
else: raise error
if reconnection.status == False:
if not reconnection.status:
break
async def __authenticate(self, API_KEY, API_SECRET, filters=None):
data = { "event": "auth", "filter": filters, "apiKey": API_KEY }
async def __authenticate(self, api_key, api_secret, filters=None):
data = { "event": "auth", "filter": filters, "apiKey": api_key }
data["authNonce"] = str(round(time.time() * 1_000_000))
data["authPayload"] = "AUTH" + data["authNonce"]
data["authSig"] = hmac.new(
API_SECRET.encode("utf8"),
api_secret.encode("utf8"),
data["authPayload"].encode("utf8"),
hashlib.sha384
hashlib.sha384
).hexdigest()
await self.websocket.send(json.dumps(data))
@@ -193,56 +206,58 @@ class BfxWebsocketClient(object):
index = counters.index(min(counters))
await self.buckets[index]._subscribe(channel, **kwargs)
await self.buckets[index].subscribe(channel, **kwargs)
async def unsubscribe(self, subId):
async def unsubscribe(self, sub_id):
for bucket in self.buckets:
if (chanId := bucket._get_chan_id(subId)):
await bucket._unsubscribe(chanId=chanId)
if (chan_id := bucket.get_chan_id(sub_id)):
await bucket.unsubscribe(chan_id=chan_id)
async def close(self, code=1000, reason=str()):
if self.websocket != None and self.websocket.open == True:
if self.websocket is not None and self.websocket.open:
await self.websocket.close(code=code, reason=reason)
for bucket in self.buckets:
await bucket._close(code=code, reason=reason)
await bucket.close(code=code, reason=reason)
@_require_websocket_authentication
async def notify(self, info, MESSAGE_ID=None, **kwargs):
await self.websocket.send(json.dumps([ 0, "n", MESSAGE_ID, { "type": "ucm-test", "info": info, **kwargs } ]))
async def notify(self, info, message_id=None, **kwargs):
await self.websocket.send(json.dumps([ 0, "n", message_id, { "type": "ucm-test", "info": info, **kwargs } ]))
@_require_websocket_authentication
async def __handle_websocket_input(self, input, data):
await self.websocket.send(json.dumps([ 0, input, None, data], cls=JSONEncoder))
async def __handle_websocket_input(self, event, data):
await self.websocket.send(json.dumps([ 0, event, None, data], cls=JSONEncoder))
def on(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS")
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebsocketClient.EVENTS")
if callback != None:
if callback is not None:
for event in events:
self.event_emitter.on(event, callback)
if callback == None:
if callback is None:
def handler(function):
for event in events:
self.event_emitter.on(event, function)
return handler
return handler
def once(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS")
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebsocketClient.EVENTS")
if callback != None:
if callback is not None:
for event in events:
self.event_emitter.once(event, callback)
if callback == None:
if callback is None:
def handler(function):
for event in events:
self.event_emitter.once(event, function)
return handler
return handler

View File

@@ -3,58 +3,92 @@ from datetime import datetime
from typing import Union, Optional, List, Tuple
from .. enums import OrderType, FundingOfferType
from ... utils.JSONEncoder import JSON
from ...utils.json_encoder import JSON
class BfxWebsocketInputs(object):
class BfxWebsocketInputs:
def __init__(self, handle_websocket_input):
self.handle_websocket_input = handle_websocket_input
self.__handle_websocket_input = handle_websocket_input
async def submit_order(self, type: OrderType, symbol: str, amount: Union[Decimal, float, str],
price: Optional[Union[Decimal, float, str]] = None, lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None, price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None, cid: Optional[int] = None,
flags: Optional[int] = 0, tif: Optional[Union[datetime, str]] = None, meta: Optional[JSON] = None):
await self.handle_websocket_input("on", {
async def submit_order(self,
type: OrderType,
symbol: str,
amount: Union[Decimal, float, str],
*,
price: Optional[Union[Decimal, float, str]] = None,
lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None,
cid: Optional[int] = None,
flags: Optional[int] = 0,
tif: Optional[Union[datetime, str]] = None,
meta: Optional[JSON] = None):
await self.__handle_websocket_input("on", {
"type": type, "symbol": symbol, "amount": amount,
"price": price, "lev": lev,
"price_trailing": price_trailing, "price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop,
"gid": gid, "cid": cid,
"flags": flags, "tif": tif, "meta": meta
"price": price, "lev": lev, "price_trailing": price_trailing,
"price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, "gid": gid,
"cid": cid, "flags": flags, "tif": tif,
"meta": meta
})
async def update_order(self, id: int, amount: Optional[Union[Decimal, float, str]] = None, price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None, cid_date: Optional[str] = None, gid: Optional[int] = None,
flags: Optional[int] = 0, lev: Optional[int] = None, delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_trailing: Optional[Union[Decimal, float, str]] = None, tif: Optional[Union[datetime, str]] = None):
await self.handle_websocket_input("ou", {
async def update_order(self,
id: int,
*,
amount: Optional[Union[Decimal, float, str]] = None,
price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None,
gid: Optional[int] = None,
flags: Optional[int] = 0,
lev: Optional[int] = None,
delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
tif: Optional[Union[datetime, str]] = None):
await self.__handle_websocket_input("ou", {
"id": id, "amount": amount, "price": price,
"cid": cid, "cid_date": cid_date, "gid": gid,
"flags": flags, "lev": lev, "delta": delta,
"price_aux_limit": price_aux_limit, "price_trailing": price_trailing, "tif": tif
})
async def cancel_order(self, id: Optional[int] = None, cid: Optional[int] = None, cid_date: Optional[str] = None):
await self.handle_websocket_input("oc", {
async def cancel_order(self,
*,
id: Optional[int] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None):
await self.__handle_websocket_input("oc", {
"id": id, "cid": cid, "cid_date": cid_date
})
async def cancel_order_multi(self, ids: Optional[List[int]] = None, cids: Optional[List[Tuple[int, str]]] = None, gids: Optional[List[int]] = None, all: bool = False):
await self.handle_websocket_input("oc_multi", {
async def cancel_order_multi(self,
*,
ids: Optional[List[int]] = None,
cids: Optional[List[Tuple[int, str]]] = None,
gids: Optional[List[int]] = None,
all: bool = False):
await self.__handle_websocket_input("oc_multi", {
"ids": ids, "cids": cids, "gids": gids,
"all": int(all)
})
async def submit_funding_offer(self, type: FundingOfferType, symbol: str, amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str], period: int,
flags: Optional[int] = 0):
await self.handle_websocket_input("fon", {
#pylint: disable-next=too-many-arguments
async def submit_funding_offer(self,
type: FundingOfferType,
symbol: str,
amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str],
period: int,
*,
flags: Optional[int] = 0):
await self.__handle_websocket_input("fon", {
"type": type, "symbol": symbol, "amount": amount,
"rate": rate, "period": period,
"flags": flags
"rate": rate, "period": period, "flags": flags
})
async def cancel_funding_offer(self, id: int):
await self.handle_websocket_input("foc", { "id": id })
await self.__handle_websocket_input("foc", { "id": id })
async def calc(self, *args: str):
await self.handle_websocket_input("calc", list(map(lambda arg: [arg], args)))
await self.__handle_websocket_input("calc", list(map(lambda arg: [arg], args)))

View File

@@ -1,3 +1,4 @@
#pylint: disable-next=wildcard-import,unused-wildcard-import
from .. enums import *
class Channel(str, Enum):
@@ -5,4 +6,4 @@ class Channel(str, Enum):
TRADES = "trades"
BOOK = "book"
CANDLES = "candles"
STATUS = "status"
STATUS = "status"

View File

@@ -16,53 +16,39 @@ class BfxWebsocketException(BfxBaseException):
Base class for all custom exceptions in bfxapi/websocket/exceptions.py.
"""
pass
class ConnectionNotOpen(BfxWebsocketException):
"""
This error indicates an attempt to communicate via websocket before starting the connection with the servers.
"""
pass
class TooManySubscriptions(BfxWebsocketException):
"""
This error indicates an attempt to subscribe to a public channel after reaching the limit of simultaneous connections.
This error indicates a subscription attempt after reaching the limit of simultaneous connections.
"""
pass
class WebsocketAuthenticationRequired(BfxWebsocketException):
"""
This error indicates an attempt to access a protected resource without logging in first.
"""
pass
class EventNotSupported(BfxWebsocketException):
"""
This error indicates a failed attempt to subscribe to an event not supported by the BfxWebsocketClient.
"""
pass
class OutdatedClientVersion(BfxWebsocketException):
"""
This error indicates a mismatch between the client version and the server WSS version.
"""
pass
class InvalidAuthenticationCredentials(BfxWebsocketException):
"""
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
"""
pass
class HandlerNotFound(BfxWebsocketException):
"""
This error indicates that a handler was not found for an incoming message.
"""
pass

View File

@@ -1,4 +1,4 @@
from .public_channels_handler import PublicChannelsHandler
from .authenticated_channels_handler import AuthenticatedChannelsHandler
NAME = "handlers"
NAME = "handlers"

View File

@@ -1,18 +1,19 @@
from .. import serializers
from .. types import *
from .. serializers import _Notification
from .. exceptions import HandlerNotFound
class AuthenticatedChannelsHandler(object):
class AuthenticatedChannelsHandler:
__abbreviations = {
"os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel",
"ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close",
"te": "trade_executed", "tu": "trade_execution_update",
"fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel",
"fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close",
"ws": "wallet_snapshot", "wu": "wallet_update",
"os": "order_snapshot", "on": "order_new", "ou": "order_update",
"oc": "order_cancel", "ps": "position_snapshot", "pn": "position_new",
"pu": "position_update", "pc": "position_close", "te": "trade_executed",
"tu": "trade_execution_update", "fos": "funding_offer_snapshot", "fon": "funding_offer_new",
"fou": "funding_offer_update", "foc": "funding_offer_cancel", "fcs": "funding_credit_snapshot",
"fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update",
"flc": "funding_loan_close", "ws": "wallet_snapshot", "wu": "wallet_update",
"bu": "balance_update",
}
@@ -27,43 +28,42 @@ class AuthenticatedChannelsHandler(object):
("bu",): serializers.Balance
}
EVENTS = [
EVENTS = [
"notification",
"on-req-notification", "ou-req-notification", "oc-req-notification",
"oc_multi-notification",
"fon-req-notification", "foc-req-notification",
*list(__abbreviations.values())
*list(__abbreviations.values())
]
def __init__(self, event_emitter, strict = True):
self.event_emitter, self.strict = event_emitter, strict
def __init__(self, event_emitter):
self.event_emitter = event_emitter
def handle(self, type, stream):
if type == "n":
def handle(self, abbrevation, stream):
if abbrevation == "n":
return self.__notification(stream)
for types, serializer in AuthenticatedChannelsHandler.__serializers.items():
if type in types:
event = AuthenticatedChannelsHandler.__abbreviations[type]
for abbrevations, serializer in AuthenticatedChannelsHandler.__serializers.items():
if abbrevation in abbrevations:
event = AuthenticatedChannelsHandler.__abbreviations[abbrevation]
if all(isinstance(substream, list) for substream in stream):
return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ])
return self.event_emitter.emit(event, serializer.parse(*stream))
if self.strict:
raise HandlerNotFound(f"No handler found for event of type <{type}>.")
raise HandlerNotFound(f"No handler found for event of type <{abbrevation}>.")
def __notification(self, stream):
type, serializer = "notification", serializers._Notification(serializer=None)
event, serializer = "notification", _Notification(serializer=None)
if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order)
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.Order)
if stream[1] == "oc_multi-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order, iterate=True)
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.Order, iterate=True)
if stream[1] == "fon-req" or stream[1] == "foc-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification(serializer=serializers.FundingOffer)
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.FundingOffer)
return self.event_emitter.emit(type, serializer.parse(*stream))
return self.event_emitter.emit(event, serializer.parse(*stream))

View File

@@ -1,20 +1,17 @@
from .. import serializers
from .. types import *
from .. exceptions import HandlerNotFound
class PublicChannelsHandler(object):
class PublicChannelsHandler:
EVENTS = [
"t_ticker_update", "f_ticker_update",
"t_trade_executed", "t_trade_execution_update", "f_trade_executed", "f_trade_execution_update", "t_trades_snapshot", "f_trades_snapshot",
"t_book_snapshot", "f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot", "t_book_update", "f_book_update", "t_raw_book_update", "f_raw_book_update",
"candles_snapshot", "candles_update",
"derivatives_status_update",
"t_ticker_update", "f_ticker_update", "t_trade_executed", "t_trade_execution_update", "f_trade_executed",
"f_trade_execution_update", "t_trades_snapshot", "f_trades_snapshot", "t_book_snapshot", "f_book_snapshot",
"t_raw_book_snapshot", "f_raw_book_snapshot", "t_book_update", "f_book_update", "t_raw_book_update",
"f_raw_book_update", "candles_snapshot", "candles_update", "derivatives_status_update",
]
def __init__(self, event_emitter, strict = True):
self.event_emitter, self.strict = event_emitter, strict
def __init__(self, event_emitter):
self.event_emitter = event_emitter
self.__handlers = {
"ticker": self.__ticker_channel_handler,
@@ -25,13 +22,14 @@ class PublicChannelsHandler(object):
}
def handle(self, subscription, *stream):
#pylint: disable-next=unnecessary-lambda-assignment
_clear = lambda dictionary, *args: { key: value for key, value in dictionary.items() if key not in args }
#pylint: disable-next=consider-iterating-dictionary
if (channel := subscription["channel"]) and channel in self.__handlers.keys():
return self.__handlers[channel](_clear(subscription, "event", "channel", "chanId"), *stream)
if self.strict:
raise HandlerNotFound(f"No handler found for channel <{subscription['channel']}>.")
raise HandlerNotFound(f"No handler found for channel <{subscription['channel']}>.")
def __ticker_channel_handler(self, subscription, *stream):
if subscription["symbol"].startswith("t"):
@@ -49,17 +47,17 @@ class PublicChannelsHandler(object):
)
def __trades_channel_handler(self, subscription, *stream):
if (type := stream[0]) and type in [ "te", "tu", "fte", "ftu" ]:
if (event := stream[0]) and event in [ "te", "tu", "fte", "ftu" ]:
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
{ "te": "t_trade_executed", "tu": "t_trade_execution_update" }[type],
{ "te": "t_trade_executed", "tu": "t_trade_execution_update" }[event],
subscription,
serializers.TradingPairTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
{ "fte": "f_trade_executed", "ftu": "f_trade_execution_update" }[type],
{ "fte": "f_trade_executed", "ftu": "f_trade_execution_update" }[event],
subscription,
serializers.FundingCurrencyTrade.parse(*stream[1])
)
@@ -79,36 +77,39 @@ class PublicChannelsHandler(object):
)
def __book_channel_handler(self, subscription, *stream):
type = subscription["symbol"][0]
event = subscription["symbol"][0]
if subscription["prec"] == "R0":
_trading_pair_serializer, _funding_currency_serializer, IS_RAW_BOOK = serializers.TradingPairRawBook, serializers.FundingCurrencyRawBook, True
else: _trading_pair_serializer, _funding_currency_serializer, IS_RAW_BOOK = serializers.TradingPairBook, serializers.FundingCurrencyBook, False
_trading_pair_serializer, _funding_currency_serializer, is_raw_book = \
serializers.TradingPairRawBook, serializers.FundingCurrencyRawBook, True
else: _trading_pair_serializer, _funding_currency_serializer, is_raw_book = \
serializers.TradingPairBook, serializers.FundingCurrencyBook, False
if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit(
type + "_" + (IS_RAW_BOOK and "raw_book" or "book") + "_snapshot",
subscription,
[ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[type].parse(*substream) for substream in stream[0] ]
return self.event_emitter.emit(
event + "_" + (is_raw_book and "raw_book" or "book") + "_snapshot",
subscription,
[ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event] \
.parse(*substream) for substream in stream[0] ]
)
return self.event_emitter.emit(
type + "_" + (IS_RAW_BOOK and "raw_book" or "book") + "_update",
subscription,
{ "t": _trading_pair_serializer, "f": _funding_currency_serializer }[type].parse(*stream[0])
event + "_" + (is_raw_book and "raw_book" or "book") + "_update",
subscription,
{ "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event].parse(*stream[0])
)
def __candles_channel_handler(self, subscription, *stream):
if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit(
"candles_snapshot",
subscription,
subscription,
[ serializers.Candle.parse(*substream) for substream in stream[0] ]
)
return self.event_emitter.emit(
"candles_update",
subscription,
"candles_update",
subscription,
serializers.Candle.parse(*stream[0])
)
@@ -118,4 +119,4 @@ class PublicChannelsHandler(object):
"derivatives_status_update",
subscription,
serializers.DerivativesStatus.parse(*stream[0])
)
)

View File

@@ -1,7 +1,10 @@
#pylint: disable=duplicate-code
from . import types
from .. labeler import generate_labeler_serializer
#pylint: disable-next=unused-import
from .. notification import _Notification
__serializers__ = [
@@ -17,277 +20,349 @@ __serializers__ = [
#region Serializers definition for Websocket Public Channels
TradingPairTicker = generate_labeler_serializer("TradingPairTicker", klass=types.TradingPairTicker, labels=[
"bid",
"bid_size",
"ask",
"ask_size",
"daily_change",
"daily_change_relative",
"last_price",
"volume",
"high",
"low"
])
TradingPairTicker = generate_labeler_serializer(
name="TradingPairTicker",
klass=types.TradingPairTicker,
labels=[
"bid",
"bid_size",
"ask",
"ask_size",
"daily_change",
"daily_change_relative",
"last_price",
"volume",
"high",
"low"
]
)
FundingCurrencyTicker = generate_labeler_serializer("FundingCurrencyTicker", klass=types.FundingCurrencyTicker, labels=[
"frr",
"bid",
"bid_period",
"bid_size",
"ask",
"ask_period",
"ask_size",
"daily_change",
"daily_change_relative",
"last_price",
"volume",
"high",
"low",
"_PLACEHOLDER",
"_PLACEHOLDER",
"frr_amount_available"
])
FundingCurrencyTicker = generate_labeler_serializer(
name="FundingCurrencyTicker",
klass=types.FundingCurrencyTicker,
labels=[
"frr",
"bid",
"bid_period",
"bid_size",
"ask",
"ask_period",
"ask_size",
"daily_change",
"daily_change_relative",
"last_price",
"volume",
"high",
"low",
"_PLACEHOLDER",
"_PLACEHOLDER",
"frr_amount_available"
]
)
TradingPairTrade = generate_labeler_serializer("TradingPairTrade", klass=types.TradingPairTrade, labels=[
"id",
"mts",
"amount",
"price"
])
TradingPairTrade = generate_labeler_serializer(
name="TradingPairTrade",
klass=types.TradingPairTrade,
labels=[
"id",
"mts",
"amount",
"price"
]
)
FundingCurrencyTrade = generate_labeler_serializer("FundingCurrencyTrade", klass=types.FundingCurrencyTrade, labels=[
"id",
"mts",
"amount",
"rate",
"period"
])
FundingCurrencyTrade = generate_labeler_serializer(
name="FundingCurrencyTrade",
klass=types.FundingCurrencyTrade,
labels=[
"id",
"mts",
"amount",
"rate",
"period"
]
)
TradingPairBook = generate_labeler_serializer("TradingPairBook", klass=types.TradingPairBook, labels=[
"price",
"count",
"amount"
])
TradingPairBook = generate_labeler_serializer(
name="TradingPairBook",
klass=types.TradingPairBook,
labels=[
"price",
"count",
"amount"
]
)
FundingCurrencyBook = generate_labeler_serializer("FundingCurrencyBook", klass=types.FundingCurrencyBook, labels=[
"rate",
"period",
"count",
"amount"
])
FundingCurrencyBook = generate_labeler_serializer(
name="FundingCurrencyBook",
klass=types.FundingCurrencyBook,
labels=[
"rate",
"period",
"count",
"amount"
]
)
TradingPairRawBook = generate_labeler_serializer("TradingPairRawBook", klass=types.TradingPairRawBook, labels=[
"order_id",
"price",
"amount"
])
TradingPairRawBook = generate_labeler_serializer(
name="TradingPairRawBook",
klass=types.TradingPairRawBook,
labels=[
"order_id",
"price",
"amount"
]
)
FundingCurrencyRawBook = generate_labeler_serializer("FundingCurrencyRawBook", klass=types.FundingCurrencyRawBook, labels=[
"offer_id",
"period",
"rate",
"amount"
])
FundingCurrencyRawBook = generate_labeler_serializer(
name="FundingCurrencyRawBook",
klass=types.FundingCurrencyRawBook,
labels=[
"offer_id",
"period",
"rate",
"amount"
]
)
Candle = generate_labeler_serializer("Candle", klass=types.Candle, labels=[
"mts",
"open",
"close",
"high",
"low",
"volume"
])
Candle = generate_labeler_serializer(
name="Candle",
klass=types.Candle,
labels=[
"mts",
"open",
"close",
"high",
"low",
"volume"
]
)
DerivativesStatus = generate_labeler_serializer("DerivativesStatus", klass=types.DerivativesStatus, labels=[
"mts",
"_PLACEHOLDER",
"deriv_price",
"spot_price",
"_PLACEHOLDER",
"insurance_fund_balance",
"_PLACEHOLDER",
"next_funding_evt_timestamp_ms",
"next_funding_accrued",
"next_funding_step",
"_PLACEHOLDER",
"current_funding",
"_PLACEHOLDER",
"_PLACEHOLDER",
"mark_price",
"_PLACEHOLDER",
"_PLACEHOLDER",
"open_interest",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"clamp_min",
"clamp_max"
])
DerivativesStatus = generate_labeler_serializer(
name="DerivativesStatus",
klass=types.DerivativesStatus,
labels=[
"mts",
"_PLACEHOLDER",
"deriv_price",
"spot_price",
"_PLACEHOLDER",
"insurance_fund_balance",
"_PLACEHOLDER",
"next_funding_evt_timestamp_ms",
"next_funding_accrued",
"next_funding_step",
"_PLACEHOLDER",
"current_funding",
"_PLACEHOLDER",
"_PLACEHOLDER",
"mark_price",
"_PLACEHOLDER",
"_PLACEHOLDER",
"open_interest",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"clamp_min",
"clamp_max"
]
)
#endregion
#region Serializers definition for Websocket Authenticated Channels
Order = generate_labeler_serializer("Order", klass=types.Order, labels=[
"id",
"gid",
"cid",
"symbol",
"mts_create",
"mts_update",
"amount",
"amount_orig",
"order_type",
"type_prev",
"mts_tif",
"_PLACEHOLDER",
"flags",
"order_status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"price",
"price_avg",
"price_trailing",
"price_aux_limit",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"notify",
"hidden",
"placed_id",
"_PLACEHOLDER",
"_PLACEHOLDER",
"routing",
"_PLACEHOLDER",
"_PLACEHOLDER",
"meta"
])
Order = generate_labeler_serializer(
name="Order",
klass=types.Order,
labels=[
"id",
"gid",
"cid",
"symbol",
"mts_create",
"mts_update",
"amount",
"amount_orig",
"order_type",
"type_prev",
"mts_tif",
"_PLACEHOLDER",
"flags",
"order_status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"price",
"price_avg",
"price_trailing",
"price_aux_limit",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"notify",
"hidden",
"placed_id",
"_PLACEHOLDER",
"_PLACEHOLDER",
"routing",
"_PLACEHOLDER",
"_PLACEHOLDER",
"meta"
]
)
Position = generate_labeler_serializer("Position", klass=types.Position, labels=[
"symbol",
"status",
"amount",
"base_price",
"margin_funding",
"margin_funding_type",
"pl",
"pl_perc",
"price_liq",
"leverage",
"flag",
"position_id",
"mts_create",
"mts_update",
"_PLACEHOLDER",
"type",
"_PLACEHOLDER",
"collateral",
"collateral_min",
"meta"
])
Position = generate_labeler_serializer(
name="Position",
klass=types.Position,
labels=[
"symbol",
"status",
"amount",
"base_price",
"margin_funding",
"margin_funding_type",
"pl",
"pl_perc",
"price_liq",
"leverage",
"flag",
"position_id",
"mts_create",
"mts_update",
"_PLACEHOLDER",
"type",
"_PLACEHOLDER",
"collateral",
"collateral_min",
"meta"
]
)
Trade = generate_labeler_serializer("Trade", klass=types.Trade, labels=[
"id",
"symbol",
"mts_create",
"order_id",
"exec_amount",
"exec_price",
"order_type",
"order_price",
"maker",
"fee",
"fee_currency",
"cid"
])
Trade = generate_labeler_serializer(
name="Trade",
klass=types.Trade,
labels=[
"id",
"symbol",
"mts_create",
"order_id",
"exec_amount",
"exec_price",
"order_type",
"order_price",
"maker",
"fee",
"fee_currency",
"cid"
]
)
FundingOffer = generate_labeler_serializer("FundingOffer", klass=types.FundingOffer, labels=[
"id",
"symbol",
"mts_create",
"mts_update",
"amount",
"amount_orig",
"offer_type",
"_PLACEHOLDER",
"_PLACEHOLDER",
"flags",
"offer_status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER"
])
FundingOffer = generate_labeler_serializer(
name="FundingOffer",
klass=types.FundingOffer,
labels=[
"id",
"symbol",
"mts_create",
"mts_update",
"amount",
"amount_orig",
"offer_type",
"_PLACEHOLDER",
"_PLACEHOLDER",
"flags",
"offer_status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER"
]
)
FundingCredit = generate_labeler_serializer("FundingCredit", klass=types.FundingCredit, labels=[
"id",
"symbol",
"side",
"mts_create",
"mts_update",
"amount",
"flags",
"status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"mts_opening",
"mts_last_payout",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER",
"no_close",
"position_pair"
])
FundingCredit = generate_labeler_serializer(
name="FundingCredit",
klass=types.FundingCredit,
labels=[
"id",
"symbol",
"side",
"mts_create",
"mts_update",
"amount",
"flags",
"status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"mts_opening",
"mts_last_payout",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER",
"no_close",
"position_pair"
]
)
FundingLoan = generate_labeler_serializer("FundingLoan", klass=types.FundingLoan, labels=[
"id",
"symbol",
"side",
"mts_create",
"mts_update",
"amount",
"flags",
"status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"mts_opening",
"mts_last_payout",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER",
"no_close"
])
FundingLoan = generate_labeler_serializer(
name="FundingLoan",
klass=types.FundingLoan,
labels=[
"id",
"symbol",
"side",
"mts_create",
"mts_update",
"amount",
"flags",
"status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"mts_opening",
"mts_last_payout",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER",
"no_close"
]
)
Wallet = generate_labeler_serializer("Wallet", klass=types.Wallet, labels=[
"wallet_type",
"currency",
"balance",
"unsettled_interest",
"available_balance",
"last_change",
"trade_details"
])
Wallet = generate_labeler_serializer(
name="Wallet",
klass=types.Wallet,
labels=[
"wallet_type",
"currency",
"balance",
"unsettled_interest",
"available_balance",
"last_change",
"trade_details"
]
)
Balance = generate_labeler_serializer("Balance", klass=types.Balance, labels=[
"aum",
"aum_net",
])
Balance = generate_labeler_serializer(
name="Balance",
klass=types.Balance,
labels=[
"aum",
"aum_net"
]
)
#endregion
#endregion

View File

@@ -15,12 +15,14 @@ _Header = TypedDict("_Header", { "event": Literal["subscribed"], "channel": str,
Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"]
class Ticker(TypedDict):
subId: str; symbol: str
subId: str
symbol: str
pair: Optional[str]
currency: Optional[str]
class Trades(TypedDict):
subId: str; symbol: str
subId: str
symbol: str
pair: Optional[str]
currency: Optional[str]
@@ -38,4 +40,4 @@ class Candles(TypedDict):
class Status(TypedDict):
subId: str
key: str
key: str

View File

@@ -1,10 +1,16 @@
#pylint: disable=duplicate-code
#pylint: disable-next=wildcard-import,unused-wildcard-import
from typing import *
from dataclasses import dataclass
from .. labeler import _Type
#pylint: disable-next=unused-import
from .. notification import Notification
from .. utils.JSONEncoder import JSON
from ..utils.json_encoder import JSON
#region Type hinting for Websocket Public Channels
@@ -40,43 +46,43 @@ class FundingCurrencyTicker(_Type):
@dataclass
class TradingPairTrade(_Type):
id: int
mts: int
amount: float
id: int
mts: int
amount: float
price: float
@dataclass
class FundingCurrencyTrade(_Type):
id: int
mts: int
amount: float
rate: float
id: int
mts: int
amount: float
rate: float
period: int
@dataclass
class TradingPairBook(_Type):
price: float
count: int
price: float
count: int
amount: float
@dataclass
class FundingCurrencyBook(_Type):
rate: float
period: int
count: int
rate: float
period: int
count: int
amount: float
@dataclass
@dataclass
class TradingPairRawBook(_Type):
order_id: int
price: float
price: float
amount: float
@dataclass
@dataclass
class FundingCurrencyRawBook(_Type):
offer_id: int
period: int
rate: float
offer_id: int
period: int
rate: float
amount: float
@dataclass
@@ -154,14 +160,14 @@ class Position(_Type):
@dataclass
class Trade(_Type):
id: int
symbol: str
id: int
symbol: str
mts_create: int
order_id: int
exec_amount: float
exec_price: float
order_type: str
order_price: float
order_id: int
exec_amount: float
exec_price: float
order_type: str
order_price: float
maker:int
fee: Optional[float]
fee_currency: Optional[str]
@@ -238,4 +244,4 @@ class Balance(_Type):
aum: float
aum_net: float
#endregion
#endregion

View File

@@ -16,4 +16,4 @@ bfx = Client(
for position in bfx.rest.auth.get_positions():
notification: Notification[PositionClaim] = bfx.rest.auth.claim_position(position.position_id)
claim: PositionClaim = notification.data
print(f"Position: {position} | PositionClaim: {claim}")
print(f"Position: {position} | PositionClaim: {claim}")

View File

@@ -19,8 +19,8 @@ wallets: List[Wallet] = bfx.rest.auth.get_wallets()
# Transfers funds (0.001 ETH) from exchange wallet to funding wallet
A: Notification[Transfer] = bfx.rest.auth.transfer_between_wallets(
from_wallet="exchange", to_wallet="funding", from_currency="ETH",
to_currency="ETH", amount=0.001)
from_wallet="exchange", to_wallet="funding", currency="ETH",
currency_to="ETH", amount=0.001)
print("Transfer:", A.data)
@@ -41,4 +41,4 @@ D: Notification[Withdrawal] = bfx.rest.auth.submit_wallet_withdrawal(
wallet="exchange", method="tetheruse", address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
amount=1.0)
print("Withdrawal:", D.data)
print("Withdrawal:", D.data)

View File

@@ -32,5 +32,5 @@ print("Status:", bool(derivative_position_collateral.status))
derivative_position_collateral_limits: DerivativePositionCollateralLimits = \
bfx.rest.auth.get_derivative_position_collateral_limits(symbol="tBTCF0:USTF0")
print(f"Minimum collateral: {derivative_position_collateral_limits.min_collateral} | \
Maximum collateral: {derivative_position_collateral_limits.max_collateral}")
print(f"Minimum collateral: {derivative_position_collateral_limits.min_collateral} | " \
f"Maximum collateral: {derivative_position_collateral_limits.max_collateral}")

View File

@@ -14,11 +14,11 @@ bfx = Client(
# Submit a new funding offer
notification: Notification[FundingOffer] = bfx.rest.auth.submit_funding_offer(
type=FundingOfferType.LIMIT,
symbol="fUSD",
amount=123.45,
rate=0.001,
period=2,
type=FundingOfferType.LIMIT,
symbol="fUSD",
amount=123.45,
rate=0.001,
period=2,
flags=Flag.HIDDEN
)
@@ -27,4 +27,4 @@ print("Funding Offer notification:", notification)
# Get all fUSD active funding offers
offers = bfx.rest.auth.get_funding_offers(symbol="fUSD")
print("Offers (fUSD):", offers)
print("Offers (fUSD):", offers)

View File

@@ -15,9 +15,9 @@ bfx = Client(
# Submit a new order
submit_order_notification: Notification[Order] = bfx.rest.auth.submit_order(
type=OrderType.EXCHANGE_LIMIT,
symbol="tBTCUST",
amount=0.015,
price=10000,
symbol="tBTCUST",
amount=0.015,
price=10000,
flags=Flag.HIDDEN + Flag.OCO + Flag.CLOSE
)
@@ -28,7 +28,7 @@ order: Order = submit_order_notification.data
# Update its amount and its price
update_order_notification: Notification[Order] = bfx.rest.auth.update_order(
id=order.id,
amount=0.020,
amount=0.020,
price=10150
)
@@ -39,4 +39,4 @@ cancel_order_notification: Notification[Order] = bfx.rest.auth.cancel_order(
id=order.id
)
print("Cancel order notification:", cancel_order_notification)
print("Cancel order notification:", cancel_order_notification)

View File

@@ -16,9 +16,9 @@ loans: List[FundingLoan] = bfx.rest.auth.get_funding_loans(symbol="fUSD")
# Set every loan's keep funding status to <off> (1: <on>, 2: <off>)
notification: Notification[None] = bfx.rest.auth.toggle_keep_funding(
funding_type="loan",
type="loan",
ids=[ loan.id for loan in loans ],
changes={ loan.id: 2 for loan in loans }
)
print("Toggle keep funding notification:", notification)
print("Toggle keep funding notification:", notification)

View File

@@ -2,7 +2,7 @@
import os
from bfxapi import Client, REST_HOST
from bfxapi import Client, REST_HOST
from bfxapi.rest.enums import MerchantSettingsKey
@@ -15,7 +15,7 @@ bfx = Client(
if not bfx.rest.merchant.set_merchant_settings(MerchantSettingsKey.RECOMMEND_STORE, 1):
print(f"Cannot set <{MerchantSettingsKey.RECOMMEND_STORE}> to <1>.")
print(f"The current <{MerchantSettingsKey.PREFERRED_FIAT}> value is:",
print(f"The current <{MerchantSettingsKey.PREFERRED_FIAT}> value is:",
bfx.rest.merchant.get_merchant_settings(MerchantSettingsKey.PREFERRED_FIAT))
settings = bfx.rest.merchant.list_merchant_settings([
@@ -25,4 +25,4 @@ settings = bfx.rest.merchant.list_merchant_settings([
])
for key, value in settings.items():
print(f"<{key}>:", value)
print(f"<{key}>:", value)

View File

@@ -2,7 +2,7 @@
import os
from bfxapi import Client, REST_HOST
from bfxapi import Client, REST_HOST
from bfxapi.rest.types import InvoiceSubmission
@@ -42,4 +42,4 @@ print(bfx.rest.merchant.complete_invoice(
print(bfx.rest.merchant.get_invoices(limit=25))
print(bfx.rest.merchant.get_invoices_paginated(page=1, page_size=60, sort="asc", sort_field="t"))
print(bfx.rest.merchant.get_invoices_paginated(page=1, page_size=60, sort="asc", sort_field="t"))

View File

@@ -21,4 +21,4 @@ print("25 price points of fUSD order book (with precision P0):", f_book)
f_raw_book: List[FundingCurrencyRawBook] = bfx.rest.public.get_f_raw_book("fUSD")
print("fUSD raw order book:", f_raw_book)
print("fUSD raw order book:", f_raw_book)

View File

@@ -15,4 +15,4 @@ print (bfx.rest.public.conf(Config.MAP_CURRENCY_SYM))
print(bfx.rest.public.conf(Config.LIST_PAIR_EXCHANGE))
# Prints all the available funding currencies (pub:list:currency)
print(bfx.rest.public.conf(Config.LIST_CURRENCY))
print(bfx.rest.public.conf(Config.LIST_CURRENCY))

View File

@@ -8,4 +8,4 @@ print(f"Candles: {bfx.rest.public.get_candles_hist(symbol='tBTCUSD')}")
# Be sure to specify a period or aggregated period when retrieving funding candles.
# If you wish to mimic the candles found in the UI, use the following setup to aggregate all funding candles: a30:p2:p30
print(f"Candles: {bfx.rest.public.get_candles_hist(tf='15m', symbol='fUSD:a30:p2:p30')}")
print(f"Candles: {bfx.rest.public.get_candles_hist(tf='15m', symbol='fUSD:a30:p2:p30')}")

View File

@@ -9,7 +9,7 @@ from bfxapi.rest.types import List, PulseMessage, PulseProfile
bfx = Client(rest_host=PUB_REST_HOST)
# POSIX timestamp in milliseconds (check https://currentmillis.com/)
end = datetime.datetime(2020, 5, 2).timestamp() * 1000
end = datetime.datetime(2020, 5, 2).timestamp() * 1000
# Retrieves 25 pulse messages up to 2020/05/02
messages: List[PulseMessage] = bfx.rest.public.get_pulse_history(end=end, limit=25)
@@ -21,4 +21,4 @@ for message in messages:
profile: PulseProfile = bfx.rest.public.get_pulse_profile("News")
URL = profile.picture.replace("size", "small")
print(f"<{profile.nickname}>'s profile picture: https://s3-eu-west-1.amazonaws.com/bfx-pub/{URL}")
print(f"<{profile.nickname}>'s profile picture: https://s3-eu-west-1.amazonaws.com/bfx-pub/{URL}")

View File

@@ -25,4 +25,4 @@ print("Average execution rate for fUSD:", funding_market_average_price.rate_avg)
fx_rate: FxRate = bfx.rest.public.get_fx_rate(ccy1="USD", ccy2="EUR")
print("Exchange rate between USD and EUR:", fx_rate.current_rate)
print("Exchange rate between USD and EUR:", fx_rate.current_rate)

View File

@@ -14,4 +14,4 @@ print("Latest 15 trades for tBTCUSD (in ascending order):", t_trades)
f_trades: List[FundingCurrencyTrade] = bfx.rest.public.get_f_trades("fUSD", \
limit=15, sort=Sort.DESCENDING)
print("Latest 15 trades for fUSD (in descending order):", f_trades)
print("Latest 15 trades for fUSD (in descending order):", f_trades)

View File

@@ -41,4 +41,4 @@ async def on_order_new(order_new: Order):
def on_subscribed(subscription):
print(f"Subscription successful for <{subscription}>.")
bfx.wss.run()
bfx.wss.run()

View File

@@ -27,4 +27,4 @@ def on_wallet_snapshot(wallets: List[Wallet]):
def on_wallet_update(wallet: Wallet):
print(f"Wallet update: {wallet}")
bfx.wss.run()
bfx.wss.run()

View File

@@ -10,14 +10,14 @@ bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("derivatives_status_update")
def on_derivatives_status_update(subscription: subscriptions.Status, data: DerivativesStatus):
print(f"{subscription}:", data)
print(f"{subscription}:", data)
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.once("open")
async def open():
async def on_open():
await bfx.wss.subscribe(Channel.STATUS, key="deriv:tBTCF0:USTF0")
bfx.wss.run()
bfx.wss.run()

View File

@@ -10,21 +10,21 @@ from bfxapi.websocket import subscriptions
from bfxapi.websocket.enums import Channel, Error
from bfxapi.websocket.types import TradingPairBook
class OrderBook(object):
class OrderBook:
def __init__(self, symbols: List[str]):
self.__order_book = {
symbol: {
symbol: {
"bids": OrderedDict(), "asks": OrderedDict()
} for symbol in symbols
}
def update(self, symbol: str, data: TradingPairBook) -> None:
price, count, amount = data.price, data.count, data.amount
kind = (amount > 0) and "bids" or "asks"
kind = "bids" if amount > 0 else "asks"
if count > 0:
self.__order_book[symbol][kind][price] = {
self.__order_book[symbol][kind][price] = {
"price": price,
"count": count,
"amount": amount
@@ -62,4 +62,4 @@ def on_t_book_snapshot(subscription: subscriptions.Book, snapshot: List[TradingP
def on_t_book_update(subscription: subscriptions.Book, data: TradingPairBook):
order_book.update(subscription["symbol"], data)
bfx.wss.run()
bfx.wss.run()

View File

@@ -10,21 +10,21 @@ from bfxapi.websocket import subscriptions
from bfxapi.websocket.enums import Channel, Error
from bfxapi.websocket.types import TradingPairRawBook
class RawOrderBook(object):
class RawOrderBook:
def __init__(self, symbols: List[str]):
self.__raw_order_book = {
symbol: {
symbol: {
"bids": OrderedDict(), "asks": OrderedDict()
} for symbol in symbols
}
def update(self, symbol: str, data: TradingPairRawBook) -> None:
order_id, price, amount = data.order_id, data.price, data.amount
kind = (amount > 0) and "bids" or "asks"
kind = "bids" if amount > 0 else "asks"
if price > 0:
self.__raw_order_book[symbol][kind][order_id] = {
self.__raw_order_book[symbol][kind][order_id] = {
"order_id": order_id,
"price": price,
"amount": amount
@@ -62,4 +62,4 @@ def on_t_raw_book_snapshot(subscription: subscriptions.Book, snapshot: List[Trad
def on_t_raw_book_update(subscription: subscriptions.Book, data: TradingPairRawBook):
raw_order_book.update(subscription["symbol"], data)
bfx.wss.run()
bfx.wss.run()

View File

@@ -15,7 +15,7 @@ def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTick
print(f"Data: {data}")
@bfx.wss.once("open")
async def open():
async def on_open():
await bfx.wss.subscribe(Channel.TICKER, symbol="tBTCUSD")
bfx.wss.run()
bfx.wss.run()

View File

@@ -9,21 +9,21 @@ from bfxapi.websocket import subscriptions
bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("candles_update")
def on_candles_update(subscription: subscriptions.Candles, candle: Candle):
print(f"New candle: {candle}")
def on_candles_update(_sub: subscriptions.Candles, candle: Candle):
print(f"New candle: {candle}")
@bfx.wss.on("t_trade_executed")
def on_t_trade_executed(subscription: subscriptions.Trades, trade: TradingPairTrade):
print(f"New trade: {trade}")
def on_t_trade_executed(_sub: subscriptions.Trades, trade: TradingPairTrade):
print(f"New trade: {trade}")
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.once("open")
async def open():
async def on_open():
await bfx.wss.subscribe(Channel.CANDLES, key="trade:1m:tBTCUSD")
await bfx.wss.subscribe(Channel.TRADES, symbol="tBTCUSD")
bfx.wss.run()
bfx.wss.run()

View File

@@ -27,7 +27,7 @@ setup(
"Bug Reports": "https://github.com/bitfinexcom/bitfinex-api-py/issues",
"Source": "https://github.com/bitfinexcom/bitfinex-api-py",
},
packages=[
packages=[
"bfxapi", "bfxapi.utils",
"bfxapi.websocket", "bfxapi.websocket.client", "bfxapi.websocket.handlers",
"bfxapi.rest", "bfxapi.rest.endpoints", "bfxapi.rest.middleware",
@@ -38,4 +38,4 @@ setup(
"requests~=2.28.1"
],
python_requires=">=3.8"
)
)