From f0f150cec2f79d7c40f20cf00a30a364378a27e0 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Mon, 13 Feb 2023 19:09:38 +0100 Subject: [PATCH] Split websocket package in multiple sub-package. Split handlers.py in public_channels_handler.py and authenticated_channels_handler.py. Rename files attaining to new conventions. --- bfxapi/rest/__init__.py | 3 +- bfxapi/rest/endpoints/__init__.py | 2 + bfxapi/websocket/__init__.py | 4 +- bfxapi/websocket/client/__init__.py | 5 ++ .../bfx_websocket_bucket.py} | 12 +-- .../bfx_websocket_client.py} | 20 ++--- .../bfx_websocket_inputs.py} | 6 +- bfxapi/websocket/handlers/__init__.py | 3 + .../authenticated_channels_handler.py | 71 ++++++++++++++++++ .../public_channels_handler.py} | 74 +------------------ bfxapi/websocket/subscriptions.py | 8 +- 11 files changed, 117 insertions(+), 91 deletions(-) create mode 100644 bfxapi/websocket/client/__init__.py rename bfxapi/websocket/{_BfxWebsocketBucket.py => client/bfx_websocket_bucket.py} (87%) rename bfxapi/websocket/{BfxWebsocketClient.py => client/bfx_websocket_client.py} (89%) rename bfxapi/websocket/{_BfxWebsocketInputs.py => client/bfx_websocket_inputs.py} (96%) create mode 100644 bfxapi/websocket/handlers/__init__.py create mode 100644 bfxapi/websocket/handlers/authenticated_channels_handler.py rename bfxapi/websocket/{handlers.py => handlers/public_channels_handler.py} (60%) diff --git a/bfxapi/rest/__init__.py b/bfxapi/rest/__init__.py index 7ee9fed..71e3b54 100644 --- a/bfxapi/rest/__init__.py +++ b/bfxapi/rest/__init__.py @@ -1,3 +1,4 @@ -from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints +from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints, \ + RestMerchantEndpoints NAME = "rest" \ No newline at end of file diff --git a/bfxapi/rest/endpoints/__init__.py b/bfxapi/rest/endpoints/__init__.py index 24a005d..e35d6fb 100644 --- a/bfxapi/rest/endpoints/__init__.py +++ b/bfxapi/rest/endpoints/__init__.py @@ -1,5 +1,7 @@ from .bfx_rest_interface import BfxRestInterface + from .rest_public_endpoints import RestPublicEndpoints from .rest_authenticated_endpoints import RestAuthenticatedEndpoints +from .rest_merchant_endpoints import RestMerchantEndpoints NAME = "endpoints" \ No newline at end of file diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index e24f778..1287433 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1 +1,3 @@ -from .BfxWebsocketClient import BfxWebsocketClient \ No newline at end of file +from .client import BfxWebsocketClient, BfxWebsocketBucket, BfxWebsocketInputs + +NAME = "websocket" \ No newline at end of file diff --git a/bfxapi/websocket/client/__init__.py b/bfxapi/websocket/client/__init__.py new file mode 100644 index 0000000..50057cb --- /dev/null +++ b/bfxapi/websocket/client/__init__.py @@ -0,0 +1,5 @@ +from .bfx_websocket_client import BfxWebsocketClient +from .bfx_websocket_bucket import BfxWebsocketBucket +from .bfx_websocket_inputs import BfxWebsocketInputs + +NAME = "client" \ No newline at end of file diff --git a/bfxapi/websocket/_BfxWebsocketBucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py similarity index 87% rename from bfxapi/websocket/_BfxWebsocketBucket.py rename to bfxapi/websocket/client/bfx_websocket_bucket.py index 2cfe48c..550581d 100644 --- a/bfxapi/websocket/_BfxWebsocketBucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -2,9 +2,9 @@ import json, uuid, websockets from typing import Literal, TypeVar, Callable, cast -from .handlers import PublicChannelsHandler +from ..handlers import PublicChannelsHandler -from .exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion +from ..exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion _HEARTBEAT = "hb" @@ -19,7 +19,7 @@ def _require_websocket_connection(function: F) -> F: return cast(F, wrapper) -class _BfxWebsocketBucket(object): +class BfxWebsocketBucket(object): VERSION = 2 MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 @@ -42,8 +42,8 @@ class _BfxWebsocketBucket(object): message = json.loads(message) if isinstance(message, dict) and message["event"] == "info" and "version" in message: - if _BfxWebsocketBucket.VERSION != message["version"]: - raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {_BfxWebsocketBucket.VERSION}, server version: {message['version']}).") + if BfxWebsocketBucket.VERSION != message["version"]: + raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketBucket.VERSION}, server version: {message['version']}).") elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] self.subscriptions[chanId] = message @@ -60,7 +60,7 @@ class _BfxWebsocketBucket(object): @_require_websocket_connection async def _subscribe(self, channel, subId=None, **kwargs): - if len(self.subscriptions) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: + if len(self.subscriptions) + len(self.pendings) == BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") subscription = { diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/client/bfx_websocket_client.py similarity index 89% rename from bfxapi/websocket/BfxWebsocketClient.py rename to bfxapi/websocket/client/bfx_websocket_client.py index 98b5b75..8dccf79 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/client/bfx_websocket_client.py @@ -4,15 +4,17 @@ from typing import Literal, TypeVar, Callable, cast from pyee.asyncio import AsyncIOEventEmitter -from ._BfxWebsocketBucket import _HEARTBEAT, F, _require_websocket_connection, _BfxWebsocketBucket +from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket -from ._BfxWebsocketInputs import _BfxWebsocketInputs -from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler -from .exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported +from .bfx_websocket_inputs import BfxWebsocketInputs +from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler +from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported -from ..utils.JSONEncoder import JSONEncoder +from ..enums import Channels -from ..utils.logger import Formatter, CustomLogger +from ...utils.JSONEncoder import JSONEncoder + +from ...utils.logger import Formatter, CustomLogger def _require_websocket_authentication(function: F) -> F: async def wrapper(self, *args, **kwargs): @@ -24,7 +26,7 @@ def _require_websocket_authentication(function: F) -> F: return cast(F, wrapper) class BfxWebsocketClient(object): - VERSION = _BfxWebsocketBucket.VERSION + VERSION = BfxWebsocketBucket.VERSION MAXIMUM_BUCKETS_AMOUNT = 20 @@ -46,9 +48,9 @@ class BfxWebsocketClient(object): self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - self.buckets = [ _BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] + self.buckets = [ BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] - self.inputs = _BfxWebsocketInputs(self.__handle_websocket_input) + self.inputs = BfxWebsocketInputs(self.__handle_websocket_input) self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level) diff --git a/bfxapi/websocket/_BfxWebsocketInputs.py b/bfxapi/websocket/client/bfx_websocket_inputs.py similarity index 96% rename from bfxapi/websocket/_BfxWebsocketInputs.py rename to bfxapi/websocket/client/bfx_websocket_inputs.py index 0d9ee0b..48e3137 100644 --- a/bfxapi/websocket/_BfxWebsocketInputs.py +++ b/bfxapi/websocket/client/bfx_websocket_inputs.py @@ -2,10 +2,10 @@ from decimal import Decimal from datetime import datetime from typing import Union, Optional, List, Tuple -from .types import JSON -from .enums import OrderType, FundingOfferType +from ..types import JSON +from ..enums import OrderType, FundingOfferType -class _BfxWebsocketInputs(object): +class BfxWebsocketInputs(object): def __init__(self, __handle_websocket_input): self.__handle_websocket_input = __handle_websocket_input diff --git a/bfxapi/websocket/handlers/__init__.py b/bfxapi/websocket/handlers/__init__.py new file mode 100644 index 0000000..4fe650a --- /dev/null +++ b/bfxapi/websocket/handlers/__init__.py @@ -0,0 +1,3 @@ +from .public_channels_handler import PublicChannelsHandler +from .authenticated_channels_handler import AuthenticatedChannelsHandler +NAME = "handlers" \ No newline at end of file diff --git a/bfxapi/websocket/handlers/authenticated_channels_handler.py b/bfxapi/websocket/handlers/authenticated_channels_handler.py new file mode 100644 index 0000000..2205012 --- /dev/null +++ b/bfxapi/websocket/handlers/authenticated_channels_handler.py @@ -0,0 +1,71 @@ +from typing import List + +from ..types import * + +from .. import serializers + +from ..exceptions import BfxWebsocketException + +class AuthenticatedChannelsHandler(object): + __abbreviations = { + "os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel", + "ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close", + "te": "trade_executed", "tu": "trade_execution_update", + "fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel", + "fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close", + "fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close", + "ws": "wallet_snapshot", "wu": "wallet_update", + "bu": "balance_update", + } + + __serializers = { + ("os", "on", "ou", "oc",): serializers.Order, + ("ps", "pn", "pu", "pc",): serializers.Position, + ("te", "tu"): serializers.Trade, + ("fos", "fon", "fou", "foc",): serializers.FundingOffer, + ("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit, + ("fls", "fln", "flu", "flc",): serializers.FundingLoan, + ("ws", "wu",): serializers.Wallet, + ("bu",): serializers.Balance + } + + EVENTS = [ + "notification", + "on-req-notification", "ou-req-notification", "oc-req-notification", + "oc_multi-notification", + "fon-req-notification", "foc-req-notification", + *list(__abbreviations.values()) + ] + + def __init__(self, event_emitter, strict = False): + self.event_emitter, self.strict = event_emitter, strict + + def handle(self, type, stream): + if type == "n": + return self.__notification(stream) + + for types, serializer in AuthenticatedChannelsHandler.__serializers.items(): + if type in types: + event = AuthenticatedChannelsHandler.__abbreviations[type] + + if all(isinstance(substream, list) for substream in stream): + return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ]) + + return self.event_emitter.emit(event, serializer.parse(*stream)) + + if self.strict == True: + raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") + + def __notification(self, stream): + type, serializer = "notification", serializers._Notification(serializer=None) + + if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req": + type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order) + + if stream[1] == "oc_multi-req": + type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True) + + if stream[1] == "fon-req" or stream[1] == "foc-req": + type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer) + + return self.event_emitter.emit(type, serializer.parse(*stream)) \ No newline at end of file diff --git a/bfxapi/websocket/handlers.py b/bfxapi/websocket/handlers/public_channels_handler.py similarity index 60% rename from bfxapi/websocket/handlers.py rename to bfxapi/websocket/handlers/public_channels_handler.py index 686501b..154e677 100644 --- a/bfxapi/websocket/handlers.py +++ b/bfxapi/websocket/handlers/public_channels_handler.py @@ -1,10 +1,8 @@ -from typing import List +from ..types import * -from .types import * +from .. import serializers -from . import serializers -from .enums import Channels -from .exceptions import BfxWebsocketException +from ..enums import Channels class PublicChannelsHandler(object): EVENTS = [ @@ -117,68 +115,4 @@ class PublicChannelsHandler(object): "derivatives_status_update", subscription, serializers.DerivativesStatus.parse(*stream[0]) - ) - -class AuthenticatedChannelsHandler(object): - __abbreviations = { - "os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel", - "ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close", - "te": "trade_executed", "tu": "trade_execution_update", - "fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel", - "fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close", - "fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close", - "ws": "wallet_snapshot", "wu": "wallet_update", - "bu": "balance_update", - } - - __serializers = { - ("os", "on", "ou", "oc",): serializers.Order, - ("ps", "pn", "pu", "pc",): serializers.Position, - ("te", "tu"): serializers.Trade, - ("fos", "fon", "fou", "foc",): serializers.FundingOffer, - ("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit, - ("fls", "fln", "flu", "flc",): serializers.FundingLoan, - ("ws", "wu",): serializers.Wallet, - ("bu",): serializers.Balance - } - - EVENTS = [ - "notification", - "on-req-notification", "ou-req-notification", "oc-req-notification", - "oc_multi-notification", - "fon-req-notification", "foc-req-notification", - *list(__abbreviations.values()) - ] - - def __init__(self, event_emitter, strict = False): - self.event_emitter, self.strict = event_emitter, strict - - def handle(self, type, stream): - if type == "n": - return self.__notification(stream) - - for types, serializer in AuthenticatedChannelsHandler.__serializers.items(): - if type in types: - event = AuthenticatedChannelsHandler.__abbreviations[type] - - if all(isinstance(substream, list) for substream in stream): - return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ]) - - return self.event_emitter.emit(event, serializer.parse(*stream)) - - if self.strict == True: - raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") - - def __notification(self, stream): - type, serializer = "notification", serializers._Notification(serializer=None) - - if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order) - - if stream[1] == "oc_multi-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True) - - if stream[1] == "fon-req" or stream[1] == "foc-req": - type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer) - - return self.event_emitter.emit(type, serializer.parse(*stream)) \ No newline at end of file + ) \ No newline at end of file diff --git a/bfxapi/websocket/subscriptions.py b/bfxapi/websocket/subscriptions.py index e22bb5e..5e2d692 100644 --- a/bfxapi/websocket/subscriptions.py +++ b/bfxapi/websocket/subscriptions.py @@ -1,6 +1,8 @@ -from typing import TypedDict, Optional +from typing import TypedDict, Union, Optional __all__ = [ + "Subscription", + "Ticker", "Trades", "Book", @@ -8,6 +10,10 @@ __all__ = [ "Status" ] +_Header = TypedDict("_Header", { "event": str, "channel": str, "subId": str }) + +Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"] + class Ticker(TypedDict): chanId: int; symbol: str pair: Optional[str]