From 39f317ba4001095d23e53e06188de28d0718971f Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 13 Feb 2023 16:22:39 +0100 Subject: [PATCH 01/18] Rename bfxapi.utils.camel_and_snake_case_adapters to bfxapi.utils.camel_and_snake_case_helpers. --- bfxapi/rest/endpoints/rest_merchant_endpoints.py | 2 +- ...d_snake_case_adapters.py => camel_and_snake_case_helpers.py} | 0 2 files changed, 1 insertion(+), 1 deletion(-) rename bfxapi/utils/{camel_and_snake_case_adapters.py => camel_and_snake_case_helpers.py} (100%) diff --git a/bfxapi/rest/endpoints/rest_merchant_endpoints.py b/bfxapi/rest/endpoints/rest_merchant_endpoints.py index dd760c5..8290a2c 100644 --- a/bfxapi/rest/endpoints/rest_merchant_endpoints.py +++ b/bfxapi/rest/endpoints/rest_merchant_endpoints.py @@ -3,7 +3,7 @@ from decimal import Decimal from .. types import * from .. middleware import Middleware -from ... utils.camel_and_snake_case_adapters import to_snake_case_keys, to_camel_case_keys +from ...utils.camel_and_snake_case_helpers import to_snake_case_keys, to_camel_case_keys _CustomerInfo = TypedDict("_CustomerInfo", { "nationality": str, "resid_country": str, "resid_city": str, diff --git a/bfxapi/utils/camel_and_snake_case_adapters.py b/bfxapi/utils/camel_and_snake_case_helpers.py similarity index 100% rename from bfxapi/utils/camel_and_snake_case_adapters.py rename to bfxapi/utils/camel_and_snake_case_helpers.py From 821541134a91c23676820ba5c88a173d1c1937c1 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 13 Feb 2023 17:29:35 +0100 Subject: [PATCH 02/18] Fix bug and refactor code in bfxapi.rest.types sub-package. --- bfxapi/rest/types.py | 85 ++++++++++++++++++++++---------------------- 1 file changed, 42 insertions(+), 43 deletions(-) diff --git a/bfxapi/rest/types.py b/bfxapi/rest/types.py index b0c7270..5d6b22c 100644 --- a/bfxapi/rest/types.py +++ b/bfxapi/rest/types.py @@ -578,66 +578,65 @@ class InvoiceSubmission(_Type): webhook: Optional[str] redirect_url: Optional[str] status: Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"] - customer_info: Optional["_CustomerInfo"] - invoices: List["_Invoice"] - payment: Optional["_Payment"] - additional_payments: Optional[List["_Payment"]] - + customer_info: Optional["CustomerInfo"] + invoices: List["Invoice"] + payment: Optional["Payment"] + additional_payments: Optional[List["Payment"]] merchant_name: str @classmethod def parse(cls, data: Dict[str, Any]) -> "InvoiceSubmission": if "customer_info" in data and data["customer_info"] != None: - data["customer_info"] = _CustomerInfo(**data["customer_info"]) + data["customer_info"] = InvoiceSubmission.CustomerInfo(**data["customer_info"]) for index, invoice in enumerate(data["invoices"]): - data["invoices"][index] = _Invoice(**invoice) + data["invoices"][index] = InvoiceSubmission.Invoice(**invoice) if "payment" in data and data["payment"] != None: - data["payment"] = _Payment(**data["payment"]) + data["payment"] = InvoiceSubmission.Payment(**data["payment"]) if "additional_payments" in data and data["additional_payments"] != None: for index, additional_payment in enumerate(data["additional_payments"]): - data["additional_payments"][index] = _Payment(**additional_payment) + data["additional_payments"][index] = InvoiceSubmission.Payment(**additional_payment) return InvoiceSubmission(**data) -@compose(dataclass, partial) -class _CustomerInfo: - nationality: str - resid_country: str - resid_state: Optional[str] - resid_city: str - resid_zip_code: str - resid_street: str - resid_building_no: Optional[str] - full_name: str - email: str - tos_accepted: bool + @compose(dataclass, partial) + class CustomerInfo: + nationality: str + resid_country: str + resid_state: Optional[str] + resid_city: str + resid_zip_code: str + resid_street: str + resid_building_no: Optional[str] + full_name: str + email: str + tos_accepted: bool -@compose(dataclass, partial) -class _Invoice: - amount: float - currency: str - pay_currency: str - pool_currency: str - address: str - ext: JSON + @compose(dataclass, partial) + class Invoice: + amount: float + currency: str + pay_currency: str + pool_currency: str + address: str + ext: JSON -@compose(dataclass, partial) -class _Payment: - txid: str - amount: float - currency: str - method: str - status: Literal["CREATED", "COMPLETED", "PROCESSING"] - confirmations: int - created_at: str - updated_at: str - deposit_id: Optional[int] - ledger_id: Optional[int] - force_completed: Optional[bool] - amount_diff: Optional[str] + @compose(dataclass, partial) + class Payment: + txid: str + amount: float + currency: str + method: str + status: Literal["CREATED", "COMPLETED", "PROCESSING"] + confirmations: int + created_at: str + updated_at: str + deposit_id: Optional[int] + ledger_id: Optional[int] + force_completed: Optional[bool] + amount_diff: Optional[str] @dataclass class InvoiceStats(_Type): From 6a7577f98b30525603971492c51c975741e67c55 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 13 Feb 2023 17:32:46 +0100 Subject: [PATCH 03/18] Remove Optional typing in bfxapi.rest.types. --- bfxapi/rest/types.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/bfxapi/rest/types.py b/bfxapi/rest/types.py index 5d6b22c..12fdb29 100644 --- a/bfxapi/rest/types.py +++ b/bfxapi/rest/types.py @@ -575,13 +575,13 @@ class InvoiceSubmission(_Type): currency: str order_id: str pay_currencies: List[str] - webhook: Optional[str] - redirect_url: Optional[str] + webhook: str + redirect_url: str status: Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"] - customer_info: Optional["CustomerInfo"] + customer_info: "CustomerInfo" invoices: List["Invoice"] - payment: Optional["Payment"] - additional_payments: Optional[List["Payment"]] + payment: "Payment" + additional_payments: List["Payment"] merchant_name: str @classmethod @@ -605,11 +605,11 @@ class InvoiceSubmission(_Type): class CustomerInfo: nationality: str resid_country: str - resid_state: Optional[str] + resid_state: str resid_city: str resid_zip_code: str resid_street: str - resid_building_no: Optional[str] + resid_building_no: str full_name: str email: str tos_accepted: bool @@ -633,10 +633,10 @@ class InvoiceSubmission(_Type): confirmations: int created_at: str updated_at: str - deposit_id: Optional[int] - ledger_id: Optional[int] - force_completed: Optional[bool] - amount_diff: Optional[str] + deposit_id: int + ledger_id: int + force_completed: bool + amount_diff: str @dataclass class InvoiceStats(_Type): From f0f150cec2f79d7c40f20cf00a30a364378a27e0 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 13 Feb 2023 19:09:38 +0100 Subject: [PATCH 04/18] Split websocket package in multiple sub-package. Split handlers.py in public_channels_handler.py and authenticated_channels_handler.py. Rename files attaining to new conventions. --- bfxapi/rest/__init__.py | 3 +- bfxapi/rest/endpoints/__init__.py | 2 + bfxapi/websocket/__init__.py | 4 +- bfxapi/websocket/client/__init__.py | 5 ++ .../bfx_websocket_bucket.py} | 12 +-- .../bfx_websocket_client.py} | 20 ++--- .../bfx_websocket_inputs.py} | 6 +- bfxapi/websocket/handlers/__init__.py | 3 + .../authenticated_channels_handler.py | 71 ++++++++++++++++++ .../public_channels_handler.py} | 74 +------------------ bfxapi/websocket/subscriptions.py | 8 +- 11 files changed, 117 insertions(+), 91 deletions(-) create mode 100644 bfxapi/websocket/client/__init__.py rename bfxapi/websocket/{_BfxWebsocketBucket.py => client/bfx_websocket_bucket.py} (87%) rename bfxapi/websocket/{BfxWebsocketClient.py => client/bfx_websocket_client.py} (89%) rename bfxapi/websocket/{_BfxWebsocketInputs.py => client/bfx_websocket_inputs.py} (96%) create mode 100644 bfxapi/websocket/handlers/__init__.py create mode 100644 bfxapi/websocket/handlers/authenticated_channels_handler.py rename bfxapi/websocket/{handlers.py => handlers/public_channels_handler.py} (60%) diff --git a/bfxapi/rest/__init__.py b/bfxapi/rest/__init__.py index 7ee9fed..71e3b54 100644 --- a/bfxapi/rest/__init__.py +++ b/bfxapi/rest/__init__.py @@ -1,3 +1,4 @@ -from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints +from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints, \ + RestMerchantEndpoints NAME = "rest" \ No newline at end of file diff --git a/bfxapi/rest/endpoints/__init__.py b/bfxapi/rest/endpoints/__init__.py index 24a005d..e35d6fb 100644 --- a/bfxapi/rest/endpoints/__init__.py +++ b/bfxapi/rest/endpoints/__init__.py @@ -1,5 +1,7 @@ from .bfx_rest_interface import BfxRestInterface + from .rest_public_endpoints import RestPublicEndpoints from .rest_authenticated_endpoints import RestAuthenticatedEndpoints +from .rest_merchant_endpoints import RestMerchantEndpoints NAME = "endpoints" \ No newline at end of file diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index e24f778..1287433 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1 +1,3 @@ -from .BfxWebsocketClient import BfxWebsocketClient \ No newline at end of file +from .client import BfxWebsocketClient, BfxWebsocketBucket, BfxWebsocketInputs + +NAME = "websocket" \ No newline at end of file diff --git a/bfxapi/websocket/client/__init__.py b/bfxapi/websocket/client/__init__.py new file mode 100644 index 0000000..50057cb --- /dev/null +++ b/bfxapi/websocket/client/__init__.py @@ -0,0 +1,5 @@ +from .bfx_websocket_client import BfxWebsocketClient +from .bfx_websocket_bucket import BfxWebsocketBucket +from .bfx_websocket_inputs import BfxWebsocketInputs + +NAME = "client" \ No newline at end of file diff --git a/bfxapi/websocket/_BfxWebsocketBucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py similarity index 87% rename from bfxapi/websocket/_BfxWebsocketBucket.py rename to bfxapi/websocket/client/bfx_websocket_bucket.py index 2cfe48c..550581d 100644 --- a/bfxapi/websocket/_BfxWebsocketBucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -2,9 +2,9 @@ import json, uuid, websockets from typing import Literal, TypeVar, Callable, cast -from .handlers import PublicChannelsHandler +from ..handlers import PublicChannelsHandler -from .exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion +from ..exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion _HEARTBEAT = "hb" @@ -19,7 +19,7 @@ def _require_websocket_connection(function: F) -> F: return cast(F, wrapper) -class _BfxWebsocketBucket(object): +class BfxWebsocketBucket(object): VERSION = 2 MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 @@ -42,8 +42,8 @@ class _BfxWebsocketBucket(object): message = json.loads(message) if isinstance(message, dict) and message["event"] == "info" and "version" in message: - if _BfxWebsocketBucket.VERSION != message["version"]: - raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {_BfxWebsocketBucket.VERSION}, server version: {message['version']}).") + if BfxWebsocketBucket.VERSION != message["version"]: + raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketBucket.VERSION}, server version: {message['version']}).") elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] self.subscriptions[chanId] = message @@ -60,7 +60,7 @@ class _BfxWebsocketBucket(object): @_require_websocket_connection async def _subscribe(self, channel, subId=None, **kwargs): - if len(self.subscriptions) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: + if len(self.subscriptions) + len(self.pendings) == BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") subscription = { diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/client/bfx_websocket_client.py similarity index 89% rename from bfxapi/websocket/BfxWebsocketClient.py rename to bfxapi/websocket/client/bfx_websocket_client.py index 98b5b75..8dccf79 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -4,15 +4,17 @@ from typing import Literal, TypeVar, Callable, cast from pyee.asyncio import AsyncIOEventEmitter -from ._BfxWebsocketBucket import _HEARTBEAT, F, _require_websocket_connection, _BfxWebsocketBucket +from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket -from ._BfxWebsocketInputs import _BfxWebsocketInputs -from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler -from .exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported +from .bfx_websocket_inputs import BfxWebsocketInputs +from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler +from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported -from ..utils.JSONEncoder import JSONEncoder +from ..enums import Channels -from ..utils.logger import Formatter, CustomLogger +from ...utils.JSONEncoder import JSONEncoder + +from ...utils.logger import Formatter, CustomLogger def _require_websocket_authentication(function: F) -> F: async def wrapper(self, *args, **kwargs): @@ -24,7 +26,7 @@ def _require_websocket_authentication(function: F) -> F: return cast(F, wrapper) class BfxWebsocketClient(object): - VERSION = _BfxWebsocketBucket.VERSION + VERSION = BfxWebsocketBucket.VERSION MAXIMUM_BUCKETS_AMOUNT = 20 @@ -46,9 +48,9 @@ class BfxWebsocketClient(object): self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - self.buckets = [ _BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] + self.buckets = [ BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] - self.inputs = _BfxWebsocketInputs(self.__handle_websocket_input) + self.inputs = BfxWebsocketInputs(self.__handle_websocket_input) self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level) diff --git a/bfxapi/websocket/_BfxWebsocketInputs.py b/bfxapi/websocket/client/bfx_websocket_inputs.py similarity index 96% rename from bfxapi/websocket/_BfxWebsocketInputs.py rename to bfxapi/websocket/client/bfx_websocket_inputs.py index 0d9ee0b..48e3137 100644 --- a/bfxapi/websocket/_BfxWebsocketInputs.py +++ b/bfxapi/websocket/client/bfx_websocket_inputs.py @@ -2,10 +2,10 @@ from decimal import Decimal from datetime import datetime from typing import Union, Optional, List, Tuple -from .types import JSON -from .enums import OrderType, FundingOfferType +from ..types import JSON +from ..enums import OrderType, FundingOfferType -class _BfxWebsocketInputs(object): +class BfxWebsocketInputs(object): def __init__(self, __handle_websocket_input): self.__handle_websocket_input = __handle_websocket_input diff --git a/bfxapi/websocket/handlers/__init__.py b/bfxapi/websocket/handlers/__init__.py new file mode 100644 index 0000000..4fe650a --- /dev/null +++ b/bfxapi/websocket/handlers/__init__.py @@ -0,0 +1,3 @@ +from .public_channels_handler import PublicChannelsHandler +from .authenticated_channels_handler import AuthenticatedChannelsHandler +NAME = "handlers" \ No newline at end of file diff --git a/bfxapi/websocket/handlers/authenticated_channels_handler.py b/bfxapi/websocket/handlers/authenticated_channels_handler.py new file mode 100644 index 0000000..2205012 --- /dev/null +++ b/bfxapi/websocket/handlers/authenticated_channels_handler.py @@ -0,0 +1,71 @@ +from typing import List + +from ..types import * + +from .. import serializers + +from ..exceptions import BfxWebsocketException + +class AuthenticatedChannelsHandler(object): + __abbreviations = { + "os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel", + "ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close", + "te": "trade_executed", "tu": "trade_execution_update", + "fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel", + "fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close", + "fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close", + "ws": "wallet_snapshot", "wu": "wallet_update", + "bu": "balance_update", + } + + __serializers = { + ("os", "on", "ou", "oc",): serializers.Order, + ("ps", "pn", "pu", "pc",): serializers.Position, + ("te", "tu"): serializers.Trade, + ("fos", "fon", "fou", "foc",): serializers.FundingOffer, + ("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit, + ("fls", "fln", "flu", "flc",): serializers.FundingLoan, + ("ws", "wu",): serializers.Wallet, + ("bu",): serializers.Balance + } + + EVENTS = [ + "notification", + "on-req-notification", "ou-req-notification", "oc-req-notification", + "oc_multi-notification", + "fon-req-notification", "foc-req-notification", + *list(__abbreviations.values()) + ] + + def __init__(self, event_emitter, strict = False): + self.event_emitter, self.strict = event_emitter, strict + + def handle(self, type, stream): + if type == "n": + return self.__notification(stream) + + for types, serializer in AuthenticatedChannelsHandler.__serializers.items(): + if type in types: + event = AuthenticatedChannelsHandler.__abbreviations[type] + + if all(isinstance(substream, list) for substream in stream): + return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ]) + + return self.event_emitter.emit(event, serializer.parse(*stream)) + + if self.strict == True: + raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") + + def __notification(self, stream): + type, serializer = "notification", serializers._Notification(serializer=None) + + if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req": + type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order) + + if stream[1] == "oc_multi-req": + type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True) + + if stream[1] == "fon-req" or stream[1] == "foc-req": + type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer) + + return self.event_emitter.emit(type, serializer.parse(*stream)) \ No newline at end of file diff --git a/bfxapi/websocket/handlers.py b/bfxapi/websocket/handlers/public_channels_handler.py similarity index 60% rename from bfxapi/websocket/handlers.py rename to bfxapi/websocket/handlers/public_channels_handler.py index 686501b..154e677 100644 --- a/bfxapi/websocket/handlers.py +++ b/bfxapi/websocket/handlers/public_channels_handler.py @@ -1,10 +1,8 @@ -from typing import List +from ..types import * -from .types import * +from .. import serializers -from . import serializers -from .enums import Channels -from .exceptions import BfxWebsocketException +from ..enums import Channels class PublicChannelsHandler(object): EVENTS = [ @@ -117,68 +115,4 @@ class PublicChannelsHandler(object): "derivatives_status_update", subscription, serializers.DerivativesStatus.parse(*stream[0]) - ) - -class AuthenticatedChannelsHandler(object): - __abbreviations = { - "os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel", - "ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close", - "te": "trade_executed", "tu": "trade_execution_update", - "fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel", - "fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close", - "fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close", - "ws": "wallet_snapshot", "wu": "wallet_update", - "bu": "balance_update", - } - - __serializers = { - ("os", "on", "ou", "oc",): serializers.Order, - ("ps", "pn", "pu", "pc",): serializers.Position, - ("te", "tu"): serializers.Trade, - ("fos", "fon", "fou", "foc",): serializers.FundingOffer, - ("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit, - ("fls", "fln", "flu", "flc",): serializers.FundingLoan, - ("ws", "wu",): serializers.Wallet, - ("bu",): serializers.Balance - } - - EVENTS = [ - "notification", - "on-req-notification", "ou-req-notification", "oc-req-notification", - "oc_multi-notification", - "fon-req-notification", "foc-req-notification", - *list(__abbreviations.values()) - ] - - def __init__(self, event_emitter, strict = False): - self.event_emitter, self.strict = event_emitter, strict - - def handle(self, type, stream): - if type == "n": - return self.__notification(stream) - - for types, serializer in AuthenticatedChannelsHandler.__serializers.items(): - if type in types: - event = AuthenticatedChannelsHandler.__abbreviations[type] - - if all(isinstance(substream, list) for substream in stream): - return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ]) - - return self.event_emitter.emit(event, serializer.parse(*stream)) - - if self.strict == True: - raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") - - def __notification(self, stream): - type, serializer = "notification", serializers._Notification(serializer=None) - - if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order) - - if stream[1] == "oc_multi-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True) - - if stream[1] == "fon-req" or stream[1] == "foc-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer) - - return self.event_emitter.emit(type, serializer.parse(*stream)) \ No newline at end of file + ) \ No newline at end of file diff --git a/bfxapi/websocket/subscriptions.py b/bfxapi/websocket/subscriptions.py index e22bb5e..5e2d692 100644 --- a/bfxapi/websocket/subscriptions.py +++ b/bfxapi/websocket/subscriptions.py @@ -1,6 +1,8 @@ -from typing import TypedDict, Optional +from typing import TypedDict, Union, Optional __all__ = [ + "Subscription", + "Ticker", "Trades", "Book", @@ -8,6 +10,10 @@ __all__ = [ "Status" ] +_Header = TypedDict("_Header", { "event": str, "channel": str, "subId": str }) + +Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"] + class Ticker(TypedDict): chanId: int; symbol: str pair: Optional[str] From 17c95027334c7c1d5efe6a0108cbd34c1690b563 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Tue, 14 Feb 2023 16:29:50 +0100 Subject: [PATCH 05/18] Apply fixes and refactoring to the bfxapi.handlers sub-package. --- bfxapi/websocket/exceptions.py | 7 +++++ .../authenticated_channels_handler.py | 22 +++++++-------- .../handlers/public_channels_handler.py | 27 ++++++++++--------- 3 files changed, 32 insertions(+), 24 deletions(-) diff --git a/bfxapi/websocket/exceptions.py b/bfxapi/websocket/exceptions.py index 5691af8..40a6a1e 100644 --- a/bfxapi/websocket/exceptions.py +++ b/bfxapi/websocket/exceptions.py @@ -58,4 +58,11 @@ class InvalidAuthenticationCredentials(BfxWebsocketException): This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication. """ + pass + +class HandlerNotFound(BfxWebsocketException): + """ + This error indicates that a handler was not found for an incoming message. + """ + pass \ No newline at end of file diff --git a/bfxapi/websocket/handlers/authenticated_channels_handler.py b/bfxapi/websocket/handlers/authenticated_channels_handler.py index 2205012..2dbd83f 100644 --- a/bfxapi/websocket/handlers/authenticated_channels_handler.py +++ b/bfxapi/websocket/handlers/authenticated_channels_handler.py @@ -1,10 +1,8 @@ -from typing import List - -from ..types import * - from .. import serializers -from ..exceptions import BfxWebsocketException +from .. types import * + +from .. exceptions import HandlerNotFound class AuthenticatedChannelsHandler(object): __abbreviations = { @@ -37,7 +35,7 @@ class AuthenticatedChannelsHandler(object): *list(__abbreviations.values()) ] - def __init__(self, event_emitter, strict = False): + def __init__(self, event_emitter, strict = True): self.event_emitter, self.strict = event_emitter, strict def handle(self, type, stream): @@ -52,20 +50,20 @@ class AuthenticatedChannelsHandler(object): return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ]) return self.event_emitter.emit(event, serializer.parse(*stream)) - - if self.strict == True: - raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") + + if self.strict: + raise HandlerNotFound(f"No handler found for event of type <{type}>.") def __notification(self, stream): type, serializer = "notification", serializers._Notification(serializer=None) if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order) + type, serializer = f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order) if stream[1] == "oc_multi-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True) + type, serializer = f"{stream[1]}-notification", serializers._Notification(serializer=serializers.Order, iterate=True) if stream[1] == "fon-req" or stream[1] == "foc-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer) + type, serializer = f"{stream[1]}-notification", serializers._Notification(serializer=serializers.FundingOffer) return self.event_emitter.emit(type, serializer.parse(*stream)) \ No newline at end of file diff --git a/bfxapi/websocket/handlers/public_channels_handler.py b/bfxapi/websocket/handlers/public_channels_handler.py index 154e677..d63ea1f 100644 --- a/bfxapi/websocket/handlers/public_channels_handler.py +++ b/bfxapi/websocket/handlers/public_channels_handler.py @@ -1,8 +1,8 @@ -from ..types import * - from .. import serializers -from ..enums import Channels +from .. types import * + +from .. exceptions import HandlerNotFound class PublicChannelsHandler(object): EVENTS = [ @@ -13,23 +13,26 @@ class PublicChannelsHandler(object): "derivatives_status_update", ] - def __init__(self, event_emitter): - self.event_emitter = event_emitter + def __init__(self, event_emitter, strict = True): + self.event_emitter, self.strict = event_emitter, strict self.__handlers = { - Channels.TICKER: self.__ticker_channel_handler, - Channels.TRADES: self.__trades_channel_handler, - Channels.BOOK: self.__book_channel_handler, - Channels.CANDLES: self.__candles_channel_handler, - Channels.STATUS: self.__status_channel_handler + "ticker": self.__ticker_channel_handler, + "trades": self.__trades_channel_handler, + "book": self.__book_channel_handler, + "candles": self.__candles_channel_handler, + "status": self.__status_channel_handler } def handle(self, subscription, *stream): _clear = lambda dictionary, *args: { key: value for key, value in dictionary.items() if key not in args } - if channel := subscription["channel"] or channel in self.__handlers.keys(): + if (channel := subscription["channel"]) and channel in self.__handlers.keys(): return self.__handlers[channel](_clear(subscription, "event", "channel", "subId"), *stream) + if self.strict: + raise HandlerNotFound(f"No handler found for channel <{subscription['channel']}>.") + def __ticker_channel_handler(self, subscription, *stream): if subscription["symbol"].startswith("t"): return self.event_emitter.emit( @@ -46,7 +49,7 @@ class PublicChannelsHandler(object): ) def __trades_channel_handler(self, subscription, *stream): - if type := stream[0] or type in [ "te", "tu", "fte", "ftu" ]: + if (type := stream[0]) and type in [ "te", "tu", "fte", "ftu" ]: if subscription["symbol"].startswith("t"): return self.event_emitter.emit( { "te": "t_trade_executed", "tu": "t_trade_execution_update" }[type], From 49517f9709c708c52ddd29af42c7627bf55e28af Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Tue, 14 Feb 2023 17:03:16 +0100 Subject: [PATCH 06/18] Small fix in package import statements. --- bfxapi/rest/endpoints/rest_authenticated_endpoints.py | 2 +- bfxapi/rest/endpoints/rest_merchant_endpoints.py | 3 ++- bfxapi/rest/types.py | 2 +- bfxapi/utils/JSONEncoder.py | 2 -- bfxapi/websocket/client/bfx_websocket_inputs.py | 4 ++-- bfxapi/websocket/enums.py | 2 +- bfxapi/websocket/types.py | 8 ++++---- 7 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bfxapi/rest/endpoints/rest_authenticated_endpoints.py b/bfxapi/rest/endpoints/rest_authenticated_endpoints.py index 947032b..7b4e11e 100644 --- a/bfxapi/rest/endpoints/rest_authenticated_endpoints.py +++ b/bfxapi/rest/endpoints/rest_authenticated_endpoints.py @@ -1,4 +1,4 @@ -from typing import List, Union, Literal, Optional +from typing import List, Tuple, Union, Literal, Optional from decimal import Decimal from datetime import datetime diff --git a/bfxapi/rest/endpoints/rest_merchant_endpoints.py b/bfxapi/rest/endpoints/rest_merchant_endpoints.py index 8290a2c..0c80110 100644 --- a/bfxapi/rest/endpoints/rest_merchant_endpoints.py +++ b/bfxapi/rest/endpoints/rest_merchant_endpoints.py @@ -1,4 +1,5 @@ -from typing import List, Union, Literal, Optional +from typing import TypedDict, List, Union, Literal, Optional + from decimal import Decimal from .. types import * diff --git a/bfxapi/rest/types.py b/bfxapi/rest/types.py index 12fdb29..e1c39af 100644 --- a/bfxapi/rest/types.py +++ b/bfxapi/rest/types.py @@ -1,4 +1,4 @@ -from typing import Type, Tuple, List, Dict, TypedDict, Union, Optional, Literal, Any +from typing import List, Dict, Optional, Literal, Any from dataclasses import dataclass diff --git a/bfxapi/utils/JSONEncoder.py b/bfxapi/utils/JSONEncoder.py index 5124376..edaba00 100644 --- a/bfxapi/utils/JSONEncoder.py +++ b/bfxapi/utils/JSONEncoder.py @@ -2,8 +2,6 @@ import json from decimal import Decimal from datetime import datetime -from types import SimpleNamespace - from typing import Type, List, Dict, Union, Any JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]] diff --git a/bfxapi/websocket/client/bfx_websocket_inputs.py b/bfxapi/websocket/client/bfx_websocket_inputs.py index 48e3137..141f817 100644 --- a/bfxapi/websocket/client/bfx_websocket_inputs.py +++ b/bfxapi/websocket/client/bfx_websocket_inputs.py @@ -2,8 +2,8 @@ from decimal import Decimal from datetime import datetime from typing import Union, Optional, List, Tuple -from ..types import JSON -from ..enums import OrderType, FundingOfferType +from .. enums import OrderType, FundingOfferType +from ... utils.JSONEncoder import JSON class BfxWebsocketInputs(object): def __init__(self, __handle_websocket_input): diff --git a/bfxapi/websocket/enums.py b/bfxapi/websocket/enums.py index 8f06f62..b9530db 100644 --- a/bfxapi/websocket/enums.py +++ b/bfxapi/websocket/enums.py @@ -1,4 +1,4 @@ -from ..enums import * +from .. enums import * class Channels(str, Enum): TICKER = "ticker" diff --git a/bfxapi/websocket/types.py b/bfxapi/websocket/types.py index 0ffa870..063836a 100644 --- a/bfxapi/websocket/types.py +++ b/bfxapi/websocket/types.py @@ -1,10 +1,10 @@ -from typing import Type, Tuple, List, Dict, TypedDict, Union, Optional, Any +from typing import Optional from dataclasses import dataclass -from ..labeler import _Type -from ..notification import Notification -from ..utils.JSONEncoder import JSON +from .. labeler import _Type +from .. notification import Notification +from .. utils.JSONEncoder import JSON #region Type hinting for Websocket Public Channels From 99f58ddb0410e4425668bdae5f944c85f056e32b Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Tue, 14 Feb 2023 18:49:45 +0100 Subject: [PATCH 07/18] Add new packages to setup.py. Add new feature in .on and .once methods in bfxapi.websocket.client.bfx_websocket_client. Fix small typo in __init__.py. --- .../websocket/client/bfx_websocket_client.py | 64 +++++++++++-------- bfxapi/websocket/handlers/__init__.py | 1 + setup.py | 2 +- 3 files changed, 38 insertions(+), 29 deletions(-) diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 8dccf79..2439bc0 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -1,6 +1,6 @@ -import traceback, json, asyncio, hmac, hashlib, time, uuid, websockets +import traceback, json, asyncio, hmac, hashlib, time, websockets -from typing import Literal, TypeVar, Callable, cast +from typing import cast from pyee.asyncio import AsyncIOEventEmitter @@ -10,8 +10,6 @@ from .bfx_websocket_inputs import BfxWebsocketInputs from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported -from ..enums import Channels - from ...utils.JSONEncoder import JSONEncoder from ...utils.logger import Formatter, CustomLogger @@ -36,28 +34,28 @@ class BfxWebsocketClient(object): *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, buckets=5, log_level = "WARNING", API_KEY=None, API_SECRET=None, filter=None): + def __init__(self, host, API_KEY = None, API_SECRET = None, filter = None, buckets = 5, log_level = "WARNING"): self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter() - self.event_emitter.add_listener("error", - lambda exception: self.logger.error(str(exception) + "\n" + - str().join(traceback.format_exception(type(exception), exception, exception.__traceback__))[:-1]) - ) - self.API_KEY, self.API_SECRET, self.filter, self.authentication = API_KEY, API_SECRET, filter, False self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - self.buckets = [ BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] - - self.inputs = BfxWebsocketInputs(self.__handle_websocket_input) - self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level) + self.event_emitter.add_listener("error", + lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" + + str().join(traceback.format_exception(type(exception), exception, exception.__traceback__))[:-1]) + ) + if buckets > BfxWebsocketClient.MAXIMUM_BUCKETS_AMOUNT: self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_BUCKETS_AMOUNT} buckets from the same \ connection ({buckets} in use), the server could momentarily block the client with <429 Too Many Requests>.") + self.buckets = [ BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] + + self.inputs = BfxWebsocketInputs(self.__handle_websocket_input) + def run(self): return asyncio.run(self.start()) @@ -136,24 +134,34 @@ class BfxWebsocketClient(object): if all(bucket.websocket != None and bucket.websocket.open == True for bucket in self.buckets): self.event_emitter.emit("open") - def on(self, event, callback = None): - if event not in BfxWebsocketClient.EVENTS: - raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS") + def on(self, *events, callback = None): + for event in events: + if event not in BfxWebsocketClient.EVENTS: + raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS") if callback != None: - return self.event_emitter.on(event, callback) + for event in events: + self.event_emitter.on(event, callback) - def handler(function): - self.event_emitter.on(event, function) - return handler + if callback == None: + def handler(function): + for event in events: + self.event_emitter.on(event, function) - def once(self, event, callback = None): - if event not in BfxWebsocketClient.EVENTS: - raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS") + return handler + + def once(self, *events, callback = None): + for event in events: + if event not in BfxWebsocketClient.EVENTS: + raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS") if callback != None: - return self.event_emitter.once(event, callback) + for event in events: + self.event_emitter.once(event, callback) - def handler(function): - self.event_emitter.once(event, function) - return handler \ No newline at end of file + if callback == None: + def handler(function): + for event in events: + self.event_emitter.once(event, function) + + return handler \ No newline at end of file diff --git a/bfxapi/websocket/handlers/__init__.py b/bfxapi/websocket/handlers/__init__.py index 4fe650a..02e9c81 100644 --- a/bfxapi/websocket/handlers/__init__.py +++ b/bfxapi/websocket/handlers/__init__.py @@ -1,3 +1,4 @@ from .public_channels_handler import PublicChannelsHandler from .authenticated_channels_handler import AuthenticatedChannelsHandler + NAME = "handlers" \ No newline at end of file diff --git a/setup.py b/setup.py index 54db508..ce3ae3e 100644 --- a/setup.py +++ b/setup.py @@ -5,7 +5,7 @@ setup( version="3.0.0", packages=[ "bfxapi", "bfxapi.utils", - "bfxapi.websocket", + "bfxapi.websocket", "bfxapi.websocket.client", "bfxapi.websocket.handlers", "bfxapi.rest", "bfxapi.rest.endpoints", "bfxapi.rest.middleware", ], url="https://github.com/bitfinexcom/bitfinex-api-py", From fa9bdfc33351f8221ae41999da9d37db8b3d6a44 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Wed, 15 Feb 2023 21:48:34 +0100 Subject: [PATCH 08/18] Rewrite reconnection system with numerous fixes. --- .../websocket/client/bfx_websocket_bucket.py | 31 +++++--- .../websocket/client/bfx_websocket_client.py | 79 +++++++++++++------ .../websocket/client/bfx_websocket_inputs.py | 18 ++--- bfxapi/websocket/enums.py | 2 +- .../handlers/public_channels_handler.py | 2 +- bfxapi/websocket/subscriptions.py | 14 ++-- examples/websocket/order_book.py | 4 +- examples/websocket/raw_order_book.py | 4 +- examples/websocket/ticker.py | 4 +- 9 files changed, 98 insertions(+), 60 deletions(-) diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 550581d..15caffc 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -24,27 +24,35 @@ class BfxWebsocketBucket(object): MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 - def __init__(self, host, event_emitter, __bucket_open_signal): - self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal + def __init__(self, host, event_emitter, on_open_event): + self.host, self.event_emitter, self.on_open_event = host, event_emitter, on_open_event self.websocket, self.subscriptions, self.pendings = None, dict(), list() self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) async def _connect(self, index): + reconnection = False + async for websocket in websockets.connect(self.host): self.websocket = websocket - self.__bucket_open_signal(index) + if reconnection == True or (reconnection := False): + for pending in self.pendings: + await self.websocket.send(json.dumps(pending)) + + for _, subscription in self.subscriptions.items(): + await self._subscribe(**subscription) + + self.subscriptions.clear() + + self.on_open_event.set() try: async for message in websocket: message = json.loads(message) - if isinstance(message, dict) and message["event"] == "info" and "version" in message: - if BfxWebsocketBucket.VERSION != message["version"]: - raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketBucket.VERSION}, server version: {message['version']}).") - elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): + if isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] self.subscriptions[chanId] = message self.event_emitter.emit("subscribed", message) @@ -55,8 +63,9 @@ class BfxWebsocketBucket(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chanId], *message[1:]) - except websockets.ConnectionClosedError: continue - finally: await self.websocket.wait_closed(); break + except websockets.ConnectionClosedError: self.on_open_event.clear(); reconnection = True; continue + + await self.websocket.wait_closed(); break @_require_websocket_connection async def _subscribe(self, channel, subId=None, **kwargs): @@ -64,11 +73,11 @@ class BfxWebsocketBucket(object): raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") subscription = { + **kwargs, + "event": "subscribe", "channel": channel, "subId": subId or str(uuid.uuid4()), - - **kwargs } self.pendings.append(subscription) diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 2439bc0..8f2a54d 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -2,13 +2,17 @@ import traceback, json, asyncio, hmac, hashlib, time, websockets from typing import cast +from collections import namedtuple + +from datetime import datetime + from pyee.asyncio import AsyncIOEventEmitter from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket from .bfx_websocket_inputs import BfxWebsocketInputs from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler -from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported +from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion from ...utils.JSONEncoder import JSONEncoder @@ -16,7 +20,7 @@ from ...utils.logger import Formatter, CustomLogger def _require_websocket_authentication(function: F) -> F: async def wrapper(self, *args, **kwargs): - if self.authentication == False: + if hasattr(self, "authentication") and self.authentication == False: raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.") await _require_websocket_connection(function)(self, *args, **kwargs) @@ -26,7 +30,7 @@ def _require_websocket_authentication(function: F) -> F: class BfxWebsocketClient(object): VERSION = BfxWebsocketBucket.VERSION - MAXIMUM_BUCKETS_AMOUNT = 20 + MAXIMUM_CONNECTIONS_AMOUNT = 20 EVENTS = [ "open", "subscribed", "authenticated", "wss-error", @@ -34,10 +38,12 @@ class BfxWebsocketClient(object): *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, API_KEY = None, API_SECRET = None, filter = None, buckets = 5, log_level = "WARNING"): + def __init__(self, host, API_KEY = None, API_SECRET = None, filter = None, log_level = "WARNING"): self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter() - self.API_KEY, self.API_SECRET, self.filter, self.authentication = API_KEY, API_SECRET, filter, False + self.API_KEY, self.API_SECRET, self.filter = API_KEY, API_SECRET, filter + + self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) @@ -48,36 +54,58 @@ class BfxWebsocketClient(object): str().join(traceback.format_exception(type(exception), exception, exception.__traceback__))[:-1]) ) - if buckets > BfxWebsocketClient.MAXIMUM_BUCKETS_AMOUNT: - self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_BUCKETS_AMOUNT} buckets from the same \ - connection ({buckets} in use), the server could momentarily block the client with <429 Too Many Requests>.") + def run(self, connections = 5): + return asyncio.run(self.start(connections)) - self.buckets = [ BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] + async def start(self, connections = 5): + if connections > BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT: + self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT} buckets from the same " + + f"connection ({connections} in use), the server could momentarily block the client with <429 Too Many Requests>.") - self.inputs = BfxWebsocketInputs(self.__handle_websocket_input) + self.on_open_events = [ asyncio.Event() for _ in range(connections) ] - def run(self): - return asyncio.run(self.start()) + self.buckets = [ + BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index]) + for index in range(connections) + ] - async def start(self): tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ] - if self.API_KEY != None and self.API_SECRET != None: - tasks.append(self.__connect(self.API_KEY, self.API_SECRET, self.filter)) + tasks.append(self.__connect(self.API_KEY, self.API_SECRET, self.filter)) await asyncio.gather(*tasks) async def __connect(self, API_KEY, API_SECRET, filter=None): + Reconnection = namedtuple("Reconnection", ["status", "code", "timestamp"]) + + reconnection = Reconnection(status=False, code=0, timestamp=None) + async for websocket in websockets.connect(self.host): - self.websocket = websocket - - await self.__authenticate(API_KEY, API_SECRET, filter) + self.websocket, self.authentication = websocket, False + + if (await asyncio.gather(*[ on_open_event.wait() for on_open_event in self.on_open_events ])): + self.event_emitter.emit("open") + + if self.API_KEY != None and self.API_SECRET != None: + await self.__authenticate(API_KEY=API_KEY, API_SECRET=API_SECRET, filter=filter) try: async for message in websocket: + if reconnection.status == True: + self.logger.warning(f"Reconnect Attempt Successful (error <{reconnection.code}>): The " + + f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " + + f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") + + reconnection = Reconnection(status=False, code=0, timestamp=None) + message = json.loads(message) - if isinstance(message, dict) and message["event"] == "auth": + if isinstance(message, dict) and message["event"] == "info" and "version" in message: + if BfxWebsocketClient.VERSION != message["version"]: + raise OutdatedClientVersion(f"Mismatch between the client version and the server version. " + + f"Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, " + + f"server version: {message['version']}).") + elif isinstance(message, dict) and message["event"] == "auth": if message["status"] == "OK": self.event_emitter.emit("authenticated", message); self.authentication = True else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") @@ -85,8 +113,13 @@ class BfxWebsocketClient(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != _HEARTBEAT: self.handler.handle(message[1], message[2]) - except websockets.ConnectionClosedError: continue - finally: await self.websocket.wait_closed(); break + except websockets.ConnectionClosedError as error: + self.logger.error(f"Connection terminated due to an error (status code: <{error.code}>) -> {str(error)}. Attempting to reconnect...") + reconnection = Reconnection(status=True, code=error.code, timestamp=datetime.now()); + continue + + if reconnection.status == False: + await self.websocket.wait_closed(); break async def __authenticate(self, API_KEY, API_SECRET, filter=None): data = { "event": "auth", "filter": filter, "apiKey": API_KEY } @@ -130,10 +163,6 @@ class BfxWebsocketClient(object): async def __handle_websocket_input(self, input, data): await self.websocket.send(json.dumps([ 0, input, None, data], cls=JSONEncoder)) - def __bucket_open_signal(self, index): - if all(bucket.websocket != None and bucket.websocket.open == True for bucket in self.buckets): - self.event_emitter.emit("open") - def on(self, *events, callback = None): for event in events: if event not in BfxWebsocketClient.EVENTS: diff --git a/bfxapi/websocket/client/bfx_websocket_inputs.py b/bfxapi/websocket/client/bfx_websocket_inputs.py index 141f817..4b4e04c 100644 --- a/bfxapi/websocket/client/bfx_websocket_inputs.py +++ b/bfxapi/websocket/client/bfx_websocket_inputs.py @@ -6,15 +6,15 @@ from .. enums import OrderType, FundingOfferType from ... utils.JSONEncoder import JSON class BfxWebsocketInputs(object): - def __init__(self, __handle_websocket_input): - self.__handle_websocket_input = __handle_websocket_input + def __init__(self, handle_websocket_input): + self.handle_websocket_input = handle_websocket_input async def submit_order(self, type: OrderType, symbol: str, amount: Union[Decimal, float, str], price: Optional[Union[Decimal, float, str]] = None, lev: Optional[int] = None, price_trailing: Optional[Union[Decimal, float, str]] = None, price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_oco_stop: Optional[Union[Decimal, float, str]] = None, gid: Optional[int] = None, cid: Optional[int] = None, flags: Optional[int] = 0, tif: Optional[Union[datetime, str]] = None, meta: Optional[JSON] = None): - await self.__handle_websocket_input("on", { + await self.handle_websocket_input("on", { "type": type, "symbol": symbol, "amount": amount, "price": price, "lev": lev, "price_trailing": price_trailing, "price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, @@ -26,7 +26,7 @@ class BfxWebsocketInputs(object): cid: Optional[int] = None, cid_date: Optional[str] = None, gid: Optional[int] = None, flags: Optional[int] = 0, lev: Optional[int] = None, delta: Optional[Union[Decimal, float, str]] = None, price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_trailing: Optional[Union[Decimal, float, str]] = None, tif: Optional[Union[datetime, str]] = None): - await self.__handle_websocket_input("ou", { + await self.handle_websocket_input("ou", { "id": id, "amount": amount, "price": price, "cid": cid, "cid_date": cid_date, "gid": gid, "flags": flags, "lev": lev, "delta": delta, @@ -34,12 +34,12 @@ class BfxWebsocketInputs(object): }) async def cancel_order(self, id: Optional[int] = None, cid: Optional[int] = None, cid_date: Optional[str] = None): - await self.__handle_websocket_input("oc", { + await self.handle_websocket_input("oc", { "id": id, "cid": cid, "cid_date": cid_date }) async def cancel_order_multi(self, ids: Optional[List[int]] = None, cids: Optional[List[Tuple[int, str]]] = None, gids: Optional[List[int]] = None, all: bool = False): - await self.__handle_websocket_input("oc_multi", { + await self.handle_websocket_input("oc_multi", { "ids": ids, "cids": cids, "gids": gids, "all": int(all) }) @@ -47,14 +47,14 @@ class BfxWebsocketInputs(object): async def submit_funding_offer(self, type: FundingOfferType, symbol: str, amount: Union[Decimal, float, str], rate: Union[Decimal, float, str], period: int, flags: Optional[int] = 0): - await self.__handle_websocket_input("fon", { + await self.handle_websocket_input("fon", { "type": type, "symbol": symbol, "amount": amount, "rate": rate, "period": period, "flags": flags }) async def cancel_funding_offer(self, id: int): - await self.__handle_websocket_input("foc", { "id": id }) + await self.handle_websocket_input("foc", { "id": id }) async def calc(self, *args: str): - await self.__handle_websocket_input("calc", list(map(lambda arg: [arg], args))) \ No newline at end of file + await self.handle_websocket_input("calc", list(map(lambda arg: [arg], args))) \ No newline at end of file diff --git a/bfxapi/websocket/enums.py b/bfxapi/websocket/enums.py index b9530db..1877cea 100644 --- a/bfxapi/websocket/enums.py +++ b/bfxapi/websocket/enums.py @@ -1,6 +1,6 @@ from .. enums import * -class Channels(str, Enum): +class Channel(str, Enum): TICKER = "ticker" TRADES = "trades" BOOK = "book" diff --git a/bfxapi/websocket/handlers/public_channels_handler.py b/bfxapi/websocket/handlers/public_channels_handler.py index d63ea1f..52e47ef 100644 --- a/bfxapi/websocket/handlers/public_channels_handler.py +++ b/bfxapi/websocket/handlers/public_channels_handler.py @@ -28,7 +28,7 @@ class PublicChannelsHandler(object): _clear = lambda dictionary, *args: { key: value for key, value in dictionary.items() if key not in args } if (channel := subscription["channel"]) and channel in self.__handlers.keys(): - return self.__handlers[channel](_clear(subscription, "event", "channel", "subId"), *stream) + return self.__handlers[channel](_clear(subscription, "event", "channel", "chanId"), *stream) if self.strict: raise HandlerNotFound(f"No handler found for channel <{subscription['channel']}>.") diff --git a/bfxapi/websocket/subscriptions.py b/bfxapi/websocket/subscriptions.py index 5e2d692..10cbbfe 100644 --- a/bfxapi/websocket/subscriptions.py +++ b/bfxapi/websocket/subscriptions.py @@ -1,4 +1,4 @@ -from typing import TypedDict, Union, Optional +from typing import TypedDict, Union, Literal, Optional __all__ = [ "Subscription", @@ -10,22 +10,22 @@ __all__ = [ "Status" ] -_Header = TypedDict("_Header", { "event": str, "channel": str, "subId": str }) +_Header = TypedDict("_Header", { "event": Literal["subscribed"], "channel": str, "chanId": int }) Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"] class Ticker(TypedDict): - chanId: int; symbol: str + subId: str; symbol: str pair: Optional[str] currency: Optional[str] class Trades(TypedDict): - chanId: int; symbol: str + subId: str; symbol: str pair: Optional[str] currency: Optional[str] class Book(TypedDict): - chanId: int + subId: str symbol: str prec: str freq: str @@ -33,9 +33,9 @@ class Book(TypedDict): pair: str class Candles(TypedDict): - chanId: int + subId: str key: str class Status(TypedDict): - chanId: int + subId: str key: str \ No newline at end of file diff --git a/examples/websocket/order_book.py b/examples/websocket/order_book.py index 82bd105..a419454 100644 --- a/examples/websocket/order_book.py +++ b/examples/websocket/order_book.py @@ -7,7 +7,7 @@ from typing import List from bfxapi import Client, Constants from bfxapi.websocket import subscriptions -from bfxapi.websocket.enums import Channels, Error +from bfxapi.websocket.enums import Channel, Error from bfxapi.websocket.types import TradingPairBook class OrderBook(object): @@ -47,7 +47,7 @@ def on_wss_error(code: Error, msg: str): @bfx.wss.on("open") async def on_open(): for symbol in SYMBOLS: - await bfx.wss.subscribe(Channels.BOOK, symbol=symbol) + await bfx.wss.subscribe(Channel.BOOK, symbol=symbol) @bfx.wss.on("subscribed") def on_subscribed(subscription): diff --git a/examples/websocket/raw_order_book.py b/examples/websocket/raw_order_book.py index 896e351..a039060 100644 --- a/examples/websocket/raw_order_book.py +++ b/examples/websocket/raw_order_book.py @@ -7,7 +7,7 @@ from typing import List from bfxapi import Client, Constants from bfxapi.websocket import subscriptions -from bfxapi.websocket.enums import Channels, Error +from bfxapi.websocket.enums import Channel, Error from bfxapi.websocket.types import TradingPairRawBook class RawOrderBook(object): @@ -47,7 +47,7 @@ def on_wss_error(code: Error, msg: str): @bfx.wss.on("open") async def on_open(): for symbol in SYMBOLS: - await bfx.wss.subscribe(Channels.BOOK, symbol=symbol, prec="R0") + await bfx.wss.subscribe(Channel.BOOK, symbol=symbol, prec="R0") @bfx.wss.on("subscribed") def on_subscribed(subscription): diff --git a/examples/websocket/ticker.py b/examples/websocket/ticker.py index 1c081b2..729f3ea 100644 --- a/examples/websocket/ticker.py +++ b/examples/websocket/ticker.py @@ -3,7 +3,7 @@ from bfxapi import Client, Constants from bfxapi.websocket import subscriptions -from bfxapi.websocket.enums import Channels +from bfxapi.websocket.enums import Channel from bfxapi.websocket.types import TradingPairTicker bfx = Client(WSS_HOST=Constants.PUB_WSS_HOST) @@ -16,6 +16,6 @@ def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTick @bfx.wss.once("open") async def open(): - await bfx.wss.subscribe(Channels.TICKER, symbol="tBTCUSD") + await bfx.wss.subscribe(Channel.TICKER, symbol="tBTCUSD") bfx.wss.run() \ No newline at end of file From b8a5bcb5157874f4250c89bb73b105fbbb3be4a4 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Thu, 16 Feb 2023 20:08:05 +0100 Subject: [PATCH 09/18] Fix bugs and rewrite regions of new reconnection system. --- bfxapi/client.py | 12 ++- .../websocket/client/bfx_websocket_bucket.py | 7 +- .../websocket/client/bfx_websocket_client.py | 97 +++++++++++++------ 3 files changed, 83 insertions(+), 33 deletions(-) diff --git a/bfxapi/client.py b/bfxapi/client.py index ec72fb3..84b9d76 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -1,7 +1,7 @@ from .rest import BfxRestInterface from .websocket import BfxWebsocketClient -from typing import Optional +from typing import List, Optional from enum import Enum @@ -21,8 +21,15 @@ class Client(object): WSS_HOST: str = Constants.WSS_HOST, API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None, + filter: Optional[List[str]] = None, log_level: str = "WARNING" ): + credentials = { + "API_KEY": API_KEY, + "API_SECRET": API_SECRET, + "filter": filter + } + self.rest = BfxRestInterface( host=REST_HOST, API_KEY=API_KEY, @@ -31,7 +38,6 @@ class Client(object): self.wss = BfxWebsocketClient( host=WSS_HOST, - API_KEY=API_KEY, - API_SECRET=API_SECRET, + credentials=credentials, log_level=log_level ) \ No newline at end of file diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 15caffc..b4c573b 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -63,9 +63,12 @@ class BfxWebsocketBucket(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chanId], *message[1:]) - except websockets.ConnectionClosedError: self.on_open_event.clear(); reconnection = True; continue + except websockets.ConnectionClosedError: + self.on_open_event.clear() + reconnection = True + continue - await self.websocket.wait_closed(); break + break @_require_websocket_connection async def _subscribe(self, channel, subId=None, **kwargs): diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 8f2a54d..15faf89 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -1,4 +1,4 @@ -import traceback, json, asyncio, hmac, hashlib, time, websockets +import traceback, json, asyncio, hmac, hashlib, time, websockets, socket, random from typing import cast @@ -38,10 +38,10 @@ class BfxWebsocketClient(object): *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, API_KEY = None, API_SECRET = None, filter = None, log_level = "WARNING"): - self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter() + def __init__(self, host, credentials = None, log_level = "WARNING"): + self.websocket = None - self.API_KEY, self.API_SECRET, self.filter = API_KEY, API_SECRET, filter + self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input) @@ -71,33 +71,35 @@ class BfxWebsocketClient(object): tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ] - tasks.append(self.__connect(self.API_KEY, self.API_SECRET, self.filter)) + tasks.append(self.__connect(self.credentials)) await asyncio.gather(*tasks) - async def __connect(self, API_KEY, API_SECRET, filter=None): - Reconnection = namedtuple("Reconnection", ["status", "code", "timestamp"]) + async def __connect(self, credentials = None): + Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) - reconnection = Reconnection(status=False, code=0, timestamp=None) + reconnection = Reconnection(status=False, attempts=0, timestamp=None) - async for websocket in websockets.connect(self.host): - self.websocket, self.authentication = websocket, False + async def _connection(): + nonlocal reconnection - if (await asyncio.gather(*[ on_open_event.wait() for on_open_event in self.on_open_events ])): - self.event_emitter.emit("open") + async with websockets.connect(self.host) as websocket: + if reconnection.status == True: + self.logger.info(f"Reconnect attempt successful (attempt N°{reconnection.attempts}): The " + + f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " + + f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") - if self.API_KEY != None and self.API_SECRET != None: - await self.__authenticate(API_KEY=API_KEY, API_SECRET=API_SECRET, filter=filter) + reconnection = Reconnection(status=False, attempts=0, timestamp=None) + + self.websocket, self.authentication = websocket, False + + if await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events]): + self.event_emitter.emit("open") + + if self.credentials != None: + await self.__authenticate(**self.credentials) - try: async for message in websocket: - if reconnection.status == True: - self.logger.warning(f"Reconnect Attempt Successful (error <{reconnection.code}>): The " + - f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " + - f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") - - reconnection = Reconnection(status=False, code=0, timestamp=None) - message = json.loads(message) if isinstance(message, dict) and message["event"] == "info" and "version" in message: @@ -113,13 +115,52 @@ class BfxWebsocketClient(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != _HEARTBEAT: self.handler.handle(message[1], message[2]) - except websockets.ConnectionClosedError as error: - self.logger.error(f"Connection terminated due to an error (status code: <{error.code}>) -> {str(error)}. Attempting to reconnect...") - reconnection = Reconnection(status=True, code=error.code, timestamp=datetime.now()); - continue - + + class _Delay: + BACKOFF_MIN, BACKOFF_MAX = 1.92, 60.0 + + BACKOFF_INITIAL = 5.0 + + def __init__(self, backoff_factor): + self.__backoff_factor = backoff_factor + self.__backoff_delay = _Delay.BACKOFF_MIN + self.__initial_delay = random.random() * _Delay.BACKOFF_INITIAL + + def next(self): + backoff_delay = self.peek() + + __backoff_delay = self.__backoff_delay * self.__backoff_factor + self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX) + return backoff_delay + + def peek(self): + return (self.__backoff_delay == _Delay.BACKOFF_MIN) \ + and self.__initial_delay or self.__backoff_delay + + delay = _Delay(backoff_factor=1.618) + + while True: + if reconnection.status == True: + await asyncio.sleep(delay.next()) + + try: + await _connection() + except (websockets.ConnectionClosedError, socket.gaierror) as error: + if isinstance(error, websockets.ConnectionClosedError) and error.code == 1006: + self.logger.error("Connection lost: no close frame received " + + "or sent (1006). Attempting to reconnect...") + + reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()); + elif isinstance(error, socket.gaierror) and reconnection.status == True: + self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " + + f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds." + + f"(at the moment the client has been offline for {datetime.now() - reconnection.timestamp})") + + reconnection = reconnection._replace(attempts=reconnection.attempts + 1) + else: raise error + if reconnection.status == False: - await self.websocket.wait_closed(); break + break async def __authenticate(self, API_KEY, API_SECRET, filter=None): data = { "event": "auth", "filter": filter, "apiKey": API_KEY } From e536515bbdd1ee788fc885088dd6ac82b7b628bc Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 02:58:53 +0100 Subject: [PATCH 10/18] Fix bugs and rewrite code in bfxapi.websocket.client sub-package. --- .../websocket/client/bfx_websocket_bucket.py | 24 ++++++++++++------- .../websocket/client/bfx_websocket_client.py | 10 ++++---- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index b4c573b..90c8d21 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -37,6 +37,8 @@ class BfxWebsocketBucket(object): async for websocket in websockets.connect(self.host): self.websocket = websocket + self.on_open_event.set() + if reconnection == True or (reconnection := False): for pending in self.pendings: await self.websocket.send(json.dumps(pending)) @@ -46,8 +48,6 @@ class BfxWebsocketBucket(object): self.subscriptions.clear() - self.on_open_event.set() - try: async for message in websocket: message = json.loads(message) @@ -63,11 +63,14 @@ class BfxWebsocketBucket(object): self.event_emitter.emit("wss-error", message["code"], message["msg"]) elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: self.handler.handle(self.subscriptions[chanId], *message[1:]) - except websockets.ConnectionClosedError: - self.on_open_event.clear() - reconnection = True - continue - + except websockets.ConnectionClosedError as error: + if error.code == 1006: + self.on_open_event.clear() + reconnection = True + continue + + raise error + break @_require_websocket_connection @@ -96,4 +99,9 @@ class BfxWebsocketBucket(object): @_require_websocket_connection async def _close(self, code=1000, reason=str()): - await self.websocket.close(code=code, reason=reason) \ No newline at end of file + await self.websocket.close(code=code, reason=reason) + + def _get_chan_id(self, subId): + for subscription in self.subscriptions.values(): + if subscription["subId"] == subId: + return subscription["chanId"] \ No newline at end of file diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 15faf89..64deb0b 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -78,7 +78,7 @@ class BfxWebsocketClient(object): async def __connect(self, credentials = None): Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"]) - reconnection = Reconnection(status=False, attempts=0, timestamp=None) + reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None async def _connection(): nonlocal reconnection @@ -137,8 +137,6 @@ class BfxWebsocketClient(object): return (self.__backoff_delay == _Delay.BACKOFF_MIN) \ and self.__initial_delay or self.__backoff_delay - delay = _Delay(backoff_factor=1.618) - while True: if reconnection.status == True: await asyncio.sleep(delay.next()) @@ -151,6 +149,8 @@ class BfxWebsocketClient(object): + "or sent (1006). Attempting to reconnect...") reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()); + + delay = _Delay(backoff_factor=1.618) elif isinstance(error, socket.gaierror) and reconnection.status == True: self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. " + f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds." @@ -184,9 +184,9 @@ class BfxWebsocketClient(object): await self.buckets[index]._subscribe(channel, **kwargs) - async def unsubscribe(self, chanId): + async def unsubscribe(self, subId): for bucket in self.buckets: - if chanId in bucket.subscriptions.keys(): + if (chanId := bucket._get_chan_id(subId)): await bucket._unsubscribe(chanId=chanId) async def close(self, code=1000, reason=str()): From 4d0fa49e2271a650c0dc8e195a18e0bcea8d2935 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 04:01:47 +0100 Subject: [PATCH 11/18] Rewrite bfxapi/utils/logger.py with new ColoredLogger. --- bfxapi/client.py | 2 +- bfxapi/utils/logger.py | 111 +++++------------- .../websocket/client/bfx_websocket_client.py | 10 +- 3 files changed, 38 insertions(+), 85 deletions(-) diff --git a/bfxapi/client.py b/bfxapi/client.py index 84b9d76..aa7eaf2 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -22,7 +22,7 @@ class Client(object): API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None, filter: Optional[List[str]] = None, - log_level: str = "WARNING" + log_level: str = "INFO" ): credentials = { "API_KEY": API_KEY, diff --git a/bfxapi/utils/logger.py b/bfxapi/utils/logger.py index 0ea3894..cf3e970 100644 --- a/bfxapi/utils/logger.py +++ b/bfxapi/utils/logger.py @@ -1,99 +1,52 @@ -""" -Module used to describe all of the different data types -""" - import logging +BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8) + RESET_SEQ = "\033[0m" + COLOR_SEQ = "\033[1;%dm" +ITALIC_COLOR_SEQ = "\033[3;%dm" +UNDERLINE_COLOR_SEQ = "\033[4;%dm" + BOLD_SEQ = "\033[1m" -UNDERLINE_SEQ = "\033[04m" - -YELLOW = '\033[93m' -WHITE = '\33[37m' -BLUE = '\033[34m' -LIGHT_BLUE = '\033[94m' -RED = '\033[91m' -GREY = '\33[90m' - -KEYWORD_COLORS = { - 'WARNING': YELLOW, - 'INFO': LIGHT_BLUE, - 'DEBUG': WHITE, - 'CRITICAL': YELLOW, - 'ERROR': RED, - 'TRADE': '\33[102m\33[30m' -} def formatter_message(message, use_color = True): - """ - Syntax highlight certain keywords - """ if use_color: message = message.replace("$RESET", RESET_SEQ).replace("$BOLD", BOLD_SEQ) else: message = message.replace("$RESET", "").replace("$BOLD", "") return message -def format_word(message, word, color_seq, bold=False, underline=False): - """ - Surround the given word with a sequence - """ - replacer = color_seq + word + RESET_SEQ - if underline: - replacer = UNDERLINE_SEQ + replacer - if bold: - replacer = BOLD_SEQ + replacer - return message.replace(word, replacer) +COLORS = { + "DEBUG": CYAN, + "INFO": BLUE, + "WARNING": YELLOW, + "ERROR": RED +} -class Formatter(logging.Formatter): - """ - This Formatted simply colors in the levelname i.e 'INFO', 'DEBUG' - """ - def __init__(self, msg, use_color = True): - logging.Formatter.__init__(self, msg) - self.use_color = use_color +class _ColoredFormatter(logging.Formatter): + def __init__(self, msg, use_color = True): + logging.Formatter.__init__(self, msg, "%d-%m-%Y %H:%M:%S") + self.use_color = use_color - def format(self, record): - """ - Format and highlight certain keywords - """ - levelname = record.levelname - if self.use_color and levelname in KEYWORD_COLORS: - levelname_color = KEYWORD_COLORS[levelname] + levelname + RESET_SEQ - record.levelname = levelname_color - record.name = GREY + record.name + RESET_SEQ - return logging.Formatter.format(self, record) + def format(self, record): + levelname = record.levelname + if self.use_color and levelname in COLORS: + levelname_color = COLOR_SEQ % (30 + COLORS[levelname]) + levelname + RESET_SEQ + record.levelname = levelname_color + record.name = ITALIC_COLOR_SEQ % (30 + BLACK) + record.name + RESET_SEQ + return logging.Formatter.format(self, record) -class CustomLogger(logging.Logger): - """ - This adds extra logging functions such as logger.trade and also - sets the logger to use the custom formatter - """ - FORMAT = "[$BOLD%(name)s$RESET] [%(levelname)s] %(message)s" +class ColoredLogger(logging.Logger): + FORMAT = "[$BOLD%(name)s$RESET] [%(asctime)s] [%(levelname)s] %(message)s" + COLOR_FORMAT = formatter_message(FORMAT, True) - TRADE = 50 + + def __init__(self, name, level): + logging.Logger.__init__(self, name, level) - def __init__(self, name, logLevel='DEBUG'): - logging.Logger.__init__(self, name, logLevel) - color_formatter = Formatter(self.COLOR_FORMAT) + colored_formatter = _ColoredFormatter(self.COLOR_FORMAT) console = logging.StreamHandler() - console.setFormatter(color_formatter) - self.addHandler(console) - logging.addLevelName(self.TRADE, "TRADE") - return + console.setFormatter(colored_formatter) - def set_level(self, level): - logging.Logger.setLevel(self, level) - - def trade(self, message, *args, **kws): - """ - Print a syntax highlighted trade signal - """ - if self.isEnabledFor(self.TRADE): - message = format_word(message, 'CLOSED ', YELLOW, bold=True) - message = format_word(message, 'OPENED ', LIGHT_BLUE, bold=True) - message = format_word(message, 'UPDATED ', BLUE, bold=True) - message = format_word(message, 'CLOSED_ALL ', RED, bold=True) - # Yes, logger takes its '*args' as 'args'. - self._log(self.TRADE, message, args, **kws) \ No newline at end of file + self.addHandler(console) \ No newline at end of file diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 64deb0b..f3c354d 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -16,7 +16,7 @@ from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationC from ...utils.JSONEncoder import JSONEncoder -from ...utils.logger import Formatter, CustomLogger +from ...utils.logger import ColoredLogger def _require_websocket_authentication(function: F) -> F: async def wrapper(self, *args, **kwargs): @@ -38,7 +38,7 @@ class BfxWebsocketClient(object): *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, credentials = None, log_level = "WARNING"): + def __init__(self, host, credentials = None, log_level = "INFO"): self.websocket = None self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter() @@ -47,7 +47,7 @@ class BfxWebsocketClient(object): self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level) + self.logger = ColoredLogger("BfxWebsocketClient", level=log_level) self.event_emitter.add_listener("error", lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" + @@ -85,9 +85,9 @@ class BfxWebsocketClient(object): async with websockets.connect(self.host) as websocket: if reconnection.status == True: - self.logger.info(f"Reconnect attempt successful (attempt N°{reconnection.attempts}): The " + + self.logger.info(f"Reconnect attempt successful (attempt no.{reconnection.attempts}): The " + f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " + - f"(first reconnection attempt: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") + f"(connection lost at: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).") reconnection = Reconnection(status=False, attempts=0, timestamp=None) From 9eb2c73407a56d6229311d6ef132ebb47bbc50df Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 04:04:50 +0100 Subject: [PATCH 12/18] Fix small bug in examples/websocket/ticker.py demo. --- bfxapi/websocket/client/bfx_websocket_client.py | 4 ++-- examples/websocket/ticker.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index f3c354d..4bb826a 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -96,8 +96,8 @@ class BfxWebsocketClient(object): if await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events]): self.event_emitter.emit("open") - if self.credentials != None: - await self.__authenticate(**self.credentials) + if credentials and credentials["API_KEY"] and credentials["API_SECRET"]: + await self.__authenticate(**credentials) async for message in websocket: message = json.loads(message) diff --git a/examples/websocket/ticker.py b/examples/websocket/ticker.py index 729f3ea..a335a28 100644 --- a/examples/websocket/ticker.py +++ b/examples/websocket/ticker.py @@ -10,7 +10,7 @@ bfx = Client(WSS_HOST=Constants.PUB_WSS_HOST) @bfx.wss.on("t_ticker_update") def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTicker): - print(f"Subscription with channel ID: {subscription['chanId']}") + print(f"Subscription with subId: {subscription['subId']}") print(f"Data: {data}") From fde27e933f1f21156a07c62f877284b0819e08f9 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 04:15:38 +0100 Subject: [PATCH 13/18] Add handling for <20051 : Stop/Restart Websocket Server (please reconnect)>. --- bfxapi/websocket/client/bfx_websocket_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 4bb826a..0218cd7 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -107,6 +107,8 @@ class BfxWebsocketClient(object): raise OutdatedClientVersion(f"Mismatch between the client version and the server version. " + f"Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, " + f"server version: {message['version']}).") + elif isinstance(message, dict) and message["event"] == "info" and message["code"] == 20051: + raise websockets.ConnectionClosedError(rcvd=None, sent=None) elif isinstance(message, dict) and message["event"] == "auth": if message["status"] == "OK": self.event_emitter.emit("authenticated", message); self.authentication = True From 16d0ee525edaabd56953eca4a557f3fd73baa46f Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 04:29:26 +0100 Subject: [PATCH 14/18] Remove test_rest_serializers_and_types.py and test_websocket_serializers_and_types.py. Add new test_rest_serializers.py and test_websocket_serializers.py unit tests. Edit bfxapi.tests.__init__.py's test suite. --- bfxapi/tests/__init__.py | 8 ++++---- bfxapi/tests/test_rest_serializers.py | 17 +++++++++++++++++ bfxapi/tests/test_rest_serializers_and_types.py | 17 ----------------- bfxapi/tests/test_websocket_serializers.py | 17 +++++++++++++++++ .../test_websocket_serializers_and_types.py | 17 ----------------- 5 files changed, 38 insertions(+), 38 deletions(-) create mode 100644 bfxapi/tests/test_rest_serializers.py delete mode 100644 bfxapi/tests/test_rest_serializers_and_types.py create mode 100644 bfxapi/tests/test_websocket_serializers.py delete mode 100644 bfxapi/tests/test_websocket_serializers_and_types.py diff --git a/bfxapi/tests/__init__.py b/bfxapi/tests/__init__.py index a63ea0d..057c2c0 100644 --- a/bfxapi/tests/__init__.py +++ b/bfxapi/tests/__init__.py @@ -1,6 +1,6 @@ import unittest -from .test_rest_serializers_and_types import TestRestSerializersAndTypes -from .test_websocket_serializers_and_types import TestWebsocketSerializersAndTypes +from .test_rest_serializers import TestRestSerializers +from .test_websocket_serializers import TestWebsocketSerializers from .test_labeler import TestLabeler from .test_notification import TestNotification @@ -8,8 +8,8 @@ NAME = "tests" def suite(): return unittest.TestSuite([ - unittest.makeSuite(TestRestSerializersAndTypes), - unittest.makeSuite(TestWebsocketSerializersAndTypes), + unittest.makeSuite(TestRestSerializers), + unittest.makeSuite(TestWebsocketSerializers), unittest.makeSuite(TestLabeler), unittest.makeSuite(TestNotification), ]) diff --git a/bfxapi/tests/test_rest_serializers.py b/bfxapi/tests/test_rest_serializers.py new file mode 100644 index 0000000..4c24992 --- /dev/null +++ b/bfxapi/tests/test_rest_serializers.py @@ -0,0 +1,17 @@ +import unittest + +from ..labeler import _Type + +from ..rest import serializers + +class TestRestSerializers(unittest.TestCase): + def test_rest_serializers(self): + for serializer in map(serializers.__dict__.get, serializers.__serializers__): + self.assertTrue(issubclass(serializer.klass, _Type), + f"_Serializer <{serializer.name}>: .klass field must be a subclass of _Type (got {serializer.klass}).") + + self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__), + f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> must have matching labels and fields.") + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/bfxapi/tests/test_rest_serializers_and_types.py b/bfxapi/tests/test_rest_serializers_and_types.py deleted file mode 100644 index 7bc7242..0000000 --- a/bfxapi/tests/test_rest_serializers_and_types.py +++ /dev/null @@ -1,17 +0,0 @@ -import unittest - -from ..rest import serializers, types - -class TestRestSerializersAndTypes(unittest.TestCase): - def test_consistency(self): - for serializer in map(serializers.__dict__.get, serializers.__serializers__): - type = types.__dict__.get(serializer.name) - - self.assertIsNotNone(type, f"_Serializer <{serializer.name}>: no respective _Type found in bfxapi.rest.types.") - self.assertEqual(serializer.klass, type, f"_Serializer <{serializer.name}>.klass: field does not match with respective _Type in bfxapi.rest.types.") - - self.assertListEqual(serializer.get_labels(), list(type.__annotations__), - f"_Serializer <{serializer.name}> and _Type <{type.__name__}> must have matching labels and fields.") - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/bfxapi/tests/test_websocket_serializers.py b/bfxapi/tests/test_websocket_serializers.py new file mode 100644 index 0000000..a559565 --- /dev/null +++ b/bfxapi/tests/test_websocket_serializers.py @@ -0,0 +1,17 @@ +import unittest + +from ..labeler import _Type + +from ..websocket import serializers + +class TestWebsocketSerializers(unittest.TestCase): + def test_websocket_serializers(self): + for serializer in map(serializers.__dict__.get, serializers.__serializers__): + self.assertTrue(issubclass(serializer.klass, _Type), + f"_Serializer <{serializer.name}>: .klass field must be a subclass of _Type (got {serializer.klass}).") + + self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__), + f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> must have matching labels and fields.") + +if __name__ == "__main__": + unittest.main() \ No newline at end of file diff --git a/bfxapi/tests/test_websocket_serializers_and_types.py b/bfxapi/tests/test_websocket_serializers_and_types.py deleted file mode 100644 index 338b959..0000000 --- a/bfxapi/tests/test_websocket_serializers_and_types.py +++ /dev/null @@ -1,17 +0,0 @@ -import unittest - -from ..websocket import serializers, types - -class TestWebsocketSerializersAndTypes(unittest.TestCase): - def test_consistency(self): - for serializer in map(serializers.__dict__.get, serializers.__serializers__): - type = types.__dict__.get(serializer.name) - - self.assertIsNotNone(type, f"_Serializer <{serializer.name}>: no respective _Type found in bfxapi.websocket.types.") - self.assertEqual(serializer.klass, type, f"_Serializer <{serializer.name}>.klass: field does not match with respective _Type in bfxapi.websocket.types.") - - self.assertListEqual(serializer.get_labels(), list(type.__annotations__), - f"_Serializer <{serializer.name}> and _Type <{type.__name__}> must have matching labels and fields.") - -if __name__ == "__main__": - unittest.main() \ No newline at end of file From d72fcf3981bb36570c31705c44bfae631610d469 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 17:02:09 +0100 Subject: [PATCH 15/18] Add better handling for info code 20051. --- bfxapi/websocket/client/bfx_websocket_client.py | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index 0218cd7..adea696 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -108,7 +108,9 @@ class BfxWebsocketClient(object): f"Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, " + f"server version: {message['version']}).") elif isinstance(message, dict) and message["event"] == "info" and message["code"] == 20051: - raise websockets.ConnectionClosedError(rcvd=None, sent=None) + rcvd = websockets.frames.Close(code=1012, reason="Stop/Restart Websocket Server (please reconnect).") + + raise websockets.ConnectionClosedError(rcvd=rcvd, sent=None) elif isinstance(message, dict) and message["event"] == "auth": if message["status"] == "OK": self.event_emitter.emit("authenticated", message); self.authentication = True @@ -130,9 +132,9 @@ class BfxWebsocketClient(object): def next(self): backoff_delay = self.peek() - __backoff_delay = self.__backoff_delay * self.__backoff_factor self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX) + return backoff_delay def peek(self): @@ -146,9 +148,14 @@ class BfxWebsocketClient(object): try: await _connection() except (websockets.ConnectionClosedError, socket.gaierror) as error: - if isinstance(error, websockets.ConnectionClosedError) and error.code == 1006: - self.logger.error("Connection lost: no close frame received " - + "or sent (1006). Attempting to reconnect...") + if isinstance(error, websockets.ConnectionClosedError) and (error.code == 1006 or error.code == 1012): + if error.code == 1006: + self.logger.error("Connection lost: no close frame received " + + "or sent (1006). Attempting to reconnect...") + + if error.code == 1012: + self.logger.info("WSS server is about to restart, reconnection " + + "required (client received 20051). Attempt in progress...") reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now()); From ab66170cf3eb16ff4db3f06a2f4d71f2d5024771 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 18:38:07 +0100 Subject: [PATCH 16/18] Apply refactoring to root package bfxapi. --- bfxapi/__init__.py | 7 ++++- bfxapi/client.py | 28 ++++++------------- bfxapi/rest/endpoints/bfx_rest_interface.py | 5 +++- .../rest/endpoints/rest_public_endpoints.py | 16 +++++------ bfxapi/urls.py | 7 +++++ .../websocket/client/bfx_websocket_client.py | 4 +-- examples/rest/claim_position.py | 4 +-- examples/rest/create_funding_offer.py | 4 +-- examples/rest/create_order.py | 4 +-- examples/rest/derivatives.py | 4 +-- examples/rest/extra_calcs.py | 4 +-- examples/rest/funding_auto_renew.py | 4 +-- examples/rest/get_authenticated_data.py | 4 +-- examples/rest/get_candles_hist.py | 4 +-- examples/rest/get_funding_info.py | 4 +-- examples/rest/get_funding_trades_history.py | 4 +-- examples/rest/get_liquidations.py | 4 +-- examples/rest/get_positions.py | 4 +-- examples/rest/get_public_data.py | 4 +-- examples/rest/get_pulse_data.py | 4 +-- examples/rest/increase_position.py | 4 +-- examples/rest/keep_taken_funding.py | 4 +-- examples/rest/merchant.py | 4 +-- examples/rest/return_taken_funding.py | 4 +-- examples/rest/transfer_wallet.py | 4 +-- examples/websocket/create_order.py | 4 +-- examples/websocket/order_book.py | 4 +-- examples/websocket/raw_order_book.py | 4 +-- examples/websocket/ticker.py | 4 +-- 29 files changed, 81 insertions(+), 78 deletions(-) create mode 100644 bfxapi/urls.py diff --git a/bfxapi/__init__.py b/bfxapi/__init__.py index c11c9ab..4fbdfd6 100644 --- a/bfxapi/__init__.py +++ b/bfxapi/__init__.py @@ -1 +1,6 @@ -from .client import Client, Constants \ No newline at end of file +from .client import Client + +from .urls import REST_HOST, PUB_REST_HOST, STAGING_REST_HOST, \ + WSS_HOST, PUB_WSS_HOST, STAGING_WSS_HOST + +NAME = "bfxapi" \ No newline at end of file diff --git a/bfxapi/client.py b/bfxapi/client.py index aa7eaf2..f3121ac 100644 --- a/bfxapi/client.py +++ b/bfxapi/client.py @@ -1,39 +1,27 @@ from .rest import BfxRestInterface from .websocket import BfxWebsocketClient +from .urls import REST_HOST, WSS_HOST from typing import List, Optional -from enum import Enum - -class Constants(str, Enum): - REST_HOST = "https://api.bitfinex.com/v2" - PUB_REST_HOST = "https://api-pub.bitfinex.com/v2" - STAGING_REST_HOST = "https://api.staging.bitfinex.com/v2" - - WSS_HOST = "wss://api.bitfinex.com/ws/2" - PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2" - STAGING_WSS_HOST = "wss://api.staging.bitfinex.com/ws/2" - class Client(object): def __init__( self, - REST_HOST: str = Constants.REST_HOST, - WSS_HOST: str = Constants.WSS_HOST, + REST_HOST: str = REST_HOST, + WSS_HOST: str = WSS_HOST, API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None, filter: Optional[List[str]] = None, log_level: str = "INFO" ): - credentials = { - "API_KEY": API_KEY, - "API_SECRET": API_SECRET, - "filter": filter - } + credentials = None + + if API_KEY and API_SECRET: + credentials = { "API_KEY": API_KEY, "API_SECRET": API_SECRET, "filter": filter } self.rest = BfxRestInterface( host=REST_HOST, - API_KEY=API_KEY, - API_SECRET=API_SECRET + credentials=credentials ) self.wss = BfxWebsocketClient( diff --git a/bfxapi/rest/endpoints/bfx_rest_interface.py b/bfxapi/rest/endpoints/bfx_rest_interface.py index a2dc6ec..b117fa6 100644 --- a/bfxapi/rest/endpoints/bfx_rest_interface.py +++ b/bfxapi/rest/endpoints/bfx_rest_interface.py @@ -7,7 +7,10 @@ from .rest_merchant_endpoints import RestMerchantEndpoints class BfxRestInterface(object): VERSION = 2 - def __init__(self, host: str, API_KEY: Optional[str] = None, API_SECRET: Optional[str] = None): + def __init__(self, host, credentials = None): + API_KEY, API_SECRET = credentials and \ + (credentials["API_KEY"], credentials["API_SECRET"]) or (None, None) + self.public = RestPublicEndpoints(host=host) self.auth = RestAuthenticatedEndpoints(host=host, API_KEY=API_KEY, API_SECRET=API_SECRET) self.merchant = RestMerchantEndpoints(host=host, API_KEY=API_KEY, API_SECRET=API_SECRET) \ No newline at end of file diff --git a/bfxapi/rest/endpoints/rest_public_endpoints.py b/bfxapi/rest/endpoints/rest_public_endpoints.py index 3810e97..b5313fd 100644 --- a/bfxapi/rest/endpoints/rest_public_endpoints.py +++ b/bfxapi/rest/endpoints/rest_public_endpoints.py @@ -25,7 +25,7 @@ class RestPublicEndpoints(Middleware): if isinstance(pairs, str) and pairs == "ALL": return [ cast(TradingPairTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) if cast(str, sub_data.symbol).startswith("t") ] - data = self.get_tickers([ "t" + pair for pair in pairs ]) + data = self.get_tickers([ pair for pair in pairs ]) return cast(List[TradingPairTicker], data) @@ -33,7 +33,7 @@ class RestPublicEndpoints(Middleware): if isinstance(currencies, str) and currencies == "ALL": return [ cast(FundingCurrencyTicker, sub_data) for sub_data in self.get_tickers([ "ALL" ]) if cast(str, sub_data.symbol).startswith("f") ] - data = self.get_tickers([ "f" + currency for currency in currencies ]) + data = self.get_tickers([ currency for currency in currencies ]) return cast(List[FundingCurrencyTicker], data) @@ -52,25 +52,25 @@ class RestPublicEndpoints(Middleware): def get_t_trades(self, pair: str, limit: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, sort: Optional[Sort] = None) -> List[TradingPairTrade]: params = { "limit": limit, "start": start, "end": end, "sort": sort } - data = self._GET(f"trades/{'t' + pair}/hist", params=params) + data = self._GET(f"trades/{pair}/hist", params=params) return [ serializers.TradingPairTrade.parse(*sub_data) for sub_data in data ] def get_f_trades(self, currency: str, limit: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, sort: Optional[Sort] = None) -> List[FundingCurrencyTrade]: params = { "limit": limit, "start": start, "end": end, "sort": sort } - data = self._GET(f"trades/{'f' + currency}/hist", params=params) + data = self._GET(f"trades/{currency}/hist", params=params) return [ serializers.FundingCurrencyTrade.parse(*sub_data) for sub_data in data ] def get_t_book(self, pair: str, precision: Literal["P0", "P1", "P2", "P3", "P4"], len: Optional[Literal[1, 25, 100]] = None) -> List[TradingPairBook]: - return [ serializers.TradingPairBook.parse(*sub_data) for sub_data in self._GET(f"book/{'t' + pair}/{precision}", params={ "len": len }) ] + return [ serializers.TradingPairBook.parse(*sub_data) for sub_data in self._GET(f"book/{pair}/{precision}", params={ "len": len }) ] def get_f_book(self, currency: str, precision: Literal["P0", "P1", "P2", "P3", "P4"], len: Optional[Literal[1, 25, 100]] = None) -> List[FundingCurrencyBook]: - return [ serializers.FundingCurrencyBook.parse(*sub_data) for sub_data in self._GET(f"book/{'f' + currency}/{precision}", params={ "len": len }) ] + return [ serializers.FundingCurrencyBook.parse(*sub_data) for sub_data in self._GET(f"book/{currency}/{precision}", params={ "len": len }) ] def get_t_raw_book(self, pair: str, len: Optional[Literal[1, 25, 100]] = None) -> List[TradingPairRawBook]: - return [ serializers.TradingPairRawBook.parse(*sub_data) for sub_data in self._GET(f"book/{'t' + pair}/R0", params={ "len": len }) ] + return [ serializers.TradingPairRawBook.parse(*sub_data) for sub_data in self._GET(f"book/{pair}/R0", params={ "len": len }) ] def get_f_raw_book(self, currency: str, len: Optional[Literal[1, 25, 100]] = None) -> List[FundingCurrencyRawBook]: - return [ serializers.FundingCurrencyRawBook.parse(*sub_data) for sub_data in self._GET(f"book/{'f' + currency}/R0", params={ "len": len }) ] + return [ serializers.FundingCurrencyRawBook.parse(*sub_data) for sub_data in self._GET(f"book/{currency}/R0", params={ "len": len }) ] def get_stats_hist( self, diff --git a/bfxapi/urls.py b/bfxapi/urls.py new file mode 100644 index 0000000..c9a622b --- /dev/null +++ b/bfxapi/urls.py @@ -0,0 +1,7 @@ +REST_HOST = "https://api.bitfinex.com/v2" +PUB_REST_HOST = "https://api-pub.bitfinex.com/v2" +STAGING_REST_HOST = "https://api.staging.bitfinex.com/v2" + +WSS_HOST = "wss://api.bitfinex.com/ws/2" +PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2" +STAGING_WSS_HOST = "wss://api.staging.bitfinex.com/ws/2" \ No newline at end of file diff --git a/bfxapi/websocket/client/bfx_websocket_client.py b/bfxapi/websocket/client/bfx_websocket_client.py index adea696..7dc06ed 100644 --- a/bfxapi/websocket/client/bfx_websocket_client.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -96,8 +96,8 @@ class BfxWebsocketClient(object): if await asyncio.gather(*[on_open_event.wait() for on_open_event in self.on_open_events]): self.event_emitter.emit("open") - if credentials and credentials["API_KEY"] and credentials["API_SECRET"]: - await self.__authenticate(**credentials) + if self.credentials: + await self.__authenticate(**self.credentials) async for message in websocket: message = json.loads(message) diff --git a/examples/rest/claim_position.py b/examples/rest/claim_position.py index ba3e4e0..084c9d0 100644 --- a/examples/rest/claim_position.py +++ b/examples/rest/claim_position.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/create_funding_offer.py b/examples/rest/create_funding_offer.py index c1031d8..f462325 100644 --- a/examples/rest/create_funding_offer.py +++ b/examples/rest/create_funding_offer.py @@ -2,11 +2,11 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST from bfxapi.enums import FundingOfferType, Flag bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/create_order.py b/examples/rest/create_order.py index ea70265..607f7c9 100644 --- a/examples/rest/create_order.py +++ b/examples/rest/create_order.py @@ -1,11 +1,11 @@ # python -c "import examples.rest.create_order" import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST from bfxapi.enums import OrderType, Flag bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/derivatives.py b/examples/rest/derivatives.py index 4aedd00..58a0031 100644 --- a/examples/rest/derivatives.py +++ b/examples/rest/derivatives.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/extra_calcs.py b/examples/rest/extra_calcs.py index e380aef..8ef93cb 100644 --- a/examples/rest/extra_calcs.py +++ b/examples/rest/extra_calcs.py @@ -1,9 +1,9 @@ # python -c "import examples.rest.extra_calcs" -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST + REST_HOST=REST_HOST ) t_symbol_response = bfx.rest.public.get_trading_market_average_price( diff --git a/examples/rest/funding_auto_renew.py b/examples/rest/funding_auto_renew.py index 11ee7ca..f546707 100644 --- a/examples/rest/funding_auto_renew.py +++ b/examples/rest/funding_auto_renew.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/get_authenticated_data.py b/examples/rest/get_authenticated_data.py index ada724a..c3226af 100644 --- a/examples/rest/get_authenticated_data.py +++ b/examples/rest/get_authenticated_data.py @@ -3,10 +3,10 @@ import os import time -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/get_candles_hist.py b/examples/rest/get_candles_hist.py index 98f9da7..d8d9881 100644 --- a/examples/rest/get_candles_hist.py +++ b/examples/rest/get_candles_hist.py @@ -1,9 +1,9 @@ # python -c "import examples.rest.get_candles_hist" -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST + REST_HOST=REST_HOST ) print(f"Candles: {bfx.rest.public.get_candles_hist(symbol='tBTCUSD')}") diff --git a/examples/rest/get_funding_info.py b/examples/rest/get_funding_info.py index 83d0635..82bf150 100644 --- a/examples/rest/get_funding_info.py +++ b/examples/rest/get_funding_info.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/get_funding_trades_history.py b/examples/rest/get_funding_trades_history.py index c1cc8e6..3af19d8 100644 --- a/examples/rest/get_funding_trades_history.py +++ b/examples/rest/get_funding_trades_history.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/get_liquidations.py b/examples/rest/get_liquidations.py index 6113a25..588c83a 100644 --- a/examples/rest/get_liquidations.py +++ b/examples/rest/get_liquidations.py @@ -2,10 +2,10 @@ import time -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST + REST_HOST=REST_HOST ) now = int(round(time.time() * 1000)) diff --git a/examples/rest/get_positions.py b/examples/rest/get_positions.py index 7e71824..62cd309 100644 --- a/examples/rest/get_positions.py +++ b/examples/rest/get_positions.py @@ -3,10 +3,10 @@ import os import time -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/get_public_data.py b/examples/rest/get_public_data.py index a6c388b..125f97c 100644 --- a/examples/rest/get_public_data.py +++ b/examples/rest/get_public_data.py @@ -2,10 +2,10 @@ import time -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST + REST_HOST=REST_HOST ) now = int(round(time.time() * 1000)) diff --git a/examples/rest/get_pulse_data.py b/examples/rest/get_pulse_data.py index 75b55ae..b0cc369 100644 --- a/examples/rest/get_pulse_data.py +++ b/examples/rest/get_pulse_data.py @@ -2,10 +2,10 @@ import time -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST + REST_HOST=REST_HOST ) now = int(round(time.time() * 1000)) diff --git a/examples/rest/increase_position.py b/examples/rest/increase_position.py index 65595c8..add66e3 100644 --- a/examples/rest/increase_position.py +++ b/examples/rest/increase_position.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/keep_taken_funding.py b/examples/rest/keep_taken_funding.py index 1314ffa..21e60f4 100644 --- a/examples/rest/keep_taken_funding.py +++ b/examples/rest/keep_taken_funding.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/merchant.py b/examples/rest/merchant.py index ec6727b..84d9b9b 100644 --- a/examples/rest/merchant.py +++ b/examples/rest/merchant.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/return_taken_funding.py b/examples/rest/return_taken_funding.py index ccb0c2b..73d5a33 100644 --- a/examples/rest/return_taken_funding.py +++ b/examples/rest/return_taken_funding.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/rest/transfer_wallet.py b/examples/rest/transfer_wallet.py index 8de15fd..9384bd8 100644 --- a/examples/rest/transfer_wallet.py +++ b/examples/rest/transfer_wallet.py @@ -2,10 +2,10 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, REST_HOST bfx = Client( - REST_HOST=Constants.REST_HOST, + REST_HOST=REST_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/websocket/create_order.py b/examples/websocket/create_order.py index f72f9d4..48f4957 100644 --- a/examples/websocket/create_order.py +++ b/examples/websocket/create_order.py @@ -2,12 +2,12 @@ import os -from bfxapi.client import Client, Constants +from bfxapi.client import Client, WSS_HOST from bfxapi.websocket.enums import Error, OrderType from bfxapi.websocket.types import Notification, Order bfx = Client( - WSS_HOST=Constants.WSS_HOST, + WSS_HOST=WSS_HOST, API_KEY=os.getenv("BFX_API_KEY"), API_SECRET=os.getenv("BFX_API_SECRET") ) diff --git a/examples/websocket/order_book.py b/examples/websocket/order_book.py index a419454..55e4ae3 100644 --- a/examples/websocket/order_book.py +++ b/examples/websocket/order_book.py @@ -4,7 +4,7 @@ from collections import OrderedDict from typing import List -from bfxapi import Client, Constants +from bfxapi import Client, PUB_WSS_HOST from bfxapi.websocket import subscriptions from bfxapi.websocket.enums import Channel, Error @@ -38,7 +38,7 @@ SYMBOLS = [ "tBTCUSD", "tLTCUSD", "tLTCBTC", "tETHUSD", "tETHBTC" ] order_book = OrderBook(symbols=SYMBOLS) -bfx = Client(WSS_HOST=Constants.PUB_WSS_HOST) +bfx = Client(WSS_HOST=PUB_WSS_HOST) @bfx.wss.on("wss-error") def on_wss_error(code: Error, msg: str): diff --git a/examples/websocket/raw_order_book.py b/examples/websocket/raw_order_book.py index a039060..3ce9c6d 100644 --- a/examples/websocket/raw_order_book.py +++ b/examples/websocket/raw_order_book.py @@ -4,7 +4,7 @@ from collections import OrderedDict from typing import List -from bfxapi import Client, Constants +from bfxapi import Client, PUB_WSS_HOST from bfxapi.websocket import subscriptions from bfxapi.websocket.enums import Channel, Error @@ -38,7 +38,7 @@ SYMBOLS = [ "tBTCUSD", "tLTCUSD", "tLTCBTC", "tETHUSD", "tETHBTC" ] raw_order_book = RawOrderBook(symbols=SYMBOLS) -bfx = Client(WSS_HOST=Constants.PUB_WSS_HOST) +bfx = Client(WSS_HOST=PUB_WSS_HOST) @bfx.wss.on("wss-error") def on_wss_error(code: Error, msg: str): diff --git a/examples/websocket/ticker.py b/examples/websocket/ticker.py index a335a28..d4b4c91 100644 --- a/examples/websocket/ticker.py +++ b/examples/websocket/ticker.py @@ -1,12 +1,12 @@ # python -c "import examples.websocket.ticker" -from bfxapi import Client, Constants +from bfxapi import Client, PUB_WSS_HOST from bfxapi.websocket import subscriptions from bfxapi.websocket.enums import Channel from bfxapi.websocket.types import TradingPairTicker -bfx = Client(WSS_HOST=Constants.PUB_WSS_HOST) +bfx = Client(WSS_HOST=PUB_WSS_HOST) @bfx.wss.on("t_ticker_update") def on_t_ticker_update(subscription: subscriptions.Ticker, data: TradingPairTicker): From 32a179fc004026eab5d8491eb0875bfdd61502e3 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 19:08:28 +0100 Subject: [PATCH 17/18] Add LICENSE.md file (Apache-V2). Edit setup.py with new arguments. Prepare to distribute on PyPI. --- LICENSE | 1 + setup.py | 37 ++++++++++++++++++++++++++----------- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/LICENSE b/LICENSE index 2bb9ad2..4947287 100644 --- a/LICENSE +++ b/LICENSE @@ -1,3 +1,4 @@ + Apache License Version 2.0, January 2004 http://www.apache.org/licenses/ diff --git a/setup.py b/setup.py index ce3ae3e..93bcd68 100644 --- a/setup.py +++ b/setup.py @@ -2,18 +2,36 @@ from distutils.core import setup setup( name="bitfinex-api-py", - version="3.0.0", + version="3.0.0b1", + description="Official Bitfinex Python API", + long_description="A Python reference implementation of the Bitfinex API for both REST and websocket interaction", + long_description_content_type="text/markdown", + url="https://github.com/bitfinexcom/bitfinex-api-py", + author="Bitfinex", + author_email="support@bitfinex.com", + license="Apache-2.0", + classifiers=[ + "Development Status :: 4 - Beta", + + "Intended Audience :: Developers", + "Topic :: Software Development :: Build Tools", + + "License :: OSI Approved :: Apache-2.0", + + "Programming Language :: Python :: 3.8", + "Programming Language :: Python :: 3.9", + "Programming Language :: Python :: 3.10", + ], + keywords="bitfinex,api,trading", + project_urls={ + "Bug Reports": "https://github.com/bitfinexcom/bitfinex-api-py/issues", + "Source": "https://github.com/bitfinexcom/bitfinex-api-py", + }, packages=[ "bfxapi", "bfxapi.utils", "bfxapi.websocket", "bfxapi.websocket.client", "bfxapi.websocket.handlers", "bfxapi.rest", "bfxapi.rest.endpoints", "bfxapi.rest.middleware", ], - url="https://github.com/bitfinexcom/bitfinex-api-py", - license="OSI Approved :: Apache Software License", - author="Bitfinex", - author_email="support@bitfinex.com", - description="Official Bitfinex Python API", - keywords="bitfinex,api,trading", install_requires=[ "certifi~=2022.12.7", "charset-normalizer~=2.1.1", @@ -29,8 +47,5 @@ setup( "urllib3~=1.26.13", "websockets~=10.4", ], - project_urls={ - "Bug Reports": "https://github.com/bitfinexcom/bitfinex-api-py/issues", - "Source": "https://github.com/bitfinexcom/bitfinex-api-py", - } + python_requires=">=3.8" ) \ No newline at end of file From f4c6a21ef490d1c8d2b19be8a1a707f83c1ef5c7 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 17 Feb 2023 20:23:59 +0100 Subject: [PATCH 18/18] Ws examples Co-Authored-By: itsdeka --- examples/websocket/derivatives_status.py | 23 ++++++++++++++++++ examples/websocket/trades.py | 29 +++++++++++++++++++++++ examples/websocket/wallet_balance.py | 30 ++++++++++++++++++++++++ 3 files changed, 82 insertions(+) create mode 100644 examples/websocket/derivatives_status.py create mode 100644 examples/websocket/trades.py create mode 100644 examples/websocket/wallet_balance.py diff --git a/examples/websocket/derivatives_status.py b/examples/websocket/derivatives_status.py new file mode 100644 index 0000000..3099431 --- /dev/null +++ b/examples/websocket/derivatives_status.py @@ -0,0 +1,23 @@ +# python -c "import examples.websocket.derivatives_status" + +from bfxapi import Client, PUB_WSS_HOST +from bfxapi.websocket.enums import Error, Channel +from bfxapi.websocket.types import DerivativesStatus + +from bfxapi.websocket import subscriptions + +bfx = Client(WSS_HOST=PUB_WSS_HOST) + +@bfx.wss.on("derivatives_status_update") +def on_derivatives_status_update(subscription: subscriptions.Status, data: DerivativesStatus): + print(f"{subscription}:", data) + +@bfx.wss.on("wss-error") +def on_wss_error(code: Error, msg: str): + print(code, msg) + +@bfx.wss.once("open") +async def open(): + await bfx.wss.subscribe(Channel.STATUS, key="deriv:tBTCF0:USTF0") + +bfx.wss.run() \ No newline at end of file diff --git a/examples/websocket/trades.py b/examples/websocket/trades.py new file mode 100644 index 0000000..0a5291f --- /dev/null +++ b/examples/websocket/trades.py @@ -0,0 +1,29 @@ +# python -c "import examples.websocket.trades" + +from bfxapi import Client, PUB_WSS_HOST +from bfxapi.websocket.enums import Error, Channel +from bfxapi.websocket.types import Candle, TradingPairTrade + +from bfxapi.websocket import subscriptions + +bfx = Client(WSS_HOST=PUB_WSS_HOST) + +@bfx.wss.on("candles_update") +def on_candles_update(subscription: subscriptions.Candles, candle: Candle): + print(f"New candle: {candle}") + +@bfx.wss.on("t_trade_executed") +def on_t_trade_executed(subscription: subscriptions.Trades, trade: TradingPairTrade): + print(f"New trade: {trade}") + +@bfx.wss.on("wss-error") +def on_wss_error(code: Error, msg: str): + print(code, msg) + +@bfx.wss.once("open") +async def open(): + await bfx.wss.subscribe(Channel.CANDLES, key="trade:1m:tBTCUSD") + + await bfx.wss.subscribe(Channel.TRADES, symbol="tBTCUSD") + +bfx.wss.run() \ No newline at end of file diff --git a/examples/websocket/wallet_balance.py b/examples/websocket/wallet_balance.py new file mode 100644 index 0000000..0e1b489 --- /dev/null +++ b/examples/websocket/wallet_balance.py @@ -0,0 +1,30 @@ +# python -c "import examples.websocket.wallet_balance" + +import os + +from typing import List + +from bfxapi import Client, WSS_HOST +from bfxapi.websocket.enums import Error +from bfxapi.websocket.types import Wallet + +bfx = Client( + WSS_HOST=WSS_HOST, + API_KEY=os.getenv("BFX_API_KEY"), + API_SECRET=os.getenv("BFX_API_SECRET") +) + +@bfx.wss.on("wallet_snapshot") +def log_snapshot(wallets: List[Wallet]): + for wallet in wallets: + print(f"Balance: {wallet}") + +@bfx.wss.on("wallet_update") +def log_update(wallet: Wallet): + print(f"Balance update: {wallet}") + +@bfx.wss.on("wss-error") +def on_wss_error(code: Error, msg: str): + print(code, msg) + +bfx.wss.run() \ No newline at end of file