Split websocket package in multiple sub-package. Split handlers.py in public_channels_handler.py and authenticated_channels_handler.py. Rename files attaining to new conventions.

This commit is contained in:
Davide Casale
2023-02-13 19:09:38 +01:00
parent 6a7577f98b
commit f0f150cec2
11 changed files with 117 additions and 91 deletions

View File

@@ -1,3 +1,4 @@
from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints
from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints, \
RestMerchantEndpoints
NAME = "rest"

View File

@@ -1,5 +1,7 @@
from .bfx_rest_interface import BfxRestInterface
from .rest_public_endpoints import RestPublicEndpoints
from .rest_authenticated_endpoints import RestAuthenticatedEndpoints
from .rest_merchant_endpoints import RestMerchantEndpoints
NAME = "endpoints"

View File

@@ -1 +1,3 @@
from .BfxWebsocketClient import BfxWebsocketClient
from .client import BfxWebsocketClient, BfxWebsocketBucket, BfxWebsocketInputs
NAME = "websocket"

View File

@@ -0,0 +1,5 @@
from .bfx_websocket_client import BfxWebsocketClient
from .bfx_websocket_bucket import BfxWebsocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
NAME = "client"

View File

@@ -2,9 +2,9 @@ import json, uuid, websockets
from typing import Literal, TypeVar, Callable, cast
from .handlers import PublicChannelsHandler
from ..handlers import PublicChannelsHandler
from .exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion
from ..exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion
_HEARTBEAT = "hb"
@@ -19,7 +19,7 @@ def _require_websocket_connection(function: F) -> F:
return cast(F, wrapper)
class _BfxWebsocketBucket(object):
class BfxWebsocketBucket(object):
VERSION = 2
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
@@ -42,8 +42,8 @@ class _BfxWebsocketBucket(object):
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "info" and "version" in message:
if _BfxWebsocketBucket.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {_BfxWebsocketBucket.VERSION}, server version: {message['version']}).")
if BfxWebsocketBucket.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketBucket.VERSION}, server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]):
self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.subscriptions[chanId] = message
@@ -60,7 +60,7 @@ class _BfxWebsocketBucket(object):
@_require_websocket_connection
async def _subscribe(self, channel, subId=None, **kwargs):
if len(self.subscriptions) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
if len(self.subscriptions) + len(self.pendings) == BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
subscription = {

View File

@@ -4,15 +4,17 @@ from typing import Literal, TypeVar, Callable, cast
from pyee.asyncio import AsyncIOEventEmitter
from ._BfxWebsocketBucket import _HEARTBEAT, F, _require_websocket_connection, _BfxWebsocketBucket
from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket
from ._BfxWebsocketInputs import _BfxWebsocketInputs
from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler
from .exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported
from .bfx_websocket_inputs import BfxWebsocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported
from ..utils.JSONEncoder import JSONEncoder
from ..enums import Channels
from ..utils.logger import Formatter, CustomLogger
from ...utils.JSONEncoder import JSONEncoder
from ...utils.logger import Formatter, CustomLogger
def _require_websocket_authentication(function: F) -> F:
async def wrapper(self, *args, **kwargs):
@@ -24,7 +26,7 @@ def _require_websocket_authentication(function: F) -> F:
return cast(F, wrapper)
class BfxWebsocketClient(object):
VERSION = _BfxWebsocketBucket.VERSION
VERSION = BfxWebsocketBucket.VERSION
MAXIMUM_BUCKETS_AMOUNT = 20
@@ -46,9 +48,9 @@ class BfxWebsocketClient(object):
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
self.buckets = [ _BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ]
self.buckets = [ BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ]
self.inputs = _BfxWebsocketInputs(self.__handle_websocket_input)
self.inputs = BfxWebsocketInputs(self.__handle_websocket_input)
self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level)

View File

@@ -2,10 +2,10 @@ from decimal import Decimal
from datetime import datetime
from typing import Union, Optional, List, Tuple
from .types import JSON
from .enums import OrderType, FundingOfferType
from ..types import JSON
from ..enums import OrderType, FundingOfferType
class _BfxWebsocketInputs(object):
class BfxWebsocketInputs(object):
def __init__(self, __handle_websocket_input):
self.__handle_websocket_input = __handle_websocket_input

View File

@@ -0,0 +1,3 @@
from .public_channels_handler import PublicChannelsHandler
from .authenticated_channels_handler import AuthenticatedChannelsHandler
NAME = "handlers"

View File

