Merge pull request #219 from Davi0kProgramsThings/feature/improvements

Merge branch `Davi0kProgramsThings:feature/improvements` into branch `bitfinexcom:v3-beta`.
This commit is contained in:
Vigan Abdurrahmani
2023-04-26 17:03:34 +02:00
committed by GitHub
49 changed files with 487 additions and 1053 deletions

View File

@@ -1,7 +1,7 @@
from typing import List, Optional
from typing import List, Literal, Optional
from .rest import BfxRestInterface
from .websocket import BfxWebsocketClient
from .websocket import BfxWebSocketClient
from .urls import REST_HOST, WSS_HOST
class Client:
@@ -13,8 +13,9 @@ class Client:
*,
rest_host: str = REST_HOST,
wss_host: str = WSS_HOST,
wss_timeout: Optional[float] = 60 * 15,
log_filename: Optional[str] = None,
log_level: str = "INFO"
log_level: Literal["ERROR", "WARNING", "INFO", "DEBUG"] = "INFO"
):
credentials = None
@@ -26,9 +27,10 @@ class Client:
credentials=credentials
)
self.wss = BfxWebsocketClient(
self.wss = BfxWebSocketClient(
host=wss_host,
credentials=credentials,
wss_timeout=wss_timeout,
log_filename=log_filename,
log_level=log_level
)

View File

@@ -1,16 +1,8 @@
__all__ = [
"BfxBaseException",
"LabelerSerializerException",
]
class BfxBaseException(Exception):
"""
Base class for every custom exception in bfxapi/rest/exceptions.py and bfxapi/websocket/exceptions.py.
"""
class LabelerSerializerException(BfxBaseException):
"""
This exception indicates an error thrown by the _Serializer class in bfxapi/labeler.py.
"""

View File

@@ -2,7 +2,11 @@ from typing import Dict, List, Tuple, Union, Literal, Optional
from decimal import Decimal
from datetime import datetime
from .. types import Notification, \
from ..middleware import Middleware
from ..enums import Sort, OrderType, FundingOfferType
from ...types import JSON, Notification, \
UserInfo, LoginHistory, BalanceAvailable, \
Order, Position, Trade, \
FundingTrade, OrderTrade, Ledger, \
@@ -14,13 +18,9 @@ from .. types import Notification, \
PositionIncrease, PositionHistory, PositionSnapshot, \
PositionAudit, DerivativePositionCollateral, DerivativePositionCollateralLimits
from .. import serializers
from ...types import serializers
from .. serializers import _Notification
from .. enums import Sort, OrderType, FundingOfferType
from .. middleware import Middleware
from ...utils.json_encoder import JSON
from ...types.serializers import _Notification
class RestAuthenticatedEndpoints(Middleware):
def get_user_info(self) -> UserInfo:

View File

@@ -1,16 +1,41 @@
from typing import TypedDict, Dict, List, Union, Literal, Optional, Any
import re
from typing import Callable, TypeVar, cast, \
TypedDict, Dict, List, Union, Literal, Optional, Any
from decimal import Decimal
from .. types import \
from ..middleware import Middleware
from ..enums import MerchantSettingsKey
from ...types import \
InvoiceSubmission, InvoicePage, InvoiceStats, \
CurrencyConversion, MerchantDeposit, MerchantUnlinkedDeposit
from .. enums import MerchantSettingsKey
#region Defining methods to convert dictionary keys to snake_case and camelCase.
from .. middleware import Middleware
T = TypeVar("T")
from ...utils.camel_and_snake_case_helpers import to_snake_case_keys, to_camel_case_keys
_to_snake_case: Callable[[str], str] = lambda string: re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
_to_camel_case: Callable[[str], str] = lambda string: \
(components := string.split("_"))[0] + str().join(c.title() for c in components[1:])
def _scheme(data: T, adapter: Callable[[str], str]) -> T:
if isinstance(data, list):
return cast(T, [ _scheme(sub_data, adapter) for sub_data in data ])
if isinstance(data, dict):
return cast(T, { adapter(key): _scheme(value, adapter) for key, value in data.items() })
return data
def _to_snake_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_snake_case)
def _to_camel_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_camel_case)
#endregion
_CustomerInfo = TypedDict("_CustomerInfo", {
"nationality": str, "resid_country": str, "resid_city": str,
@@ -30,13 +55,13 @@ class RestMerchantEndpoints(Middleware):
duration: Optional[int] = None,
webhook: Optional[str] = None,
redirect_url: Optional[str] = None) -> InvoiceSubmission:
body = to_camel_case_keys({
body = _to_camel_case_keys({
"amount": amount, "currency": currency, "order_id": order_id,
"customer_info": customer_info, "pay_currencies": pay_currencies, "duration": duration,
"webhook": webhook, "redirect_url": redirect_url
})
data = to_snake_case_keys(self._post("auth/w/ext/pay/invoice/create", body=body))
data = _to_snake_case_keys(self._post("auth/w/ext/pay/invoice/create", body=body))
return InvoiceSubmission.parse(data)
@@ -53,7 +78,7 @@ class RestMerchantEndpoints(Middleware):
response = self._post("auth/r/ext/pay/invoices", body=body)
return [ InvoiceSubmission.parse(sub_data) for sub_data in to_snake_case_keys(response) ]
return [ InvoiceSubmission.parse(sub_data) for sub_data in _to_snake_case_keys(response) ]
def get_invoices_paginated(self,
page: int = 1,
@@ -66,13 +91,13 @@ class RestMerchantEndpoints(Middleware):
crypto: Optional[List[str]] = None,
id: Optional[str] = None,
order_id: Optional[str] = None) -> InvoicePage:
body = to_camel_case_keys({
body = _to_camel_case_keys({
"page": page, "page_size": page_size, "sort": sort,
"sort_field": sort_field, "status": status, "fiat": fiat,
"crypto": crypto, "id": id, "order_id": order_id
})
data = to_snake_case_keys(self._post("auth/r/ext/pay/invoices/paginated", body=body))
data = _to_snake_case_keys(self._post("auth/r/ext/pay/invoices/paginated", body=body))
return InvoicePage.parse(data)
@@ -94,7 +119,7 @@ class RestMerchantEndpoints(Middleware):
*,
deposit_id: Optional[int] = None,
ledger_id: Optional[int] = None) -> InvoiceSubmission:
return InvoiceSubmission.parse(to_snake_case_keys(self._post("auth/w/ext/pay/invoice/complete", body={
return InvoiceSubmission.parse(_to_snake_case_keys(self._post("auth/w/ext/pay/invoice/complete", body={
"id": id, "payCcy": pay_currency, "depositId": deposit_id,
"ledgerId": ledger_id
})))
@@ -102,7 +127,7 @@ class RestMerchantEndpoints(Middleware):
def expire_invoice(self, id: str) -> InvoiceSubmission:
body = { "id": id }
response = self._post("auth/w/ext/pay/invoice/expire", body=body)
return InvoiceSubmission.parse(to_snake_case_keys(response))
return InvoiceSubmission.parse(_to_snake_case_keys(response))
def get_currency_conversion_list(self) -> List[CurrencyConversion]:
return [
@@ -148,7 +173,7 @@ class RestMerchantEndpoints(Middleware):
unlinked: Optional[bool] = None) -> List[MerchantDeposit]:
body = { "from": start, "to": end, "ccy": ccy, "unlinked": unlinked }
response = self._post("auth/r/ext/pay/deposits", body=body)
return [ MerchantDeposit(**sub_data) for sub_data in to_snake_case_keys(response) ]
return [ MerchantDeposit(**sub_data) for sub_data in _to_snake_case_keys(response) ]
def get_unlinked_deposits(self,
ccy: str,
@@ -157,4 +182,4 @@ class RestMerchantEndpoints(Middleware):
end: Optional[int] = None) -> List[MerchantUnlinkedDeposit]:
body = { "ccy": ccy, "start": start, "end": end }
response = self._post("/auth/r/ext/pay/deposits/unlinked", body=body)
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in to_snake_case_keys(response) ]
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in _to_snake_case_keys(response) ]

View File

@@ -1,8 +1,12 @@
from typing import List, Union, Literal, Optional, Any, cast
from typing import List, Dict, Union, Literal, Optional, Any, cast
from decimal import Decimal
from .. types import \
from ..middleware import Middleware
from ..enums import Config, Sort
from ...types import \
PlatformStatus, TradingPairTicker, FundingCurrencyTicker, \
TickersHistory, TradingPairTrade, FundingCurrencyTrade, \
TradingPairBook, FundingCurrencyBook, TradingPairRawBook, \
@@ -11,9 +15,7 @@ from .. types import \
FundingStatistic, PulseProfile, PulseMessage, \
TradingMarketAveragePrice, FundingMarketAveragePrice, FxRate
from .. import serializers
from .. enums import Config, Sort
from .. middleware import Middleware
from ...types import serializers
class RestPublicEndpoints(Middleware):
def conf(self, config: Config) -> Any:
@@ -22,37 +24,46 @@ class RestPublicEndpoints(Middleware):
def get_platform_status(self) -> PlatformStatus:
return serializers.PlatformStatus.parse(*self._get("platform/status"))
def get_tickers(self, symbols: List[str]) -> List[Union[TradingPairTicker, FundingCurrencyTicker]]:
def get_tickers(self, symbols: List[str]) -> Dict[str, Union[TradingPairTicker, FundingCurrencyTicker]]:
data = self._get("tickers", params={ "symbols": ",".join(symbols) })
parsers = { "t": serializers.TradingPairTicker.parse, "f": serializers.FundingCurrencyTicker.parse }
return [ cast(Union[TradingPairTicker, FundingCurrencyTicker], \
parsers[sub_data[0][0]](*sub_data)) for sub_data in data ]
return {
symbol: cast(Union[TradingPairTicker, FundingCurrencyTicker],
parsers[symbol[0]](*sub_data)) for sub_data in data
if (symbol := sub_data.pop(0))
}
def get_t_tickers(self, pairs: Union[List[str], Literal["ALL"]]) -> List[TradingPairTicker]:
if isinstance(pairs, str) and pairs == "ALL":
return [ cast(TradingPairTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) \
if cast(str, sub_data.symbol).startswith("t") ]
def get_t_tickers(self, symbols: Union[List[str], Literal["ALL"]]) -> Dict[str, TradingPairTicker]:
if isinstance(symbols, str) and symbols == "ALL":
return {
symbol: cast(TradingPairTicker, sub_data)
for symbol, sub_data in self.get_tickers([ "ALL" ]).items()
if symbol.startswith("t")
}
data = self.get_tickers(list(pairs))
data = self.get_tickers(list(symbols))
return cast(List[TradingPairTicker], data)
return cast(Dict[str, TradingPairTicker], data)
def get_f_tickers(self, currencies: Union[List[str], Literal["ALL"]]) -> List[FundingCurrencyTicker]:
if isinstance(currencies, str) and currencies == "ALL":
return [ cast(FundingCurrencyTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) \
if cast(str, sub_data.symbol).startswith("f") ]
def get_f_tickers(self, symbols: Union[List[str], Literal["ALL"]]) -> Dict[str, FundingCurrencyTicker]:
if isinstance(symbols, str) and symbols == "ALL":
return {
symbol: cast(FundingCurrencyTicker, sub_data)
for symbol, sub_data in self.get_tickers([ "ALL" ]).items()
if symbol.startswith("f")
}
data = self.get_tickers(list(currencies))
data = self.get_tickers(list(symbols))
return cast(List[FundingCurrencyTicker], data)
return cast(Dict[str, FundingCurrencyTicker], data)
def get_t_ticker(self, pair: str) -> TradingPairTicker:
return serializers.TradingPairTicker.parse(*([pair] + self._get(f"ticker/{pair}")))
def get_t_ticker(self, symbol: str) -> TradingPairTicker:
return serializers.TradingPairTicker.parse(*self._get(f"ticker/{symbol}"))
def get_f_ticker(self, currency: str) -> FundingCurrencyTicker:
return serializers.FundingCurrencyTicker.parse(*([currency] + self._get(f"ticker/{currency}")))
def get_f_ticker(self, symbol: str) -> FundingCurrencyTicker:
return serializers.FundingCurrencyTicker.parse(*self._get(f"ticker/{symbol}"))
def get_tickers_history(self,
symbols: List[str],
@@ -164,26 +175,29 @@ class RestPublicEndpoints(Middleware):
data = self._get(f"candles/trade:{tf}:{symbol}/last", params=params)
return serializers.Candle.parse(*data)
def get_derivatives_status(self, keys: Union[List[str], Literal["ALL"]]) -> List[DerivativesStatus]:
def get_derivatives_status(self, keys: Union[List[str], Literal["ALL"]]) -> Dict[str, DerivativesStatus]:
if keys == "ALL":
params = { "keys": "ALL" }
else: params = { "keys": ",".join(keys) }
data = self._get("status/deriv", params=params)
return [ serializers.DerivativesStatus.parse(*sub_data) for sub_data in data ]
return {
key: serializers.DerivativesStatus.parse(*sub_data)
for sub_data in data
if (key := sub_data.pop(0))
}
def get_derivatives_status_history(self,
type: str,
symbol: str,
key: str,
*,
sort: Optional[Sort] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[DerivativesStatus]:
params = { "sort": sort, "start": start, "end": end, "limit": limit }
data = self._get(f"status/{type}/{symbol}/hist", params=params)
return [ serializers.DerivativesStatus.parse(*([symbol] + sub_data)) for sub_data in data ]
data = self._get(f"status/deriv/{key}/hist", params=params)
return [ serializers.DerivativesStatus.parse(*sub_data) for sub_data in data ]
def get_liquidations(self,
*,

View File

@@ -1,4 +1,4 @@
from .. exceptions import BfxBaseException
from ..exceptions import BfxBaseException
__all__ = [
"BfxRestException",

View File

@@ -1,16 +1,14 @@
import unittest
from .test_rest_serializers import TestRestSerializers
from .test_websocket_serializers import TestWebsocketSerializers
from .test_labeler import TestLabeler
from .test_notification import TestNotification
from .test_types_labeler import TestTypesLabeler
from .test_types_notification import TestTypesNotification
from .test_types_serializers import TestTypesSerializers
def suite():
return unittest.TestSuite([
unittest.makeSuite(TestRestSerializers),
unittest.makeSuite(TestWebsocketSerializers),
unittest.makeSuite(TestLabeler),
unittest.makeSuite(TestNotification),
unittest.makeSuite(TestTypesLabeler),
unittest.makeSuite(TestTypesNotification),
unittest.makeSuite(TestTypesSerializers),
])
if __name__ == "__main__":

View File

@@ -3,10 +3,10 @@ import unittest
from typing import Optional
from dataclasses import dataclass
from ..exceptions import LabelerSerializerException
from ..labeler import _Type, generate_labeler_serializer, generate_recursive_serializer
class TestLabeler(unittest.TestCase):
from ..types.labeler import _Type, generate_labeler_serializer, generate_recursive_serializer
class TestTypesLabeler(unittest.TestCase):
def test_generate_labeler_serializer(self):
@dataclass
class Test(_Type):
@@ -24,8 +24,8 @@ class TestLabeler(unittest.TestCase):
self.assertListEqual(serializer.get_labels(), [ "A", "B", "C" ],
msg="_Serializer::get_labels() should return the right list of labels.")
with self.assertRaises(LabelerSerializerException,
msg="_Serializer should raise LabelerSerializerException if given " \
with self.assertRaises(AssertionError,
msg="_Serializer should raise an AssertionError if given " \
"fewer arguments than the serializer labels."):
serializer.parse(5, 65.0, "X")

View File

@@ -1,11 +1,11 @@
import unittest
from dataclasses import dataclass
from ..labeler import generate_labeler_serializer
from ..notification import _Type, _Notification, Notification
from ..types.labeler import generate_labeler_serializer
from ..types.notification import _Type, _Notification, Notification
class TestNotification(unittest.TestCase):
def test_notification(self):
class TestTypesNotification(unittest.TestCase):
def test_types_notification(self):
@dataclass
class Test(_Type):
A: int

View File

@@ -1,13 +1,9 @@
#pylint: disable=duplicate-code
import unittest
from ..types import serializers
from ..types.labeler import _Type
from ..labeler import _Type
from ..rest import serializers
class TestRestSerializers(unittest.TestCase):
def test_rest_serializers(self):
class TestTypesSerializers(unittest.TestCase):
def test_types_serializers(self):
for serializer in map(serializers.__dict__.get, serializers.__serializers__):
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass " \

View File

@@ -1,21 +0,0 @@
#pylint: disable=duplicate-code
import unittest
from ..labeler import _Type
from ..websocket import serializers
class TestWebsocketSerializers(unittest.TestCase):
def test_websocket_serializers(self):
for serializer in map(serializers.__dict__.get, serializers.__serializers__):
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass " \
f"of _Type (got {serializer.klass}).")
self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__),
f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> " \
"must have matching labels and fields.")
if __name__ == "__main__":
unittest.main()

26
bfxapi/types/__init__.py Normal file
View File

@@ -0,0 +1,26 @@
from .dataclasses import JSON, \
PlatformStatus, TradingPairTicker, FundingCurrencyTicker, \
TickersHistory, TradingPairTrade, FundingCurrencyTrade, \
TradingPairBook, FundingCurrencyBook, TradingPairRawBook, \
FundingCurrencyRawBook, Statistic, Candle, \
DerivativesStatus, Liquidation, Leaderboard, \
FundingStatistic, PulseProfile, PulseMessage, \
TradingMarketAveragePrice, FundingMarketAveragePrice, FxRate
from .dataclasses import \
UserInfo, LoginHistory, BalanceAvailable, \
Order, Position, Trade, \
FundingTrade, OrderTrade, Ledger, \
FundingOffer, FundingCredit, FundingLoan, \
FundingAutoRenew, FundingInfo, Wallet, \
Transfer, Withdrawal, DepositAddress, \
LightningNetworkInvoice, Movement, SymbolMarginInfo, \
BaseMarginInfo, PositionClaim, PositionIncreaseInfo, \
PositionIncrease, PositionHistory, PositionSnapshot, \
PositionAudit, DerivativePositionCollateral, DerivativePositionCollateralLimits
from .dataclasses import \
InvoiceSubmission, InvoicePage, InvoiceStats, \
CurrencyConversion, MerchantDeposit, MerchantUnlinkedDeposit
from .notification import Notification

View File

@@ -1,18 +1,13 @@
#pylint: disable=duplicate-code
#pylint: disable-next=wildcard-import,unused-wildcard-import
from typing import *
from typing import Union, Type, \
List, Dict, Literal, Optional, Any
from dataclasses import dataclass
from .. labeler import _Type, partial, compose
from .labeler import _Type, partial, compose
#pylint: disable-next=unused-import
from .. notification import Notification
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
from ..utils.json_encoder import JSON
#region Type hinting for Rest Public Endpoints
#region Dataclass definitions for types of public use
@dataclass
class PlatformStatus(_Type):
@@ -20,7 +15,6 @@ class PlatformStatus(_Type):
@dataclass
class TradingPairTicker(_Type):
symbol: str
bid: float
bid_size: float
ask: float
@@ -34,7 +28,6 @@ class TradingPairTicker(_Type):
@dataclass
class FundingCurrencyTicker(_Type):
symbol: str
frr: float
bid: float
bid_period: int
@@ -114,7 +107,6 @@ class Candle(_Type):
@dataclass
class DerivativesStatus(_Type):
key: str
mts: int
deriv_price: float
spot_price: float
@@ -201,7 +193,7 @@ class FxRate(_Type):
#endregion
#region Type hinting for Rest Authenticated Endpoints
#region Dataclass definitions for types of auth use
@dataclass
class UserInfo(_Type):
@@ -568,7 +560,7 @@ class DerivativePositionCollateralLimits(_Type):
#endregion
#region Type hinting for Rest Merchant Endpoints
#region Dataclass definitions for types of merchant use
@compose(dataclass, partial)
class InvoiceSubmission(_Type):

View File

@@ -1,6 +1,5 @@
from typing import Type, Generic, TypeVar, Iterable, Dict, List, Tuple, Any, cast
from .exceptions import LabelerSerializerException
from typing import Type, Generic, TypeVar, Iterable, \
Dict, List, Tuple, Any, cast
T = TypeVar("T", bound="_Type")
@@ -43,7 +42,7 @@ class _Serializer(Generic[T]):
args = tuple(_Serializer.__flatten(list(args)))
if len(self.__labels) > len(args):
raise LabelerSerializerException(f"{self.name} -> <labels> and <*args> " \
raise AssertionError(f"{self.name} -> <labels> and <*args> " \
"arguments should contain the same amount of elements.")
for index, label in enumerate(self.__labels):

View File

@@ -1,11 +1,10 @@
#pylint: disable=duplicate-code
from .import dataclasses
from . import types
from .. labeler import generate_labeler_serializer, generate_recursive_serializer
from .labeler import \
generate_labeler_serializer, generate_recursive_serializer
#pylint: disable-next=unused-import
from .. notification import _Notification
from .notification import _Notification
__serializers__ = [
"PlatformStatus", "TradingPairTicker", "FundingCurrencyTicker",
@@ -28,11 +27,11 @@ __serializers__ = [
"PositionAudit", "DerivativePositionCollateral", "DerivativePositionCollateralLimits",
]
#region Serializers definition for Rest Public Endpoints
#region Serializer definitions for types of public use
PlatformStatus = generate_labeler_serializer(
name="PlatformStatus",
klass=types.PlatformStatus,
klass=dataclasses.PlatformStatus,
labels=[
"status"
]
@@ -40,9 +39,8 @@ PlatformStatus = generate_labeler_serializer(
TradingPairTicker = generate_labeler_serializer(
name="TradingPairTicker",
klass=types.TradingPairTicker,
klass=dataclasses.TradingPairTicker,
labels=[
"symbol",
"bid",
"bid_size",
"ask",
@@ -58,9 +56,8 @@ TradingPairTicker = generate_labeler_serializer(
FundingCurrencyTicker = generate_labeler_serializer(
name="FundingCurrencyTicker",
klass=types.FundingCurrencyTicker,
klass=dataclasses.FundingCurrencyTicker,
labels=[
"symbol",
"frr",
"bid",
"bid_period",
@@ -82,7 +79,7 @@ FundingCurrencyTicker = generate_labeler_serializer(
TickersHistory = generate_labeler_serializer(
name="TickersHistory",
klass=types.TickersHistory,
klass=dataclasses.TickersHistory,
labels=[
"symbol",
"bid",
@@ -102,7 +99,7 @@ TickersHistory = generate_labeler_serializer(
TradingPairTrade = generate_labeler_serializer(
name="TradingPairTrade",
klass=types.TradingPairTrade,
klass=dataclasses.TradingPairTrade,
labels=[
"id",
"mts",
@@ -113,7 +110,7 @@ TradingPairTrade = generate_labeler_serializer(
FundingCurrencyTrade = generate_labeler_serializer(
name="FundingCurrencyTrade",
klass=types.FundingCurrencyTrade,
klass=dataclasses.FundingCurrencyTrade,
labels=[
"id",
"mts",
@@ -125,7 +122,7 @@ FundingCurrencyTrade = generate_labeler_serializer(
TradingPairBook = generate_labeler_serializer(
name="TradingPairBook",
klass=types.TradingPairBook,
klass=dataclasses.TradingPairBook,
labels=[
"price",
"count",
@@ -135,7 +132,7 @@ TradingPairBook = generate_labeler_serializer(
FundingCurrencyBook = generate_labeler_serializer(
name="FundingCurrencyBook",
klass=types.FundingCurrencyBook,
klass=dataclasses.FundingCurrencyBook,
labels=[
"rate",
"period",
@@ -146,7 +143,7 @@ FundingCurrencyBook = generate_labeler_serializer(
TradingPairRawBook = generate_labeler_serializer(
name="TradingPairRawBook",
klass=types.TradingPairRawBook,
klass=dataclasses.TradingPairRawBook,
labels=[
"order_id",
"price",
@@ -156,7 +153,7 @@ TradingPairRawBook = generate_labeler_serializer(
FundingCurrencyRawBook = generate_labeler_serializer(
name="FundingCurrencyRawBook",
klass=types.FundingCurrencyRawBook,
klass=dataclasses.FundingCurrencyRawBook,
labels=[
"offer_id",
"period",
@@ -167,7 +164,7 @@ FundingCurrencyRawBook = generate_labeler_serializer(
Statistic = generate_labeler_serializer(
name="Statistic",
klass=types.Statistic,
klass=dataclasses.Statistic,
labels=[
"mts",
"value"
@@ -176,7 +173,7 @@ Statistic = generate_labeler_serializer(
Candle = generate_labeler_serializer(
name="Candle",
klass=types.Candle,
klass=dataclasses.Candle,
labels=[
"mts",
"open",
@@ -189,9 +186,8 @@ Candle = generate_labeler_serializer(
DerivativesStatus = generate_labeler_serializer(
name="DerivativesStatus",
klass=types.DerivativesStatus,
klass=dataclasses.DerivativesStatus,
labels=[
"key",
"mts",
"_PLACEHOLDER",
"deriv_price",
@@ -220,7 +216,7 @@ DerivativesStatus = generate_labeler_serializer(
Liquidation = generate_labeler_serializer(
name="Liquidation",
klass=types.Liquidation,
klass=dataclasses.Liquidation,
labels=[
"_PLACEHOLDER",
"pos_id",
@@ -239,7 +235,7 @@ Liquidation = generate_labeler_serializer(
Leaderboard = generate_labeler_serializer(
name="Leaderboard",
klass=types.Leaderboard,
klass=dataclasses.Leaderboard,
labels=[
"mts",
"_PLACEHOLDER",
@@ -256,7 +252,7 @@ Leaderboard = generate_labeler_serializer(
FundingStatistic = generate_labeler_serializer(
name="FundingStatistic",
klass=types.FundingStatistic,
klass=dataclasses.FundingStatistic,
labels=[
"mts",
"_PLACEHOLDER",
@@ -275,7 +271,7 @@ FundingStatistic = generate_labeler_serializer(
PulseProfile = generate_labeler_serializer(
name="PulseProfile",
klass=types.PulseProfile,
klass=dataclasses.PulseProfile,
labels=[
"puid",
"mts",
@@ -299,7 +295,7 @@ PulseProfile = generate_labeler_serializer(
PulseMessage = generate_recursive_serializer(
name="PulseMessage",
klass=types.PulseMessage,
klass=dataclasses.PulseMessage,
serializers={ "profile": PulseProfile },
labels=[
"pid",
@@ -329,7 +325,7 @@ PulseMessage = generate_recursive_serializer(
TradingMarketAveragePrice = generate_labeler_serializer(
name="TradingMarketAveragePrice",
klass=types.TradingMarketAveragePrice,
klass=dataclasses.TradingMarketAveragePrice,
labels=[
"price_avg",
"amount"
@@ -338,7 +334,7 @@ TradingMarketAveragePrice = generate_labeler_serializer(
FundingMarketAveragePrice = generate_labeler_serializer(
name="FundingMarketAveragePrice",
klass=types.FundingMarketAveragePrice,
klass=dataclasses.FundingMarketAveragePrice,
labels=[
"rate_avg",
"amount"
@@ -347,7 +343,7 @@ FundingMarketAveragePrice = generate_labeler_serializer(
FxRate = generate_labeler_serializer(
name="FxRate",
klass=types.FxRate,
klass=dataclasses.FxRate,
labels=[
"current_rate"
]
@@ -355,11 +351,11 @@ FxRate = generate_labeler_serializer(
#endregion
#region Serializers definition for Rest Authenticated Endpoints
#region Serializer definitions for types of auth use
UserInfo = generate_labeler_serializer(
name="UserInfo",
klass=types.UserInfo,
klass=dataclasses.UserInfo,
labels=[
"id",
"email",
@@ -421,7 +417,7 @@ UserInfo = generate_labeler_serializer(
LoginHistory = generate_labeler_serializer(
name="LoginHistory",
klass=types.LoginHistory,
klass=dataclasses.LoginHistory,
labels=[
"id",
"_PLACEHOLDER",
@@ -436,7 +432,7 @@ LoginHistory = generate_labeler_serializer(
BalanceAvailable = generate_labeler_serializer(
name="BalanceAvailable",
klass=types.BalanceAvailable,
klass=dataclasses.BalanceAvailable,
labels=[
"amount"
]
@@ -444,7 +440,7 @@ BalanceAvailable = generate_labeler_serializer(
Order = generate_labeler_serializer(
name="Order",
klass=types.Order,
klass=dataclasses.Order,
labels=[
"id",
"gid",
@@ -483,7 +479,7 @@ Order = generate_labeler_serializer(
Position = generate_labeler_serializer(
name="Position",
klass=types.Position,
klass=dataclasses.Position,
labels=[
"symbol",
"status",
@@ -510,7 +506,7 @@ Position = generate_labeler_serializer(
Trade = generate_labeler_serializer(
name="Trade",
klass=types.Trade,
klass=dataclasses.Trade,
labels=[
"id",
"symbol",
@@ -529,7 +525,7 @@ Trade = generate_labeler_serializer(
FundingTrade = generate_labeler_serializer(
name="FundingTrade",
klass=types.FundingTrade,
klass=dataclasses.FundingTrade,
labels=[
"id",
"currency",
@@ -543,7 +539,7 @@ FundingTrade = generate_labeler_serializer(
OrderTrade = generate_labeler_serializer(
name="OrderTrade",
klass=types.OrderTrade,
klass=dataclasses.OrderTrade,
labels=[
"id",
"symbol",
@@ -562,7 +558,7 @@ OrderTrade = generate_labeler_serializer(
Ledger = generate_labeler_serializer(
name="Ledger",
klass=types.Ledger,
klass=dataclasses.Ledger,
labels=[
"id",
"currency",
@@ -578,7 +574,7 @@ Ledger = generate_labeler_serializer(
FundingOffer = generate_labeler_serializer(
name="FundingOffer",
klass=types.FundingOffer,
klass=dataclasses.FundingOffer,
labels=[
"id",
"symbol",
@@ -606,7 +602,7 @@ FundingOffer = generate_labeler_serializer(
FundingCredit = generate_labeler_serializer(
name="FundingCredit",
klass=types.FundingCredit,
klass=dataclasses.FundingCredit,
labels=[
"id",
"symbol",
@@ -635,7 +631,7 @@ FundingCredit = generate_labeler_serializer(
FundingLoan = generate_labeler_serializer(
name="FundingLoan",
klass=types.FundingLoan,
klass=dataclasses.FundingLoan,
labels=[
"id",
"symbol",
@@ -663,7 +659,7 @@ FundingLoan = generate_labeler_serializer(
FundingAutoRenew = generate_labeler_serializer(
name="FundingAutoRenew",
klass=types.FundingAutoRenew,
klass=dataclasses.FundingAutoRenew,
labels=[
"currency",
"period",
@@ -674,7 +670,7 @@ FundingAutoRenew = generate_labeler_serializer(
FundingInfo = generate_labeler_serializer(
name="FundingInfo",
klass=types.FundingInfo,
klass=dataclasses.FundingInfo,
labels=[
"yield_loan",
"yield_lend",
@@ -685,7 +681,7 @@ FundingInfo = generate_labeler_serializer(
Wallet = generate_labeler_serializer(
name="Wallet",
klass=types.Wallet,
klass=dataclasses.Wallet,
labels=[
"wallet_type",
"currency",
@@ -699,7 +695,7 @@ Wallet = generate_labeler_serializer(
Transfer = generate_labeler_serializer(
name="Transfer",
klass=types.Transfer,
klass=dataclasses.Transfer,
labels=[
"mts",
"wallet_from",
@@ -714,7 +710,7 @@ Transfer = generate_labeler_serializer(
Withdrawal = generate_labeler_serializer(
name="Withdrawal",
klass=types.Withdrawal,
klass=dataclasses.Withdrawal,
labels=[
"withdrawal_id",
"_PLACEHOLDER",
@@ -730,7 +726,7 @@ Withdrawal = generate_labeler_serializer(
DepositAddress = generate_labeler_serializer(
name="DepositAddress",
klass=types.DepositAddress,
klass=dataclasses.DepositAddress,
labels=[
"_PLACEHOLDER",
"method",
@@ -743,7 +739,7 @@ DepositAddress = generate_labeler_serializer(
LightningNetworkInvoice = generate_labeler_serializer(
name="LightningNetworkInvoice",
klass=types.LightningNetworkInvoice,
klass=dataclasses.LightningNetworkInvoice,
labels=[
"invoice_hash",
"invoice",
@@ -755,7 +751,7 @@ LightningNetworkInvoice = generate_labeler_serializer(
Movement = generate_labeler_serializer(
name="Movement",
klass=types.Movement,
klass=dataclasses.Movement,
labels=[
"id",
"currency",
@@ -784,7 +780,7 @@ Movement = generate_labeler_serializer(
SymbolMarginInfo = generate_labeler_serializer(
name="SymbolMarginInfo",
klass=types.SymbolMarginInfo,
klass=dataclasses.SymbolMarginInfo,
labels=[
"_PLACEHOLDER",
"symbol",
@@ -799,7 +795,7 @@ SymbolMarginInfo = generate_labeler_serializer(
BaseMarginInfo = generate_labeler_serializer(
name="BaseMarginInfo",
klass=types.BaseMarginInfo,
klass=dataclasses.BaseMarginInfo,
labels=[
"user_pl",
"user_swaps",
@@ -811,7 +807,7 @@ BaseMarginInfo = generate_labeler_serializer(
PositionClaim = generate_labeler_serializer(
name="PositionClaim",
klass=types.PositionClaim,
klass=dataclasses.PositionClaim,
labels=[
"symbol",
"position_status",
@@ -838,7 +834,7 @@ PositionClaim = generate_labeler_serializer(
PositionIncreaseInfo = generate_labeler_serializer(
name="PositionIncreaseInfo",
klass=types.PositionIncreaseInfo,
klass=dataclasses.PositionIncreaseInfo,
labels=[
"max_pos",
"current_pos",
@@ -865,7 +861,7 @@ PositionIncreaseInfo = generate_labeler_serializer(
PositionIncrease = generate_labeler_serializer(
name="PositionIncrease",
klass=types.PositionIncrease,
klass=dataclasses.PositionIncrease,
labels=[
"symbol",
"_PLACEHOLDER",
@@ -876,7 +872,7 @@ PositionIncrease = generate_labeler_serializer(
PositionHistory = generate_labeler_serializer(
name="PositionHistory",
klass=types.PositionHistory,
klass=dataclasses.PositionHistory,
labels=[
"symbol",
"status",
@@ -897,7 +893,7 @@ PositionHistory = generate_labeler_serializer(
PositionSnapshot = generate_labeler_serializer(
name="PositionSnapshot",
klass=types.PositionSnapshot,
klass=dataclasses.PositionSnapshot,
labels=[
"symbol",
"status",
@@ -918,7 +914,7 @@ PositionSnapshot = generate_labeler_serializer(
PositionAudit = generate_labeler_serializer(
name="PositionAudit",
klass=types.PositionAudit,
klass=dataclasses.PositionAudit,
labels=[
"symbol",
"status",
@@ -945,7 +941,7 @@ PositionAudit = generate_labeler_serializer(
DerivativePositionCollateral = generate_labeler_serializer(
name="DerivativePositionCollateral",
klass=types.DerivativePositionCollateral,
klass=dataclasses.DerivativePositionCollateral,
labels=[
"status"
]
@@ -953,7 +949,7 @@ DerivativePositionCollateral = generate_labeler_serializer(
DerivativePositionCollateralLimits = generate_labeler_serializer(
name="DerivativePositionCollateralLimits",
klass=types.DerivativePositionCollateralLimits,
klass=dataclasses.DerivativePositionCollateralLimits,
labels=[
"min_collateral",
"max_collateral"

View File

@@ -1,23 +0,0 @@
import re
from typing import TypeVar, Callable, cast
T = TypeVar("T")
_to_snake_case: Callable[[str], str] = lambda string: re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
_to_camel_case: Callable[[str], str] = lambda string: \
(components := string.split("_"))[0] + str().join(c.title() for c in components[1:])
def _scheme(data: T, adapter: Callable[[str], str]) -> T:
if isinstance(data, list):
return cast(T, [ _scheme(sub_data, adapter) for sub_data in data ])
if isinstance(data, dict):
return cast(T, { adapter(key): _scheme(value, adapter) for key, value in data.items() })
return data
def to_snake_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_snake_case)
def to_camel_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_camel_case)

View File

@@ -1 +1 @@
from .client import BfxWebsocketClient, BfxWebsocketBucket, BfxWebsocketInputs
from .client import BfxWebSocketClient, BfxWebSocketBucket, BfxWebSocketInputs

View File

@@ -1,3 +1,3 @@
from .bfx_websocket_client import BfxWebsocketClient
from .bfx_websocket_bucket import BfxWebsocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
from .bfx_websocket_client import BfxWebSocketClient
from .bfx_websocket_bucket import BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebSocketInputs

View File

@@ -19,47 +19,41 @@ def _require_websocket_connection(function: F) -> F:
return cast(F, wrapper)
class BfxWebsocketBucket:
class BfxWebSocketBucket:
VERSION = 2
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
def __init__(self, host, event_emitter):
self.host, self.event_emitter, self.on_open_event = host, event_emitter, asyncio.locks.Event()
def __init__(self, host, event_emitter, events_per_subscription):
self.host, self.event_emitter, self.events_per_subscription = host, event_emitter, events_per_subscription
self.websocket, self.subscriptions, self.pendings = None, {}, []
self.on_open_event = asyncio.locks.Event()
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter, \
events_per_subscription=self.events_per_subscription)
async def connect(self):
reconnection = False
async def _connection():
async with websockets.connect(self.host) as websocket:
self.websocket = websocket
self.on_open_event.set()
await self.__recover_state()
async for websocket in websockets.connect(self.host):
self.websocket = websocket
self.on_open_event.set()
if reconnection or (reconnection := False):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
for _, subscription in self.subscriptions.items():
await self.subscribe(**subscription)
self.subscriptions.clear()
try:
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict):
if message["event"] == "subscribed" and (chan_id := message["chanId"]):
self.pendings = \
[ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.pendings = [ pending \
for pending in self.pendings if pending["subId"] != message["subId"] ]
self.subscriptions[chan_id] = message
self.event_emitter.emit("subscribed", message)
sub_id = message["subId"]
if "subscribed" not in self.events_per_subscription.get(sub_id, []):
self.events_per_subscription.setdefault(sub_id, []).append("subscribed")
self.event_emitter.emit("subscribed", message)
elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]):
if message["status"] == "OK":
del self.subscriptions[chan_id]
@@ -69,19 +63,25 @@ class BfxWebsocketBucket:
if isinstance(message, list):
if (chan_id := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chan_id], *message[1:])
except websockets.ConnectionClosedError as error:
if error.code == 1006:
self.on_open_event.clear()
reconnection = True
continue
raise error
try:
await _connection()
except websockets.exceptions.ConnectionClosedError as error:
if error.code in (1006, 1012):
self.on_open_event.clear()
break
async def __recover_state(self):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
for _, subscription in self.subscriptions.items():
await self.subscribe(sub_id=subscription.pop("subId"), **subscription)
self.subscriptions.clear()
@_require_websocket_connection
async def subscribe(self, channel, sub_id=None, **kwargs):
if len(self.subscriptions) + len(self.pendings) == BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
if len(self.subscriptions) + len(self.pendings) == BfxWebSocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
subscription = {

View File

@@ -8,12 +8,12 @@ import traceback, json, asyncio, hmac, hashlib, time, socket, random, websockets
from pyee.asyncio import AsyncIOEventEmitter
from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket
from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \
ZeroConnectionsError, OutdatedClientVersion
from .bfx_websocket_inputs import BfxWebSocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedEventsHandler
from ..exceptions import WebSocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \
ZeroConnectionsError, ReconnectionTimeoutError, OutdatedClientVersion
from ...utils.json_encoder import JSONEncoder
@@ -22,7 +22,7 @@ from ...utils.logger import ColorLogger, FileLogger
def _require_websocket_authentication(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if hasattr(self, "authentication") and not self.authentication:
raise WebsocketAuthenticationRequired("To perform this action you need to " \
raise WebSocketAuthenticationRequired("To perform this action you need to " \
"authenticate using your API_KEY and API_SECRET.")
await _require_websocket_connection(function)(self, *args, **kwargs)
@@ -50,29 +50,39 @@ class _Delay:
return (self.__backoff_delay == _Delay.BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
class BfxWebsocketClient:
VERSION = BfxWebsocketBucket.VERSION
class BfxWebSocketClient:
VERSION = BfxWebSocketBucket.VERSION
MAXIMUM_CONNECTIONS_AMOUNT = 20
EVENTS = [
"open", "subscribed", "authenticated", "wss-error",
*PublicChannelsHandler.EVENTS,
*AuthenticatedChannelsHandler.EVENTS
ONCE_EVENTS = [
"open", "authenticated", "disconnection",
*AuthenticatedEventsHandler.ONCE_EVENTS
]
def __init__(self, host, credentials = None, log_filename = None, log_level = "INFO"):
self.websocket, self.buckets, self.authentication = None, [], False
EVENTS = [
"subscribed", "wss-error",
*ONCE_EVENTS,
*PublicChannelsHandler.EVENTS,
*AuthenticatedEventsHandler.ON_EVENTS
]
self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter()
def __init__(self, host, credentials, *, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"):
self.websocket, self.authentication, self.buckets = None, False, []
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
self.events_per_subscription = {}
self.event_emitter = AsyncIOEventEmitter()
self.handler = AuthenticatedEventsHandler(event_emitter=self.event_emitter)
self.inputs = BfxWebSocketInputs(handle_websocket_input=self.__handle_websocket_input)
if log_filename is None:
self.logger = ColorLogger("BfxWebsocketClient", level=log_level)
else: self.logger = FileLogger("BfxWebsocketClient", level=log_level, filename=log_filename)
self.logger = ColorLogger("BfxWebSocketClient", level=log_level)
else: self.logger = FileLogger("BfxWebSocketClient", level=log_level, filename=log_filename)
self.event_emitter.add_listener("error",
lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" +
@@ -87,36 +97,47 @@ class BfxWebsocketClient:
self.logger.info("With connections set to 0 it will not be possible to subscribe to any public channel. " \
"Attempting a subscription will cause a ZeroConnectionsError to be thrown.")
if connections > BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT:
self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT} " \
if connections > BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT:
self.logger.warning(f"It is not safe to use more than {BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT} " \
f"buckets from the same connection ({connections} in use), the server could momentarily " \
"block the client with <429 Too Many Requests>.")
for _ in range(connections):
self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter)]
self.buckets += [BfxWebSocketBucket(self.host, self.event_emitter, self.events_per_subscription)]
tasks = [ bucket.connect() for bucket in self.buckets ] + [ self.__connect() ]
await self.__connect()
await asyncio.gather(*tasks)
#pylint: disable-next=too-many-statements
#pylint: disable-next=too-many-statements,too-many-branches
async def __connect(self):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
timer, tasks, on_timeout_event = None, [], asyncio.locks.Event()
reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None
delay = None
def _on_wss_timeout():
on_timeout_event.set()
#pylint: disable-next=too-many-branches
async def _connection():
nonlocal reconnection
nonlocal reconnection, timer, tasks
async with websockets.connect(self.host) as websocket:
async with websockets.connect(self.host, ping_interval=None) as websocket:
if reconnection.status:
self.logger.info(f"Reconnect attempt successful (attempt no.{reconnection.attempts}): The " \
self.logger.info(f"Reconnection attempt successful (no.{reconnection.attempts}): The " \
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " \
f"(connection lost at: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
f"(connection lost on: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
self.websocket, self.authentication = websocket, False
if isinstance(timer, asyncio.events.TimerHandle):
timer.cancel()
self.websocket = websocket
coroutines = [ BfxWebSocketBucket.connect(bucket) for bucket in self.buckets ]
tasks = [ asyncio.create_task(coroutine) for coroutine in coroutines ]
if len(self.buckets) == 0 or \
(await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])):
@@ -130,15 +151,15 @@ class BfxWebsocketClient:
if isinstance(message, dict):
if message["event"] == "info" and "version" in message:
if BfxWebsocketClient.VERSION != message["version"]:
if BfxWebSocketClient.VERSION != message["version"]:
raise OutdatedClientVersion("Mismatch between the client version and the server " \
"version. Update the library to the latest version to continue (client version: " \
f"{BfxWebsocketClient.VERSION}, server version: {message['version']}).")
f"{BfxWebSocketClient.VERSION}, server version: {message['version']}).")
elif message["event"] == "info" and message["code"] == 20051:
rcvd = websockets.frames.Close(code=1012,
reason="Stop/Restart Websocket Server (please reconnect).")
reason="Stop/Restart WebSocket Server (please reconnect).")
raise websockets.ConnectionClosedError(rcvd=rcvd, sent=None)
raise websockets.exceptions.ConnectionClosedError(rcvd=rcvd, sent=None)
elif message["event"] == "auth":
if message["status"] != "OK":
raise InvalidAuthenticationCredentials(
@@ -158,30 +179,46 @@ class BfxWebsocketClient:
if reconnection.status:
await asyncio.sleep(delay.next())
if on_timeout_event.is_set():
raise ReconnectionTimeoutError("Connection has been offline for too long " \
f"without being able to reconnect (wss_timeout: {self.wss_timeout}s).")
try:
await _connection()
except (websockets.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.ConnectionClosedError) and error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received " \
"or sent (1006). Attempting to reconnect...")
except (websockets.exceptions.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.exceptions.ConnectionClosedError):
if error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received " \
"or sent (1006). Trying to reconnect...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, reconnection " \
"required (client received 20051). Attempt in progress...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, clients need " \
"to reconnect (server sent 20051). Reconnection attempt in progress...")
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
for task in tasks:
task.cancel()
delay = _Delay(backoff_factor=1.618)
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
if self.wss_timeout is not None:
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout)
delay = _Delay(backoff_factor=1.618)
self.authentication = False
elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " \
f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds. (at the moment " \
self.logger.warning(f"Reconnection attempt was unsuccessful (no.{reconnection.attempts}). " \
f"Next reconnection attempt in {delay.peek():.2f} seconds. (at the moment " \
f"the client has been offline for {datetime.now() - reconnection.timestamp})")
reconnection = reconnection._replace(attempts=reconnection.attempts + 1)
else: raise error
if not reconnection.status:
self.event_emitter.emit("disconnection",
self.websocket.close_code, self.websocket.close_reason)
break
async def __authenticate(self, api_key, api_secret, filters=None):
@@ -215,12 +252,12 @@ class BfxWebsocketClient:
await bucket.unsubscribe(chan_id=chan_id)
async def close(self, code=1000, reason=str()):
if self.websocket is not None and self.websocket.open:
await self.websocket.close(code=code, reason=reason)
for bucket in self.buckets:
await bucket.close(code=code, reason=reason)
if self.websocket is not None and self.websocket.open:
await self.websocket.close(code=code, reason=reason)
@_require_websocket_authentication
async def notify(self, info, message_id=None, **kwargs):
await self.websocket.send(json.dumps([ 0, "n", message_id, { "type": "ucm-test", "info": info, **kwargs } ]))
@@ -231,34 +268,22 @@ class BfxWebsocketClient:
def on(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
if event not in BfxWebSocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebsocketClient.EVENTS")
"of available events print BfxWebSocketClient.EVENTS")
def _register_event(event, function):
if event in BfxWebSocketClient.ONCE_EVENTS:
self.event_emitter.once(event, function)
else: self.event_emitter.on(event, function)
if callback is not None:
for event in events:
self.event_emitter.on(event, callback)
_register_event(event, callback)
if callback is None:
def handler(function):
for event in events:
self.event_emitter.on(event, function)
return handler
def once(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebsocketClient.EVENTS")
if callback is not None:
for event in events:
self.event_emitter.once(event, callback)
if callback is None:
def handler(function):
for event in events:
self.event_emitter.once(event, function)
_register_event(event, function)
return handler

View File

@@ -2,10 +2,10 @@ from decimal import Decimal
from datetime import datetime
from typing import Union, Optional, List, Tuple
from .. enums import OrderType, FundingOfferType
from ...utils.json_encoder import JSON
from ..enums import OrderType, FundingOfferType
from ...types import JSON
class BfxWebsocketInputs:
class BfxWebSocketInputs:
def __init__(self, handle_websocket_input):
self.__handle_websocket_input = handle_websocket_input

View File

@@ -1,5 +1,5 @@
#pylint: disable-next=wildcard-import,unused-wildcard-import
from .. enums import *
from ..enums import *
class Channel(str, Enum):
TICKER = "ticker"

View File

@@ -1,60 +1,59 @@
from .. exceptions import BfxBaseException
from ..exceptions import BfxBaseException
__all__ = [
"BfxWebsocketException",
"BfxWebSocketException",
"ConnectionNotOpen",
"TooManySubscriptions",
"ZeroConnectionsError",
"WebsocketAuthenticationRequired",
"ReconnectionTimeoutError",
"WebSocketAuthenticationRequired",
"InvalidAuthenticationCredentials",
"EventNotSupported",
"HandlerNotFound",
"OutdatedClientVersion"
]
class BfxWebsocketException(BfxBaseException):
class BfxWebSocketException(BfxBaseException):
"""
Base class for all custom exceptions in bfxapi/websocket/exceptions.py.
"""
class ConnectionNotOpen(BfxWebsocketException):
class ConnectionNotOpen(BfxWebSocketException):
"""
This error indicates an attempt to communicate via websocket before starting the connection with the servers.
"""
class TooManySubscriptions(BfxWebsocketException):
class TooManySubscriptions(BfxWebSocketException):
"""
This error indicates a subscription attempt after reaching the limit of simultaneous connections.
"""
class ZeroConnectionsError(BfxWebsocketException):
class ZeroConnectionsError(BfxWebSocketException):
"""
This error indicates an attempt to subscribe to a public channel while the number of connections is 0.
"""
class WebsocketAuthenticationRequired(BfxWebsocketException):
class ReconnectionTimeoutError(BfxWebSocketException):
"""
This error indicates that the connection has been offline for too long without being able to reconnect.
"""
class WebSocketAuthenticationRequired(BfxWebSocketException):
"""
This error indicates an attempt to access a protected resource without logging in first.
"""
class InvalidAuthenticationCredentials(BfxWebsocketException):
class InvalidAuthenticationCredentials(BfxWebSocketException):
"""
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
"""
class EventNotSupported(BfxWebsocketException):
class EventNotSupported(BfxWebSocketException):
"""
This error indicates a failed attempt to subscribe to an event not supported by the BfxWebsocketClient.
This error indicates a failed attempt to subscribe to an event not supported by the BfxWebSocketClient.
"""
class HandlerNotFound(BfxWebsocketException):
"""
This error indicates that a handler was not found for an incoming message.
"""
class OutdatedClientVersion(BfxWebsocketException):
class OutdatedClientVersion(BfxWebSocketException):
"""
This error indicates a mismatch between the client version and the server WSS version.
"""

View File

@@ -1,2 +1,2 @@
from .public_channels_handler import PublicChannelsHandler
from .authenticated_channels_handler import AuthenticatedChannelsHandler
from .authenticated_events_handler import AuthenticatedEventsHandler

View File

@@ -1,20 +1,25 @@
from .. import serializers
from ...types import serializers
from .. serializers import _Notification
from ...types.serializers import _Notification
from .. exceptions import HandlerNotFound
class AuthenticatedEventsHandler:
__once_abbreviations = {
"os": "order_snapshot", "ps": "position_snapshot", "fos": "funding_offer_snapshot",
"fcs": "funding_credit_snapshot", "fls": "funding_loan_snapshot", "ws": "wallet_snapshot"
}
class AuthenticatedChannelsHandler:
__abbreviations = {
"os": "order_snapshot", "on": "order_new", "ou": "order_update",
"oc": "order_cancel", "ps": "position_snapshot", "pn": "position_new",
"pu": "position_update", "pc": "position_close", "te": "trade_executed",
"tu": "trade_execution_update", "fos": "funding_offer_snapshot", "fon": "funding_offer_new",
"fou": "funding_offer_update", "foc": "funding_offer_cancel", "fcs": "funding_credit_snapshot",
__on_abbreviations = {
"on": "order_new", "ou": "order_update", "oc": "order_cancel",
"pn": "position_new", "pu": "position_update", "pc": "position_close",
"fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel",
"fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update",
"flc": "funding_loan_close", "ws": "wallet_snapshot", "wu": "wallet_update",
"bu": "balance_update",
"fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close",
"te": "trade_execution", "tu": "trade_execution_update", "wu": "wallet_update"
}
__abbreviations = {
**__once_abbreviations,
**__on_abbreviations
}
__serializers = {
@@ -24,16 +29,17 @@ class AuthenticatedChannelsHandler:
("fos", "fon", "fou", "foc",): serializers.FundingOffer,
("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit,
("fls", "fln", "flu", "flc",): serializers.FundingLoan,
("ws", "wu",): serializers.Wallet,
("bu",): serializers.Balance
("ws", "wu",): serializers.Wallet
}
EVENTS = [
"notification",
"on-req-notification", "ou-req-notification", "oc-req-notification",
"oc_multi-notification",
"fon-req-notification", "foc-req-notification",
*list(__abbreviations.values())
ONCE_EVENTS = [
*list(__once_abbreviations.values())
]
ON_EVENTS = [
*list(__on_abbreviations.values()),
"notification", "on-req-notification", "ou-req-notification",
"oc-req-notification", "fon-req-notification", "foc-req-notification"
]
def __init__(self, event_emitter):
@@ -43,26 +49,21 @@ class AuthenticatedChannelsHandler:
if abbrevation == "n":
return self.__notification(stream)
for abbrevations, serializer in AuthenticatedChannelsHandler.__serializers.items():
for abbrevations, serializer in AuthenticatedEventsHandler.__serializers.items():
if abbrevation in abbrevations:
event = AuthenticatedChannelsHandler.__abbreviations[abbrevation]
event = AuthenticatedEventsHandler.__abbreviations[abbrevation]
if all(isinstance(substream, list) for substream in stream):
return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ])
return self.event_emitter.emit(event, serializer.parse(*stream))
raise HandlerNotFound(f"No handler found for event of type <{abbrevation}>.")
def __notification(self, stream):
event, serializer = "notification", _Notification(serializer=None)
if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.Order)
if stream[1] == "oc_multi-req":
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.Order, iterate=True)
if stream[1] == "fon-req" or stream[1] == "foc-req":
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.FundingOffer)

View File

@@ -1,17 +1,23 @@
from .. import serializers
from .. exceptions import HandlerNotFound
from ...types import serializers
class PublicChannelsHandler:
EVENTS = [
"t_ticker_update", "f_ticker_update", "t_trade_executed", "t_trade_execution_update", "f_trade_executed",
"f_trade_execution_update", "t_trades_snapshot", "f_trades_snapshot", "t_book_snapshot", "f_book_snapshot",
"t_raw_book_snapshot", "f_raw_book_snapshot", "t_book_update", "f_book_update", "t_raw_book_update",
"f_raw_book_update", "candles_snapshot", "candles_update", "derivatives_status_update",
ONCE_PER_SUBSCRIPTION_EVENTS = [
"t_trades_snapshot", "f_trades_snapshot", "t_book_snapshot",
"f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot",
"candles_snapshot"
]
def __init__(self, event_emitter):
self.event_emitter = event_emitter
EVENTS = [
*ONCE_PER_SUBSCRIPTION_EVENTS,
"t_ticker_update", "f_ticker_update", "t_trade_execution",
"t_trade_execution_update", "f_trade_execution", "f_trade_execution_update",
"t_book_update", "f_book_update", "t_raw_book_update",
"f_raw_book_update", "candles_update", "derivatives_status_update"
]
def __init__(self, event_emitter, events_per_subscription):
self.__event_emitter, self.__events_per_subscription = \
event_emitter, events_per_subscription
self.__handlers = {
"ticker": self.__ticker_channel_handler,
@@ -29,18 +35,29 @@ class PublicChannelsHandler:
if (channel := subscription["channel"]) and channel in self.__handlers.keys():
return self.__handlers[channel](_clear(subscription, "event", "channel", "chanId"), *stream)
raise HandlerNotFound(f"No handler found for channel <{subscription['channel']}>.")
def __emit(self, event, sub, data):
sub_id, should_emit_event = sub["subId"], True
if event in PublicChannelsHandler.ONCE_PER_SUBSCRIPTION_EVENTS:
if sub_id not in self.__events_per_subscription:
self.__events_per_subscription[sub_id] = [ event ]
elif event not in self.__events_per_subscription[sub_id]:
self.__events_per_subscription[sub_id] += [ event ]
else: should_emit_event = False
if should_emit_event:
return self.__event_emitter.emit(event, sub, data)
def __ticker_channel_handler(self, subscription, *stream):
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
return self.__emit(
"t_ticker_update",
subscription,
serializers.TradingPairTicker.parse(*stream[0])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
return self.__emit(
"f_ticker_update",
subscription,
serializers.FundingCurrencyTicker.parse(*stream[0])
@@ -49,28 +66,28 @@ class PublicChannelsHandler:
def __trades_channel_handler(self, subscription, *stream):
if (event := stream[0]) and event in [ "te", "tu", "fte", "ftu" ]:
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
{ "te": "t_trade_executed", "tu": "t_trade_execution_update" }[event],
return self.__emit(
{ "te": "t_trade_execution", "tu": "t_trade_execution_update" }[event],
subscription,
serializers.TradingPairTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
{ "fte": "f_trade_executed", "ftu": "f_trade_execution_update" }[event],
return self.__emit(
{ "fte": "f_trade_execution", "ftu": "f_trade_execution_update" }[event],
subscription,
serializers.FundingCurrencyTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
return self.__emit(
"t_trades_snapshot",
subscription,
[ serializers.TradingPairTrade.parse(*substream) for substream in stream[0] ]
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
return self.__emit(
"f_trades_snapshot",
subscription,
[ serializers.FundingCurrencyTrade.parse(*substream) for substream in stream[0] ]
@@ -86,14 +103,14 @@ class PublicChannelsHandler:
serializers.TradingPairBook, serializers.FundingCurrencyBook, False
if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit(
return self.__emit(
event + "_" + (is_raw_book and "raw_book" or "book") + "_snapshot",
subscription,
[ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event] \
.parse(*substream) for substream in stream[0] ]
)
return self.event_emitter.emit(
return self.__emit(
event + "_" + (is_raw_book and "raw_book" or "book") + "_update",
subscription,
{ "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event].parse(*stream[0])
@@ -101,13 +118,13 @@ class PublicChannelsHandler:
def __candles_channel_handler(self, subscription, *stream):
if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit(
return self.__emit(
"candles_snapshot",
subscription,
[ serializers.Candle.parse(*substream) for substream in stream[0] ]
)
return self.event_emitter.emit(
return self.__emit(
"candles_update",
subscription,
serializers.Candle.parse(*stream[0])
@@ -115,7 +132,7 @@ class PublicChannelsHandler:
def __status_channel_handler(self, subscription, *stream):
if subscription["key"].startswith("deriv:"):
return self.event_emitter.emit(
return self.__emit(
"derivatives_status_update",
subscription,
serializers.DerivativesStatus.parse(*stream[0])

View File

@@ -1,368 +0,0 @@
#pylint: disable=duplicate-code
from . import types
from .. labeler import generate_labeler_serializer
#pylint: disable-next=unused-import
from .. notification import _Notification
__serializers__ = [
"TradingPairTicker", "FundingCurrencyTicker", "TradingPairTrade",
"FundingCurrencyTrade", "TradingPairBook", "FundingCurrencyBook",
"TradingPairRawBook", "FundingCurrencyRawBook", "Candle",
"DerivativesStatus",
"Order", "Position", "Trade",
"FundingOffer", "FundingCredit", "FundingLoan",
"Wallet", "Balance",
]
#region Serializers definition for Websocket Public Channels
TradingPairTicker = generate_labeler_serializer(
name="TradingPairTicker",
klass=types.TradingPairTicker,
labels=[
"bid",
"bid_size",
"ask",
"ask_size",
"daily_change",
"daily_change_relative",
"last_price",
"volume",
"high",
"low"
]
)
FundingCurrencyTicker = generate_labeler_serializer(
name="FundingCurrencyTicker",
klass=types.FundingCurrencyTicker,
labels=[
"frr",
"bid",
"bid_period",
"bid_size",
"ask",
"ask_period",
"ask_size",
"daily_change",
"daily_change_relative",
"last_price",
"volume",
"high",
"low",
"_PLACEHOLDER",
"_PLACEHOLDER",
"frr_amount_available"
]
)
TradingPairTrade = generate_labeler_serializer(
name="TradingPairTrade",
klass=types.TradingPairTrade,
labels=[
"id",
"mts",
"amount",
"price"
]
)
FundingCurrencyTrade = generate_labeler_serializer(
name="FundingCurrencyTrade",
klass=types.FundingCurrencyTrade,
labels=[
"id",
"mts",
"amount",
"rate",
"period"
]
)
TradingPairBook = generate_labeler_serializer(
name="TradingPairBook",
klass=types.TradingPairBook,
labels=[
"price",
"count",
"amount"
]
)
FundingCurrencyBook = generate_labeler_serializer(
name="FundingCurrencyBook",
klass=types.FundingCurrencyBook,
labels=[
"rate",
"period",
"count",
"amount"
]
)
TradingPairRawBook = generate_labeler_serializer(
name="TradingPairRawBook",
klass=types.TradingPairRawBook,
labels=[
"order_id",
"price",
"amount"
]
)
FundingCurrencyRawBook = generate_labeler_serializer(
name="FundingCurrencyRawBook",
klass=types.FundingCurrencyRawBook,
labels=[
"offer_id",
"period",
"rate",
"amount"
]
)
Candle = generate_labeler_serializer(
name="Candle",
klass=types.Candle,
labels=[
"mts",
"open",
"close",
"high",
"low",
"volume"
]
)
DerivativesStatus = generate_labeler_serializer(
name="DerivativesStatus",
klass=types.DerivativesStatus,
labels=[
"mts",
"_PLACEHOLDER",
"deriv_price",
"spot_price",
"_PLACEHOLDER",
"insurance_fund_balance",
"_PLACEHOLDER",
"next_funding_evt_mts",
"next_funding_accrued",
"next_funding_step",
"_PLACEHOLDER",
"current_funding",
"_PLACEHOLDER",
"_PLACEHOLDER",
"mark_price",
"_PLACEHOLDER",
"_PLACEHOLDER",
"open_interest",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"clamp_min",
"clamp_max"
]
)
#endregion
#region Serializers definition for Websocket Authenticated Channels
Order = generate_labeler_serializer(
name="Order",
klass=types.Order,
labels=[
"id",
"gid",
"cid",
"symbol",
"mts_create",
"mts_update",
"amount",
"amount_orig",
"order_type",
"type_prev",
"mts_tif",
"_PLACEHOLDER",
"flags",
"order_status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"price",
"price_avg",
"price_trailing",
"price_aux_limit",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"notify",
"hidden",
"placed_id",
"_PLACEHOLDER",
"_PLACEHOLDER",
"routing",
"_PLACEHOLDER",
"_PLACEHOLDER",
"meta"
]
)
Position = generate_labeler_serializer(
name="Position",
klass=types.Position,
labels=[
"symbol",
"status",
"amount",
"base_price",
"margin_funding",
"margin_funding_type",
"pl",
"pl_perc",
"price_liq",
"leverage",
"flag",
"position_id",
"mts_create",
"mts_update",
"_PLACEHOLDER",
"type",
"_PLACEHOLDER",
"collateral",
"collateral_min",
"meta"
]
)
Trade = generate_labeler_serializer(
name="Trade",
klass=types.Trade,
labels=[
"id",
"symbol",
"mts_create",
"order_id",
"exec_amount",
"exec_price",
"order_type",
"order_price",
"maker",
"fee",
"fee_currency",
"cid"
]
)
FundingOffer = generate_labeler_serializer(
name="FundingOffer",
klass=types.FundingOffer,
labels=[
"id",
"symbol",
"mts_create",
"mts_update",
"amount",
"amount_orig",
"offer_type",
"_PLACEHOLDER",
"_PLACEHOLDER",
"flags",
"offer_status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER"
]
)
FundingCredit = generate_labeler_serializer(
name="FundingCredit",
klass=types.FundingCredit,
labels=[
"id",
"symbol",
"side",
"mts_create",
"mts_update",
"amount",
"flags",
"status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"mts_opening",
"mts_last_payout",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER",
"no_close",
"position_pair"
]
)
FundingLoan = generate_labeler_serializer(
name="FundingLoan",
klass=types.FundingLoan,
labels=[
"id",
"symbol",
"side",
"mts_create",
"mts_update",
"amount",
"flags",
"status",
"_PLACEHOLDER",
"_PLACEHOLDER",
"_PLACEHOLDER",
"rate",
"period",
"mts_opening",
"mts_last_payout",
"notify",
"hidden",
"_PLACEHOLDER",
"renew",
"_PLACEHOLDER",
"no_close"
]
)
Wallet = generate_labeler_serializer(
name="Wallet",
klass=types.Wallet,
labels=[
"wallet_type",
"currency",
"balance",
"unsettled_interest",
"available_balance",
"last_change",
"trade_details"
]
)
Balance = generate_labeler_serializer(
name="Balance",
klass=types.Balance,
labels=[
"aum",
"aum_net"
]
)
#endregion

View File

@@ -12,7 +12,7 @@ __all__ = [
_Header = TypedDict("_Header", { "event": Literal["subscribed"], "channel": str, "chanId": int })
Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"]
Subscription = Union[_Header, "Ticker", "Trades", "Book", "Candles", "Status"]
class Ticker(TypedDict):
subId: str

View File

@@ -1,247 +0,0 @@
#pylint: disable=duplicate-code
#pylint: disable-next=wildcard-import,unused-wildcard-import
from typing import *
from dataclasses import dataclass
from .. labeler import _Type
#pylint: disable-next=unused-import
from .. notification import Notification
from ..utils.json_encoder import JSON
#region Type hinting for Websocket Public Channels
@dataclass
class TradingPairTicker(_Type):
bid: float
bid_size: float
ask: float
ask_size: float
daily_change: float
daily_change_relative: float
last_price: float
volume: float
high: float
low: float
@dataclass
class FundingCurrencyTicker(_Type):
frr: float
bid: float
bid_period: int
bid_size: float
ask: float
ask_period: int
ask_size: float
daily_change: float
daily_change_relative: float
last_price: float
volume: float
high: float
low: float
frr_amount_available: float
@dataclass
class TradingPairTrade(_Type):
id: int
mts: int
amount: float
price: float
@dataclass
class FundingCurrencyTrade(_Type):
id: int
mts: int
amount: float
rate: float
period: int
@dataclass
class TradingPairBook(_Type):
price: float
count: int
amount: float
@dataclass
class FundingCurrencyBook(_Type):
rate: float
period: int
count: int
amount: float
@dataclass
class TradingPairRawBook(_Type):
order_id: int
price: float
amount: float
@dataclass
class FundingCurrencyRawBook(_Type):
offer_id: int
period: int
rate: float
amount: float
@dataclass
class Candle(_Type):
mts: int
open: int
close: int
high: int
low: int
volume: float
@dataclass
class DerivativesStatus(_Type):
mts: int
deriv_price: float
spot_price: float
insurance_fund_balance: float
next_funding_evt_mts: int
next_funding_accrued: float
next_funding_step: int
current_funding: float
mark_price: float
open_interest: float
clamp_min: float
clamp_max: float
#endregion
#region Type hinting for Websocket Authenticated Channels
@dataclass
class Order(_Type):
id: int
gid: int
cid: int
symbol: str
mts_create: int
mts_update: int
amount: float
amount_orig: float
order_type: str
type_prev: str
mts_tif: int
flags: int
order_status: str
price: float
price_avg: float
price_trailing: float
price_aux_limit: float
notify: int
hidden: int
placed_id: int
routing: str
meta: JSON
@dataclass
class Position(_Type):
symbol: str
status: str
amount: float
base_price: float
margin_funding: float
margin_funding_type: int
pl: float
pl_perc: float
price_liq: float
leverage: float
flag: int
position_id: int
mts_create: int
mts_update: int
type: int
collateral: float
collateral_min: float
meta: JSON
@dataclass
class Trade(_Type):
id: int
symbol: str
mts_create: int
order_id: int
exec_amount: float
exec_price: float
order_type: str
order_price: float
maker:int
fee: Optional[float]
fee_currency: Optional[str]
cid: int
@dataclass
class FundingOffer(_Type):
id: int
symbol: str
mts_create: int
mts_update: int
amount: float
amount_orig: float
offer_type: str
flags: int
offer_status: str
rate: float
period: int
notify: int
hidden: int
renew: int
@dataclass
class FundingCredit(_Type):
id: int
symbol: str
side: int
mts_create: int
mts_update: int
amount: float
flags: int
status: str
rate: float
period: int
mts_opening: int
mts_last_payout: int
notify: int
hidden: int
renew: int
no_close: int
position_pair: str
@dataclass
class FundingLoan(_Type):
id: int
symbol: str
side: int
mts_create: int
mts_update: int
amount: float
flags: int
status: str
rate: float
period: int
mts_opening: int
mts_last_payout: int
notify: int
hidden: int
renew: int
no_close: int
@dataclass
class Wallet(_Type):
wallet_type: str
currency: str
balance: float
unsettled_interest: float
available_balance: float
last_change: str
trade_details: JSON
@dataclass
class Balance(_Type):
aum: float
aum_net: float
#endregion

View File

@@ -4,7 +4,7 @@ import os
from bfxapi import Client, REST_HOST
from bfxapi.rest.types import Notification, PositionClaim
from bfxapi.types import Notification, PositionClaim
bfx = Client(
rest_host=REST_HOST,

View File

@@ -2,11 +2,12 @@
import os
from typing import List
from bfxapi import Client, REST_HOST
from bfxapi.rest.types import List, Wallet, Transfer, \
DepositAddress, LightningNetworkInvoice, Withdrawal, \
Notification
from bfxapi.types import Wallet, Transfer, DepositAddress, \
LightningNetworkInvoice, Withdrawal, Notification
bfx = Client(
rest_host=REST_HOST,

View File

@@ -4,7 +4,7 @@ import os
from bfxapi import Client, REST_HOST
from bfxapi.rest.types import DerivativePositionCollateral, DerivativePositionCollateralLimits
from bfxapi.types import DerivativePositionCollateral, DerivativePositionCollateralLimits
bfx = Client(
rest_host=REST_HOST,

View File

@@ -3,8 +3,8 @@
import os
from bfxapi import Client, REST_HOST
from bfxapi.types import Notification, FundingOffer
from bfxapi.enums import FundingOfferType, Flag
from bfxapi.rest.types import Notification, FundingOffer
bfx = Client(
rest_host=REST_HOST,

View File

@@ -3,8 +3,8 @@
import os
from bfxapi import Client, REST_HOST
from bfxapi.types import Notification, Order
from bfxapi.enums import OrderType, Flag
from bfxapi.rest.types import Notification, Order
bfx = Client(
rest_host=REST_HOST,

View File

@@ -2,9 +2,11 @@
import os
from typing import List
from bfxapi import Client, REST_HOST
from bfxapi.rest.types import List, FundingLoan, Notification
from bfxapi.types import FundingLoan, Notification
bfx = Client(
rest_host=REST_HOST,

View File

@@ -4,7 +4,7 @@ import os
from bfxapi import Client, REST_HOST
from bfxapi.rest.types import InvoiceSubmission
from bfxapi.types import InvoiceSubmission
bfx = Client(
rest_host=REST_HOST,

View File

@@ -1,8 +1,10 @@
# python -c "import examples.rest.public.book"
from typing import List
from bfxapi import Client, PUB_REST_HOST
from bfxapi.rest.types import List, TradingPairBook, TradingPairRawBook, \
from bfxapi.types import TradingPairBook, TradingPairRawBook, \
FundingCurrencyBook, FundingCurrencyRawBook
bfx = Client(rest_host=PUB_REST_HOST)

View File

@@ -2,9 +2,11 @@
import datetime
from typing import List
from bfxapi import Client, PUB_REST_HOST
from bfxapi.rest.types import List, PulseMessage, PulseProfile
from bfxapi.types import PulseMessage, PulseProfile
bfx = Client(rest_host=PUB_REST_HOST)

View File

@@ -2,7 +2,7 @@
from bfxapi import Client, PUB_REST_HOST
from bfxapi.rest.types import TradingMarketAveragePrice, FundingMarketAveragePrice, FxRate
from bfxapi.types import TradingMarketAveragePrice, FundingMarketAveragePrice, FxRate
bfx = Client(rest_host=PUB_REST_HOST)

View File

@@ -1,8 +1,10 @@
# python -c "import examples.rest.public.trades"
from typing import List
from bfxapi import Client, PUB_REST_HOST
from bfxapi.types import TradingPairTrade, FundingCurrencyTrade
from bfxapi.rest.enums import Sort
from bfxapi.rest.types import List, TradingPairTrade, FundingCurrencyTrade
bfx = Client(rest_host=PUB_REST_HOST)

View File

@@ -4,7 +4,7 @@ import os
from bfxapi import Client, WSS_HOST
from bfxapi.enums import Error, OrderType
from bfxapi.websocket.types import Notification, Order
from bfxapi.types import Notification, Order
bfx = Client(
wss_host=WSS_HOST,

View File

@@ -2,9 +2,11 @@
import os
from typing import List
from bfxapi import Client
from bfxapi.enums import Error
from bfxapi.websocket.types import List, Wallet
from bfxapi.types import Wallet
bfx = Client(
api_key=os.getenv("BFX_API_KEY"),

View File

@@ -1,22 +1,22 @@
# python -c "import examples.websocket.public.derivatives_status"
from bfxapi import Client, PUB_WSS_HOST
from bfxapi.websocket.enums import Error, Channel
from bfxapi.websocket.types import DerivativesStatus
from bfxapi.types import DerivativesStatus
from bfxapi.websocket.subscriptions import Status
from bfxapi.websocket import subscriptions
from bfxapi.websocket.enums import Error, Channel
bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("derivatives_status_update")
def on_derivatives_status_update(subscription: subscriptions.Status, data: DerivativesStatus):
def on_derivatives_status_update(subscription: Status, data: DerivativesStatus):
print(f"{subscription}:", data)
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.once("open")
@bfx.wss.on("open")
async def on_open():
await bfx.wss.subscribe(Channel.STATUS, key="deriv:tBTCF0:USTF0")

View File

@@ -6,9 +6,9 @@ from typing import List
from bfxapi import Client, PUB_WSS_HOST
from bfxapi.websocket import subscriptions
from bfxapi.types import TradingPairBook
from bfxapi.websocket.subscriptions import Book
from bfxapi.websocket.enums import Channel, Error
from bfxapi.websocket.types import TradingPairBook
class OrderBook:
def __init__(self, symbols: List[str]):
@@ -54,12 +54,12 @@ def on_subscribed(subscription):
print(f"Subscription successful for pair <{subscription['pair']}>")
@bfx.wss.on("t_book_snapshot")
def on_t_book_snapshot(subscription: subscriptions.Book, snapshot: List[TradingPairBook]):
def on_t_book_snapshot(subscription: Book, snapshot: List[TradingPairBook]):
for data in snapshot:
order_book.update(subscription["symbol"], data)
@bfx.wss.on("t_book_update")
def on_t_book_update(subscription: subscriptions.Book, data: TradingPairBook):
def on_t_book_update(subscription: Book, data: TradingPairBook):
order_book.update(subscription["symbol"], data)
bfx.wss.run()

View File

@@ -6,9 +6,9 @@ from typing import List
from bfxapi import Client, PUB_WSS_HOST
from bfxapi.websocket import subscriptions
from bfxapi.types import TradingPairRawBook
from bfxapi.websocket.subscriptions import Book
from bfxapi.websocket.enums import Channel, Error
from bfxapi.websocket.types import TradingPairRawBook
class RawOrderBook:
def __init__(self, symbols: List[str]):
@@ -54,12 +54,12 @@ def on_subscribed(subscription):
print(f"Subscription successful for pair <{subscription['pair']}>")
@bfx.wss.on("t_raw_book_snapshot")
def on_t_raw_book_snapshot(subscription: subscriptions.Book, snapshot: List[TradingPairRawBook]):
def on_t_raw_book_snapshot(subscription: Book, snapshot: List[TradingPairRawBook]):
for data in snapshot:
raw_order_book.update(subscription["symbol"], data)
@bfx.wss.on("t_raw_book_update")
def on_t_raw_book_update(subscription: subscriptions.Book, data: TradingPairRawBook):
def on_t_raw_book_update(subscription: Book, data: TradingPairRawBook):
raw_order_book.update(subscription["symbol"], data)
bfx.wss.run()

View File

@@ -2,19 +2,19 @@
from bfxapi import Client, PUB_WSS_HOST
from bfxapi.websocket import subscriptions
from bfxapi.types import TradingPairTicker
from bfxapi.websocket.subscriptions import Ticker
from bfxapi.websocket.enums import Channel
from bfxapi.websocket.types import TradingPairTicker
bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("t_ticker_update")
def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTicker):
def on_t_ticker_update(subscription: Ticker, data: TradingPairTicker):
print(f"Subscription with subId: {subscription['subId']}")
print(f"Data: {data}")
@bfx.wss.once("open")
@bfx.wss.on("open")
async def on_open():
await bfx.wss.subscribe(Channel.TICKER, symbol="tBTCUSD")

View File

@@ -1,26 +1,26 @@
# python -c "import examples.websocket.public.trades"
from bfxapi import Client, PUB_WSS_HOST
from bfxapi.websocket.enums import Error, Channel
from bfxapi.websocket.types import Candle, TradingPairTrade
from bfxapi.websocket import subscriptions
from bfxapi.types import Candle, TradingPairTrade
from bfxapi.websocket.subscriptions import Candles, Trades
from bfxapi.websocket.enums import Error, Channel
bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("candles_update")
def on_candles_update(_sub: subscriptions.Candles, candle: Candle):
def on_candles_update(_sub: Candles, candle: Candle):
print(f"New candle: {candle}")
@bfx.wss.on("t_trade_executed")
def on_t_trade_executed(_sub: subscriptions.Trades, trade: TradingPairTrade):
@bfx.wss.on("t_trade_execution")
def on_t_trade_execution(_sub: Trades, trade: TradingPairTrade):
print(f"New trade: {trade}")
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.once("open")
@bfx.wss.on("open")
async def on_open():
await bfx.wss.subscribe(Channel.CANDLES, key="trade:1m:tBTCUSD")

View File

@@ -32,7 +32,7 @@ setup(
"Source": "https://github.com/bitfinexcom/bitfinex-api-py",
},
packages=[
"bfxapi", "bfxapi.utils",
"bfxapi", "bfxapi.utils", "bfxapi.types",
"bfxapi.websocket", "bfxapi.websocket.client", "bfxapi.websocket.handlers",
"bfxapi.rest", "bfxapi.rest.endpoints", "bfxapi.rest.middleware",
],