From 8a1632d3c21812f900348b0fa3d0c5a2325e8e06 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Sun, 1 Oct 2023 21:27:46 +0200 Subject: [PATCH] Write new implementation for class BfxEventEmitter (bfxapi.websocket._event_emitter). --- .pylintrc | 2 +- README.md | 5 -- .../websocket/_client/bfx_websocket_client.py | 62 ++++----------- .../_event_emitter/bfx_event_emitter.py | 78 +++++++++++++++---- bfxapi/websocket/_handlers/__init__.py | 1 + .../_handlers/auth_events_handler.py | 34 ++------ .../_handlers/public_channels_handler.py | 17 ---- bfxapi/websocket/exceptions.py | 4 +- 8 files changed, 87 insertions(+), 116 deletions(-) diff --git a/.pylintrc b/.pylintrc index 996e616..1f1a19b 100644 --- a/.pylintrc +++ b/.pylintrc @@ -24,7 +24,7 @@ max-line-length=120 expected-line-ending-format=LF [BASIC] -good-names=id,on,pl,t,ip,tf,A,B,C,D,E,F +good-names=t,f,id,ip,on,pl,tf,A,B,C,D,E,F [TYPECHECK] generated-members=websockets diff --git a/README.md b/README.md index 4c181ce..68500c0 100644 --- a/README.md +++ b/README.md @@ -242,11 +242,6 @@ The same can be done without using decorators: bfx.wss.on("candles_update", callback=on_candles_update) ``` -You can pass any number of events to register for the same callback function: -```python -bfx.wss.on("t_ticker_update", "f_ticker_update", callback=on_ticker_update) -``` - # Advanced features ## Using custom notifications diff --git a/bfxapi/websocket/_client/bfx_websocket_client.py b/bfxapi/websocket/_client/bfx_websocket_client.py index 178268c..cd95c76 100644 --- a/bfxapi/websocket/_client/bfx_websocket_client.py +++ b/bfxapi/websocket/_client/bfx_websocket_client.py @@ -1,7 +1,7 @@ from typing import \ - TYPE_CHECKING, TypeVar, TypedDict,\ - Callable, Optional, Tuple, \ - List, Dict, Any + TYPE_CHECKING, TypedDict, List, \ + Dict, Optional, Any, \ + no_type_check from logging import Logger from datetime import datetime @@ -20,19 +20,16 @@ from websockets.legacy.client import \ connect as _websockets__connect from bfxapi._utils.json_encoder import JSONEncoder -from bfxapi.websocket._connection import Connection -from bfxapi.websocket._event_emitter import BfxEventEmitter -from bfxapi.websocket._handlers import \ - PublicChannelsHandler, \ - AuthEventsHandler +from bfxapi.websocket._connection import Connection +from bfxapi.websocket._handlers import AuthEventsHandler +from bfxapi.websocket._event_emitter import BfxEventEmitter from bfxapi.websocket.exceptions import \ InvalidAuthenticationCredentials, \ ReconnectionTimeoutError, \ OutdatedClientVersion, \ - ZeroConnectionsError, \ - EventNotSupported + ZeroConnectionsError from .bfx_websocket_bucket import BfxWebSocketBucket @@ -43,8 +40,6 @@ if TYPE_CHECKING: from asyncio import Task - _T = TypeVar("_T", bound=Callable[..., None]) - _Reconnection = TypedDict("_Reconnection", { "attempts": int, "reason": str, "timestamp": datetime }) @@ -55,18 +50,6 @@ class BfxWebSocketClient(Connection, Connection.Authenticable): MAXIMUM_CONNECTIONS_AMOUNT = 20 - __ONCE_EVENTS = [ - "open", "authenticated", "disconnection", - *AuthEventsHandler.ONCE_EVENTS - ] - - EVENTS = [ - "subscribed", "wss-error", - *__ONCE_EVENTS, - *PublicChannelsHandler.EVENTS, - *AuthEventsHandler.ON_EVENTS - ] - def __init__(self, host: str, *, @@ -82,9 +65,7 @@ class BfxWebSocketClient(Connection, Connection.Authenticable): self.__reconnection: Optional[_Reconnection] = None - self.__event_emitter = BfxEventEmitter(targets = \ - PublicChannelsHandler.ONCE_PER_SUBSCRIPTION + \ - ["subscribed"]) + self.__event_emitter = BfxEventEmitter(loop=None) self.__handler = AuthEventsHandler( \ event_emitter=self.__event_emitter) @@ -92,7 +73,7 @@ class BfxWebSocketClient(Connection, Connection.Authenticable): self.__inputs = BfxWebSocketInputs( \ handle_websocket_input=self.__handle_websocket_input) - @self.__event_emitter.on("error") + @self.__event_emitter.listens_to("error") def error(exception: Exception) -> None: header = f"{type(exception).__name__}: {str(exception)}" @@ -123,7 +104,7 @@ class BfxWebSocketClient(Connection, Connection.Authenticable): _bucket = BfxWebSocketBucket( \ self._host, self.__event_emitter) - self.__buckets.update( { _bucket: None }) + self.__buckets.update({ _bucket: None }) await self.__connect() @@ -340,26 +321,9 @@ class BfxWebSocketClient(Connection, Connection.Authenticable): await self._websocket.send(json.dumps(\ [ 0, event, None, data], cls=JSONEncoder)) - def on(self, *events: str, callback: Optional["_T"] = None) -> Callable[["_T"], None]: - for event in events: - if event not in BfxWebSocketClient.EVENTS: - raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \ - "of available events see BfxWebSocketClient.EVENTS.") - - def _register_events(function: "_T", events: Tuple[str, ...]) -> None: - for event in events: - if event in BfxWebSocketClient.__ONCE_EVENTS: - self.__event_emitter.once(event, function) - else: - self.__event_emitter.on(event, function) - - if callback: - _register_events(callback, events) - - def _handler(function: "_T") -> None: - _register_events(function, events) - - return _handler + @no_type_check + def on(self, event, f = None): + return self.__event_emitter.on(event, f=f) @staticmethod def __build_authentication_message(api_key: str, diff --git a/bfxapi/websocket/_event_emitter/bfx_event_emitter.py b/bfxapi/websocket/_event_emitter/bfx_event_emitter.py index 9ebabd2..ec594ab 100644 --- a/bfxapi/websocket/_event_emitter/bfx_event_emitter.py +++ b/bfxapi/websocket/_event_emitter/bfx_event_emitter.py @@ -1,37 +1,83 @@ from typing import \ - TYPE_CHECKING, List, Dict, Any + Callable, List, Dict, \ + Optional, Any from collections import defaultdict - +from asyncio import AbstractEventLoop from pyee.asyncio import AsyncIOEventEmitter -if TYPE_CHECKING: - from bfxapi.websocket.subscriptions import Subscription +from bfxapi.websocket.exceptions import UnknownEventError + +_ONCE_PER_CONNECTION = [ + "open", "authenticated", "disconnection", + "order_snapshot", "position_snapshot", "funding_offer_snapshot", + "funding_credit_snapshot", "funding_loan_snapshot", "wallet_snapshot" +] + +_ONCE_PER_SUBSCRIPTION = [ + "subscribed", "t_trades_snapshot", "f_trades_snapshot", + "t_book_snapshot", "f_book_snapshot", "t_raw_book_snapshot", + "f_raw_book_snapshot", "candles_snapshot" +] + +_COMMON = [ + "error", "wss-error", "t_ticker_update", + "f_ticker_update", "t_trade_execution", "t_trade_execution_update", + "f_trade_execution", "f_trade_execution_update", "t_book_update", + "f_book_update", "t_raw_book_update", "f_raw_book_update", + "candles_update", "derivatives_status_update", "liquidation_feed_update", + "order_new", "order_update", "order_cancel", + "position_new", "position_update", "position_close", + "funding_offer_new", "funding_offer_update", "funding_offer_cancel", + "funding_credit_new", "funding_credit_update", "funding_credit_close", + "funding_loan_new", "funding_loan_update", "funding_loan_close", + "trade_execution", "trade_execution_update", "wallet_update", + "notification", "on-req-notification", "ou-req-notification", + "oc-req-notification", "fon-req-notification", "foc-req-notification" +] class BfxEventEmitter(AsyncIOEventEmitter): - def __init__(self, targets: List[str]) -> None: - super().__init__() + _EVENTS = _ONCE_PER_CONNECTION + \ + _ONCE_PER_SUBSCRIPTION + \ + _COMMON - self.__targets = targets + def __init__(self, loop: Optional[AbstractEventLoop] = None) -> None: + super().__init__(loop) - self.__log: Dict[str, List[str]] = \ + self._connection: List[str] = [ ] + + self._subscriptions: Dict[str, List[str]] = \ defaultdict(lambda: [ ]) def emit(self, event: str, *args: Any, **kwargs: Any) -> bool: - if event in self.__targets: - subscription: "Subscription" = args[0] + if event in _ONCE_PER_CONNECTION: + if event in self._connection: + return self._has_listeners(event) - sub_id = subscription["subId"] + self._connection += [ event ] - if event in self.__log[sub_id]: - with self._lock: - listeners = self._events.get(event) + if event in _ONCE_PER_SUBSCRIPTION: + sub_id = args[0]["subId"] - return bool(listeners) + if event in self._subscriptions[sub_id]: + return self._has_listeners(event) - self.__log[sub_id] += [ event ] + self._subscriptions[sub_id] += [ event ] return super().emit(event, *args, **kwargs) + + def _add_event_handler(self, event: str, k: Callable, v: Callable): + if event not in BfxEventEmitter._EVENTS: + raise UnknownEventError(f"Can't register to unknown event: <{event}> " + \ + "(to get a full list of available events see https://docs.bitfinex.com/).") + + super()._add_event_handler(event, k, v) + + def _has_listeners(self, event: str) -> bool: + with self._lock: + listeners = self._events.get(event) + + return bool(listeners) diff --git a/bfxapi/websocket/_handlers/__init__.py b/bfxapi/websocket/_handlers/__init__.py index 23f5aad..12a1dd1 100644 --- a/bfxapi/websocket/_handlers/__init__.py +++ b/bfxapi/websocket/_handlers/__init__.py @@ -1,2 +1,3 @@ from .public_channels_handler import PublicChannelsHandler + from .auth_events_handler import AuthEventsHandler diff --git a/bfxapi/websocket/_handlers/auth_events_handler.py b/bfxapi/websocket/_handlers/auth_events_handler.py index 7028043..18940f2 100644 --- a/bfxapi/websocket/_handlers/auth_events_handler.py +++ b/bfxapi/websocket/_handlers/auth_events_handler.py @@ -12,35 +12,17 @@ if TYPE_CHECKING: from pyee.base import EventEmitter class AuthEventsHandler: - __ONCE_ABBREVIATIONS = { - "os": "order_snapshot", "ps": "position_snapshot", "fos": "funding_offer_snapshot", - "fcs": "funding_credit_snapshot", "fls": "funding_loan_snapshot", "ws": "wallet_snapshot" - } - - __ON_ABBREVIATIONS = { - "on": "order_new", "ou": "order_update", "oc": "order_cancel", - "pn": "position_new", "pu": "position_update", "pc": "position_close", - "fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel", - "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close", - "fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close", - "te": "trade_execution", "tu": "trade_execution_update", "wu": "wallet_update" - } - __ABBREVIATIONS = { - **__ONCE_ABBREVIATIONS, - **__ON_ABBREVIATIONS + "os": "order_snapshot", "on": "order_new", "ou": "order_update", + "oc": "order_cancel", "ps": "position_snapshot", "pn": "position_new", + "pu": "position_update", "pc": "position_close", "te": "trade_execution", + "tu": "trade_execution_update", "fos": "funding_offer_snapshot", "fon": "funding_offer_new", + "fou": "funding_offer_update", "foc": "funding_offer_cancel", "fcs": "funding_credit_snapshot", + "fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close", + "fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update", + "flc": "funding_loan_close", "ws": "wallet_snapshot", "wu": "wallet_update" } - ONCE_EVENTS = [ - *list(__ONCE_ABBREVIATIONS.values()) - ] - - ON_EVENTS = [ - *list(__ON_ABBREVIATIONS.values()), - "notification", "on-req-notification", "ou-req-notification", - "oc-req-notification", "fon-req-notification", "foc-req-notification" - ] - def __init__(self, event_emitter: "EventEmitter") -> None: self.__event_emitter = event_emitter diff --git a/bfxapi/websocket/_handlers/public_channels_handler.py b/bfxapi/websocket/_handlers/public_channels_handler.py index 0bcd805..5bc22f2 100644 --- a/bfxapi/websocket/_handlers/public_channels_handler.py +++ b/bfxapi/websocket/_handlers/public_channels_handler.py @@ -15,23 +15,6 @@ if TYPE_CHECKING: _CHECKSUM = "cs" class PublicChannelsHandler: - ONCE_PER_SUBSCRIPTION = [ - "t_trades_snapshot", "f_trades_snapshot", "t_book_snapshot", - "f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot", - "candles_snapshot" - ] - - EVENTS = [ - *ONCE_PER_SUBSCRIPTION, - "t_ticker_update", "f_ticker_update", "t_trade_execution", - "t_trade_execution_update", "f_trade_execution", "f_trade_execution_update", - "t_book_update", "f_book_update", "t_raw_book_update", - "f_raw_book_update", "candles_update", "derivatives_status_update", - "liquidation_feed_update", - - "checksum" - ] - def __init__(self, event_emitter: "EventEmitter") -> None: self.__event_emitter = event_emitter diff --git a/bfxapi/websocket/exceptions.py b/bfxapi/websocket/exceptions.py index 6ed6f3f..6469723 100644 --- a/bfxapi/websocket/exceptions.py +++ b/bfxapi/websocket/exceptions.py @@ -9,7 +9,7 @@ __all__ = [ "ReconnectionTimeoutError", "ActionRequiresAuthentication", "InvalidAuthenticationCredentials", - "EventNotSupported", + "UnknownEventError", "OutdatedClientVersion" ] @@ -48,7 +48,7 @@ class InvalidAuthenticationCredentials(BfxWebSocketException): This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication. """ -class EventNotSupported(BfxWebSocketException): +class UnknownEventError(BfxWebSocketException): """ This error indicates a failed attempt to subscribe to an event not supported by the BfxWebSocketClient. """