@@ -0,0 +1,71 @@
from typing import List
from ..types import *
from .. import serializers
from ..exceptions import BfxWebsocketException
class AuthenticatedChannelsHandler(object):
__abbreviations = {
"os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel",
"ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close",
"te": "trade_executed", "tu": "trade_execution_update",
"fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel",
"fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close",
"ws": "wallet_snapshot", "wu": "wallet_update",
"bu": "balance_update",
}
__serializers = {
("os", "on", "ou", "oc",): serializers.Order,
("ps", "pn", "pu", "pc",): serializers.Position,
("te", "tu"): serializers.Trade,
("fos", "fon", "fou", "foc",): serializers.FundingOffer,
("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit,
("fls", "fln", "flu", "flc",): serializers.FundingLoan,
("ws", "wu",): serializers.Wallet,
("bu",): serializers.Balance
}
EVENTS = [
"notification",
"on-req-notification", "ou-req-notification", "oc-req-notification",
"oc_multi-notification",
"fon-req-notification", "foc-req-notification",
*list(__abbreviations.values())
]
def __init__(self, event_emitter, strict = False):
self.event_emitter, self.strict = event_emitter, strict
def handle(self, type, stream):
if type == "n":
return self.__notification(stream)
for types, serializer in AuthenticatedChannelsHandler.__serializers.items():
if type in types:
event = AuthenticatedChannelsHandler.__abbreviations[type]
if all(isinstance(substream, list) for substream in stream):
return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ])
return self.event_emitter.emit(event, serializer.parse(*stream))
if self.strict == True:
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
def __notification(self, stream):
type, serializer = "notification", serializers._Notification(serializer=None)
if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order)
if stream[1] == "oc_multi-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True)
if stream[1] == "fon-req" or stream[1] == "foc-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer)
return self.event_emitter.emit(type, serializer.parse(*stream))

View File

@@ -1,10 +1,8 @@
from typing import List
from ..types import *
from .types import *
from .. import serializers
from . import serializers
from .enums import Channels
from .exceptions import BfxWebsocketException
from ..enums import Channels
class PublicChannelsHandler(object):
EVENTS = [
@@ -118,67 +116,3 @@ class PublicChannelsHandler(object):
subscription,
serializers.DerivativesStatus.parse(*stream[0])
)
class AuthenticatedChannelsHandler(object):
__abbreviations = {
"os": "order_snapshot", "on": "order_new", "ou": "order_update", "oc": "order_cancel",
"ps": "position_snapshot", "pn": "position_new", "pu": "position_update", "pc": "position_close",
"te": "trade_executed", "tu": "trade_execution_update",
"fos": "funding_offer_snapshot", "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel",
"fcs": "funding_credit_snapshot", "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close",
"ws": "wallet_snapshot", "wu": "wallet_update",
"bu": "balance_update",
}
__serializers = {
("os", "on", "ou", "oc",): serializers.Order,
("ps", "pn", "pu", "pc",): serializers.Position,
("te", "tu"): serializers.Trade,
("fos", "fon", "fou", "foc",): serializers.FundingOffer,
("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit,
("fls", "fln", "flu", "flc",): serializers.FundingLoan,
("ws", "wu",): serializers.Wallet,
("bu",): serializers.Balance
}
EVENTS = [
"notification",
"on-req-notification", "ou-req-notification", "oc-req-notification",
"oc_multi-notification",
"fon-req-notification", "foc-req-notification",
*list(__abbreviations.values())
]
def __init__(self, event_emitter, strict = False):
self.event_emitter, self.strict = event_emitter, strict
def handle(self, type, stream):
if type == "n":
return self.__notification(stream)
for types, serializer in AuthenticatedChannelsHandler.__serializers.items():
if type in types:
event = AuthenticatedChannelsHandler.__abbreviations[type]
if all(isinstance(substream, list) for substream in stream):
return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ])
return self.event_emitter.emit(event, serializer.parse(*stream))
if self.strict == True:
raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.")
def __notification(self, stream):
type, serializer = "notification", serializers._Notification(serializer=None)
if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification[Order](serializer=serializers.Order)
if stream[1] == "oc_multi-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification[List[Order]](serializer=serializers.Order, iterate=True)
if stream[1] == "fon-req" or stream[1] == "foc-req":
type, serializer = f"{stream[1]}-notification", serializers._Notification[FundingOffer](serializer=serializers.FundingOffer)
return self.event_emitter.emit(type, serializer.parse(*stream))

View File

@@ -1,6 +1,8 @@
from typing import TypedDict, Optional
from typing import TypedDict, Union, Optional
__all__ = [
"Subscription",
"Ticker",
"Trades",
"Book",
@@ -8,6 +10,10 @@ __all__ = [
"Status"
]
_Header = TypedDict("_Header", { "event": str, "channel": str, "subId": str })
Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"]
class Ticker(TypedDict):
chanId: int; symbol: str
pair: Optional[str]