From a03a82d57a6dfdc446574c3ce849dfc305274793 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 11 Nov 2022 18:54:38 +0100 Subject: [PATCH] Add support for various new authenticated channels. Add new typings in bfxapi/websocket/typings.py. Add BfxWebsocketException in bfxapi/websocket/errors.py. --- bfxapi/websocket/BfxWebsocketClient.py | 4 +- bfxapi/websocket/__init__.py | 2 +- bfxapi/websocket/errors.py | 17 +- bfxapi/websocket/handlers.py | 370 +++++++++++++++++++++++-- bfxapi/websocket/typings.py | 135 ++++++++- 5 files changed, 483 insertions(+), 45 deletions(-) diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index 49c9a82..9ba6b45 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -4,7 +4,7 @@ from pyee.asyncio import AsyncIOEventEmitter from .handlers import Channels, PublicChannelsHandler, AuthenticatedEventsHandler -from .errors import ConnectionNotOpen, AuthenticationCredentialsError +from .errors import BfxWebsocketException, ConnectionNotOpen, InvalidAuthenticationCredentials HEARTBEAT = "hb" @@ -42,7 +42,7 @@ class BfxWebsocketClient(object): elif isinstance(message, dict) and message["event"] == "auth": if message["status"] == "OK": self.event_emitter.emit("authenticated", message) - else: raise AuthenticationCredentialsError("Cannot authenticate with given API-KEY and API-SECRET.") + else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") elif isinstance(message, list) and ((chanId := message[0]) or True) and message[1] != HEARTBEAT: if chanId == 0: self.handlers["authenticated"].handle(message[1], message[2]) diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index e18ac12..c3aee02 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1,3 +1,3 @@ from .BfxWebsocketClient import BfxWebsocketClient from .handlers import Channels -from .errors import ConnectionNotOpen, AuthenticationCredentialsError \ No newline at end of file +from .errors import BfxWebsocketException, ConnectionNotOpen, InvalidAuthenticationCredentials \ No newline at end of file diff --git a/bfxapi/websocket/errors.py b/bfxapi/websocket/errors.py index 693dbe3..d8f4b03 100644 --- a/bfxapi/websocket/errors.py +++ b/bfxapi/websocket/errors.py @@ -1,11 +1,24 @@ -class ConnectionNotOpen(Exception): +__all__ = [ + "BfxWebsocketException", + "ConnectionNotOpen", + "InvalidAuthenticationCredentials" +] + +class BfxWebsocketException(Exception): + """ + Base class for all exceptions defined in bfx/websocket/errors.py. + """ + + pass + +class ConnectionNotOpen(BfxWebsocketException): """ This error indicates an attempt to communicate via websocket before starting the connection with the servers. """ pass -class AuthenticationCredentialsError(Exception): +class InvalidAuthenticationCredentials(BfxWebsocketException): """ This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication. """ diff --git a/bfxapi/websocket/handlers.py b/bfxapi/websocket/handlers.py index 7704bc7..a601aa6 100644 --- a/bfxapi/websocket/handlers.py +++ b/bfxapi/websocket/handlers.py @@ -1,5 +1,7 @@ from enum import Enum +from .errors import BfxWebsocketException + class Channels(str, Enum): TICKER = "ticker" TRADES = "trades" @@ -46,33 +48,45 @@ class PublicChannelsHandler(object): self.event_emitter.emit("status", subscription, parameters[0]) class AuthenticatedEventsHandler(object): - def __init__(self, event_emitter): - self.event_emitter = event_emitter + def __init__(self, event_emitter, strict = False): + self.event_emitter, self.strict = event_emitter, strict self.__handlers = { "bu": self.__bu_event_handler, "ws": self.__ws_event_handler, "wu": self.__wu_event_handler, "os": self.__os_event_handler, - "on": self.__on_event_handler + "on": self.__on_event_handler, + "ou": self.__ou_event_handler, + "oc": self.__oc_event_handler, + "ps": self.__ps_event_handler, + "pn": self.__pn_event_handler, + "pu": self.__pu_event_handler, + "pc": self.__pc_event_handler, + "fos": self.__fos_event_handler, + "fon": self.__fon_event_handler, + "fou": self.__fou_event_handler, + "foc": self.__foc_event_handler, } - def handle(self, type, parameters): + def handle(self, type, stream): if type in self.__handlers: - self.__handlers[type](*parameters) + self.__handlers[type](*stream) + elif self.strict == True: + raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") - def __bu_event_handler(self, *parameters): - self.event_emitter.emit("balance_update", _label_array_elements( + def __bu_event_handler(self, *stream): + self.event_emitter.emit("balance_update", _label_stream_data( [ "AUM", "AUM_NET" ], - *parameters + *stream )) - def __ws_event_handler(self, *parameters): + def __ws_event_handler(self, *stream): self.event_emitter.emit("wallet_snapshot", [ - _label_array_elements( + _label_stream_data( [ "WALLET_TYPE", "CURRENCY", @@ -82,12 +96,12 @@ class AuthenticatedEventsHandler(object): "DESCRIPTION", "META" ], - *parameter - ) for parameter in parameters + *substream + ) for substream in stream ]) - def __wu_event_handler(self, *parameters): - self.event_emitter.emit("wallet_update", _label_array_elements( + def __wu_event_handler(self, *stream): + self.event_emitter.emit("wallet_update", _label_stream_data( [ "WALLET_TYPE", "CURRENCY", @@ -97,12 +111,12 @@ class AuthenticatedEventsHandler(object): "DESCRIPTION", "META" ], - *parameters + *stream )) - def __os_event_handler(self, *parameters): + def __os_event_handler(self, *stream): self.event_emitter.emit("order_snapshot", [ - _label_array_elements( + _label_stream_data( [ "ID", "GID", @@ -137,12 +151,12 @@ class AuthenticatedEventsHandler(object): "_PLACEHOLDER", "META" ], - *parameter - ) for parameter in parameters + *substream + ) for substream in stream ]) - def __on_event_handler(self, *parameters): - self.event_emitter.emit("new_order", _label_array_elements( + def __on_event_handler(self, *stream): + self.event_emitter.emit("new_order", _label_stream_data( [ "ID", "GID", @@ -177,13 +191,315 @@ class AuthenticatedEventsHandler(object): "_PLACEHOLDER", "_PLACEHOLDER" ], - *parameters + *stream )) -def _label_array_elements(labels, *args): + def __ou_event_handler(self, *stream): + self.event_emitter.emit("order_update", _label_stream_data( + [ + "ID", + "GID", + "CID", + "SYMBOL", + "MTS_CREATE", + "MTS_UPDATE", + "AMOUNT", + "AMOUNT_ORIG", + "ORDER_TYPE", + "TYPE_PREV", + "MTS_TIF", + "_PLACEHOLDER", + "FLAGS", + "ORDER_STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "PRICE", + "PRICE_AVG", + "PRICE_TRAILING", + "PRICE_AUX_LIMIT", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "NOTIFY", + "HIDDEN", + "PLACED_ID", + "_PLACEHOLDER", + "_PLACEHOLDER", + "ROUTING", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER" + ], + *stream + )) + + def __oc_event_handler(self, *stream): + self.event_emitter.emit("order_cancel", _label_stream_data( + [ + "ID", + "GID", + "CID", + "SYMBOL", + "MTS_CREATE", + "MTS_UPDATE", + "AMOUNT", + "AMOUNT_ORIG", + "ORDER_TYPE", + "TYPE_PREV", + "MTS_TIF", + "_PLACEHOLDER", + "FLAGS", + "ORDER_STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "PRICE", + "PRICE_AVG", + "PRICE_TRAILING", + "PRICE_AUX_LIMIT", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "NOTIFY", + "HIDDEN", + "PLACED_ID", + "_PLACEHOLDER", + "_PLACEHOLDER", + "ROUTING", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER" + ], + *stream + )) + + def __ps_event_handler(self, *stream): + self.event_emitter.emit("position_snapshot", [ + _label_stream_data( + [ + "SYMBOL", + "STATUS", + "AMOUNT", + "BASE_PRICE", + "MARGIN_FUNDING", + "MARGIN_FUNDING_TYPE", + "PL", + "PL_PERC", + "PRICE_LIQ", + "LEVERAGE", + "FLAG", + "POSITION_ID", + "MTS_CREATE", + "MTS_UPDATE", + "_PLACEHOLDER", + "TYPE", + "_PLACEHOLDER", + "COLLATERAL", + "COLLATERAL_MIN", + "META" + ], + *substream + ) + for substream in stream + ]) + + def __pn_event_handler(self, *stream): + self.event_emitter.emit("new_position", _label_stream_data( + [ + "SYMBOL", + "STATUS", + "AMOUNT", + "BASE_PRICE", + "MARGIN_FUNDING", + "MARGIN_FUNDING_TYPE", + "PL", + "PL_PERC", + "PRICE_LIQ", + "LEVERAGE", + "FLAG", + "POSITION_ID", + "MTS_CREATE", + "MTS_UPDATE", + "_PLACEHOLDER", + "TYPE", + "_PLACEHOLDER", + "COLLATERAL", + "COLLATERAL_MIN", + "META" + ], + *stream + )) + + def __pu_event_handler(self, *stream): + self.event_emitter.emit("position_update", _label_stream_data( + [ + "SYMBOL", + "STATUS", + "AMOUNT", + "BASE_PRICE", + "MARGIN_FUNDING", + "MARGIN_FUNDING_TYPE", + "PL", + "PL_PERC", + "PRICE_LIQ", + "LEVERAGE", + "FLAG", + "POSITION_ID", + "MTS_CREATE", + "MTS_UPDATE", + "_PLACEHOLDER", + "TYPE", + "_PLACEHOLDER", + "COLLATERAL", + "COLLATERAL_MIN", + "META" + ], + *stream + )) + + def __pc_event_handler(self, *stream): + self.event_emitter.emit("position_cancel", _label_stream_data( + [ + "SYMBOL", + "STATUS", + "AMOUNT", + "BASE_PRICE", + "MARGIN_FUNDING", + "MARGIN_FUNDING_TYPE", + "PL", + "PL_PERC", + "PRICE_LIQ", + "LEVERAGE", + "FLAG", + "POSITION_ID", + "MTS_CREATE", + "MTS_UPDATE", + "_PLACEHOLDER", + "TYPE", + "_PLACEHOLDER", + "COLLATERAL", + "COLLATERAL_MIN", + "META" + ], + *stream + )) + + def __fos_event_handler(self, *stream): + self.event_emitter.emit("funding_offer_snapshot", [ + _label_stream_data( + [ + "ID", + "SYMBOL", + "MTS_CREATED", + "MTS_UPDATED", + "AMOUNT", + "AMOUNT_ORIG", + "OFFER_TYPE", + "_PLACEHOLDER", + "_PLACEHOLDER", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "_PLACEHOLDER", + ], + *substream + ) + for substream in stream + ]) + + def __fon_event_handler(self, *stream): + self.event_emitter.emit("funding_offer_new", _label_stream_data( + [ + "ID", + "SYMBOL", + "MTS_CREATED", + "MTS_UPDATED", + "AMOUNT", + "AMOUNT_ORIG", + "TYPE", + "_PLACEHOLDER", + "_PLACEHOLDER", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "RATE_REAL" + ], + *stream + )) + + def __fou_event_handler(self, *stream): + self.event_emitter.emit("funding_offer_update", _label_stream_data( + [ + "ID", + "SYMBOL", + "MTS_CREATED", + "MTS_UPDATED", + "AMOUNT", + "AMOUNT_ORIG", + "TYPE", + "_PLACEHOLDER", + "_PLACEHOLDER", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "RATE_REAL" + ], + *stream + )) + + def __foc_event_handler(self, *stream): + self.event_emitter.emit("funding_offer_cancel", _label_stream_data( + [ + "ID", + "SYMBOL", + "MTS_CREATED", + "MTS_UPDATED", + "AMOUNT", + "AMOUNT_ORIG", + "TYPE", + "_PLACEHOLDER", + "_PLACEHOLDER", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "RATE_REAL" + ], + *stream + )) + +def _label_stream_data(labels, *args, IGNORE = [ "_PLACEHOLDER" ]): if len(labels) != len(args): - raise Exception(" and <*args> arguments should contain the same amount of elements.") + raise BfxWebsocketException(" and <*args> arguments should contain the same amount of elements.") - _PLACEHOLDER = "_PLACEHOLDER" - - return { label: args[index] for index, label in enumerate(labels) if label != _PLACEHOLDER } \ No newline at end of file + return { label: args[index] for index, label in enumerate(labels) if label not in IGNORE } \ No newline at end of file diff --git a/bfxapi/websocket/typings.py b/bfxapi/websocket/typings.py index f49c22c..8172da8 100644 --- a/bfxapi/websocket/typings.py +++ b/bfxapi/websocket/typings.py @@ -1,16 +1,125 @@ -from typing import TypedDict, List, Optional +from typing import Type, List, Dict, TypedDict, Union, Optional, Any -class BalanceUpdateStream(TypedDict): - AUM: float - AUM_NET: float +JSON = Union[Dict[str, Any], List[Any], int, str, float, bool, Type[None]] -class WalletUpdateStream(TypedDict): - WALLET_TYPE: str - CURRENCY: str - BALANCE: float - UNSETTLED_INTEREST: float - BALANCE_AVAILABLE: Optional[float] - DESCRIPTION: str - META: dict +BalanceUpdateStream = TypedDict("BalanceUpdateStream", { + "AUM": float, + "AUM_NET": float +}) -WalletSnapshotStream = List[WalletUpdateStream] \ No newline at end of file +WalletSnapshotStream = List[TypedDict("WalletSnapshotStream", { + "WALLET_TYPE": str, + "CURRENCY": str, + "BALANCE": float, + "UNSETTLED_INTEREST": float, + "BALANCE_AVAILABLE": Optional[float], + "DESCRIPTION": str, + "META": JSON +})] + +WalletUpdateStream = TypedDict("WalletUpdateStream", { + "WALLET_TYPE": str, + "CURRENCY": str, + "BALANCE": float, + "UNSETTLED_INTEREST": float, + "BALANCE_AVAILABLE": Optional[float], + "DESCRIPTION": str, + "META": JSON +}) + +OrderSnapshotStream = List[TypedDict("OrderSnapshotStream", { + "ID": int, + "GID": int, + "CID": int, + "SYMBOL": str, + "MTS_CREATE": int, + "MTS_UPDATE": int, + "AMOUNT": float, + "AMOUNT_ORIG": float, + "ORDER_TYPE": str, + "TYPE_PREV": str, + "MTS_TIF": int, + "FLAGS": int, + "STATUS": str, + "PRICE": float, + "PRICE_AVG": float, + "PRICE_TRAILING": float, + "PRICE_AUX_LIMIT": float, + "NOTIFY": int, + "HIDDEN": int, + "PLACED_ID": int, + "ROUTING": str, + "META": JSON +})] + +NewOrderStream = TypedDict("NewOrderStream", { + "ID": int, + "GID": int, + "CID": int, + "SYMBOL": str, + "MTS_CREATE": int, + "MTS_UPDATE": int, + "AMOUNT": float, + "AMOUNT_ORIG": float, + "ORDER_TYPE": str, + "TYPE_PREV": str, + "MTS_TIF": int, + "FLAGS": int, + "ORDER_STATUS": str, + "PRICE": float, + "PRICE_AVG": float, + "PRICE_TRAILING": float, + "PRICE_AUX_LIMIT": float, + "NOTIFY": int, + "HIDDEN": int, + "PLACED_ID": int, + "ROUTING": str +}) + +OrderUpdateStream = TypedDict("OrderUpdateStream", { + "ID": int, + "GID": int, + "CID": int, + "SYMBOL": str, + "MTS_CREATE": int, + "MTS_UPDATE": int, + "AMOUNT": float, + "AMOUNT_ORIG": float, + "ORDER_TYPE": str, + "TYPE_PREV": str, + "MTS_TIF": int, + "FLAGS": int, + "ORDER_STATUS": str, + "PRICE": float, + "PRICE_AVG": float, + "PRICE_TRAILING": float, + "PRICE_AUX_LIMIT": float, + "NOTIFY": int, + "HIDDEN": int, + "PLACED_ID": int, + "ROUTING": str +}) + +OrderCancelStream = TypedDict("OrderCancelStream", { + "ID": int, + "GID": int, + "CID": int, + "SYMBOL": str, + "MTS_CREATE": int, + "MTS_UPDATE": int, + "AMOUNT": float, + "AMOUNT_ORIG": float, + "ORDER_TYPE": str, + "TYPE_PREV": str, + "MTS_TIF": int, + "FLAGS": int, + "ORDER_STATUS": str, + "PRICE": float, + "PRICE_AVG": float, + "PRICE_TRAILING": float, + "PRICE_AUX_LIMIT": float, + "NOTIFY": int, + "HIDDEN": int, + "PLACED_ID": int, + "ROUTING": str +}) \ No newline at end of file