Write new implementation for class BfxEventEmitter (bfxapi.websocket._event_emitter).

This commit is contained in:
Davide Casale
2023-10-01 21:27:46 +02:00
parent f6c49f677d
commit 8a1632d3c2
8 changed files with 87 additions and 116 deletions

View File

@@ -24,7 +24,7 @@ max-line-length=120
expected-line-ending-format=LF
[BASIC]
good-names=id,on,pl,t,ip,tf,A,B,C,D,E,F
good-names=t,f,id,ip,on,pl,tf,A,B,C,D,E,F
[TYPECHECK]
generated-members=websockets

View File

@@ -242,11 +242,6 @@ The same can be done without using decorators:
bfx.wss.on("candles_update", callback=on_candles_update)
```
You can pass any number of events to register for the same callback function:
```python
bfx.wss.on("t_ticker_update", "f_ticker_update", callback=on_ticker_update)
```
# Advanced features
## Using custom notifications

View File

@@ -1,7 +1,7 @@
from typing import \
TYPE_CHECKING, TypeVar, TypedDict,\
Callable, Optional, Tuple, \
List, Dict, Any
TYPE_CHECKING, TypedDict, List, \
Dict, Optional, Any, \
no_type_check
from logging import Logger
from datetime import datetime
@@ -20,19 +20,16 @@ from websockets.legacy.client import \
connect as _websockets__connect
from bfxapi._utils.json_encoder import JSONEncoder
from bfxapi.websocket._connection import Connection
from bfxapi.websocket._event_emitter import BfxEventEmitter
from bfxapi.websocket._handlers import \
PublicChannelsHandler, \
AuthEventsHandler
from bfxapi.websocket._connection import Connection
from bfxapi.websocket._handlers import AuthEventsHandler
from bfxapi.websocket._event_emitter import BfxEventEmitter
from bfxapi.websocket.exceptions import \
InvalidAuthenticationCredentials, \
ReconnectionTimeoutError, \
OutdatedClientVersion, \
ZeroConnectionsError, \
EventNotSupported
ZeroConnectionsError
from .bfx_websocket_bucket import BfxWebSocketBucket
@@ -43,8 +40,6 @@ if TYPE_CHECKING:
from asyncio import Task
_T = TypeVar("_T", bound=Callable[..., None])
_Reconnection = TypedDict("_Reconnection",
{ "attempts": int, "reason": str, "timestamp": datetime })
@@ -55,18 +50,6 @@ class BfxWebSocketClient(Connection, Connection.Authenticable):
MAXIMUM_CONNECTIONS_AMOUNT = 20
__ONCE_EVENTS = [
"open", "authenticated", "disconnection",
*AuthEventsHandler.ONCE_EVENTS
]
EVENTS = [
"subscribed", "wss-error",
*__ONCE_EVENTS,
*PublicChannelsHandler.EVENTS,
*AuthEventsHandler.ON_EVENTS
]
def __init__(self,
host: str,
*,
@@ -82,9 +65,7 @@ class BfxWebSocketClient(Connection, Connection.Authenticable):
self.__reconnection: Optional[_Reconnection] = None
self.__event_emitter = BfxEventEmitter(targets = \
PublicChannelsHandler.ONCE_PER_SUBSCRIPTION + \
["subscribed"])
self.__event_emitter = BfxEventEmitter(loop=None)
self.__handler = AuthEventsHandler( \
event_emitter=self.__event_emitter)
@@ -92,7 +73,7 @@ class BfxWebSocketClient(Connection, Connection.Authenticable):
self.__inputs = BfxWebSocketInputs( \
handle_websocket_input=self.__handle_websocket_input)
@self.__event_emitter.on("error")
@self.__event_emitter.listens_to("error")
def error(exception: Exception) -> None:
header = f"{type(exception).__name__}: {str(exception)}"
@@ -123,7 +104,7 @@ class BfxWebSocketClient(Connection, Connection.Authenticable):
_bucket = BfxWebSocketBucket( \
self._host, self.__event_emitter)
self.__buckets.update( { _bucket: None })
self.__buckets.update({ _bucket: None })
await self.__connect()
@@ -340,26 +321,9 @@ class BfxWebSocketClient(Connection, Connection.Authenticable):
await self._websocket.send(json.dumps(\
[ 0, event, None, data], cls=JSONEncoder))
def on(self, *events: str, callback: Optional["_T"] = None) -> Callable[["_T"], None]:
for event in events:
if event not in BfxWebSocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events see BfxWebSocketClient.EVENTS.")
def _register_events(function: "_T", events: Tuple[str, ...]) -> None:
for event in events:
if event in BfxWebSocketClient.__ONCE_EVENTS:
self.__event_emitter.once(event, function)
else:
self.__event_emitter.on(event, function)
if callback:
_register_events(callback, events)
def _handler(function: "_T") -> None:
_register_events(function, events)
return _handler
@no_type_check
def on(self, event, f = None):
return self.__event_emitter.on(event, f=f)
@staticmethod
def __build_authentication_message(api_key: str,

View File

@@ -1,37 +1,83 @@
from typing import \
TYPE_CHECKING, List, Dict, Any
Callable, List, Dict, \
Optional, Any
from collections import defaultdict
from asyncio import AbstractEventLoop
from pyee.asyncio import AsyncIOEventEmitter
if TYPE_CHECKING:
from bfxapi.websocket.subscriptions import Subscription
from bfxapi.websocket.exceptions import UnknownEventError
_ONCE_PER_CONNECTION = [
"open", "authenticated", "disconnection",
"order_snapshot", "position_snapshot", "funding_offer_snapshot",
"funding_credit_snapshot", "funding_loan_snapshot", "wallet_snapshot"
]
_ONCE_PER_SUBSCRIPTION = [
"subscribed", "t_trades_snapshot", "f_trades_snapshot",
"t_book_snapshot", "f_book_snapshot", "t_raw_book_snapshot",
"f_raw_book_snapshot", "candles_snapshot"
]
_COMMON = [
"error", "wss-error", "t_ticker_update",
"f_ticker_update", "t_trade_execution", "t_trade_execution_update",
"f_trade_execution", "f_trade_execution_update", "t_book_update",
"f_book_update", "t_raw_book_update", "f_raw_book_update",
"candles_update", "derivatives_status_update", "liquidation_feed_update",
"order_new", "order_update", "order_cancel",
"position_new", "position_update", "position_close",
"funding_offer_new", "funding_offer_update", "funding_offer_cancel",
"funding_credit_new", "funding_credit_update", "funding_credit_close",
"funding_loan_new", "funding_loan_update", "funding_loan_close",
"trade_execution", "trade_execution_update", "wallet_update",
"notification", "on-req-notification", "ou-req-notification",
"oc-req-notification", "fon-req-notification", "foc-req-notification"
]
class BfxEventEmitter(AsyncIOEventEmitter):
def __init__(self, targets: List[str]) -> None:
super().__init__()
_EVENTS = _ONCE_PER_CONNECTION + \
_ONCE_PER_SUBSCRIPTION + \
_COMMON
self.__targets = targets
def __init__(self, loop: Optional[AbstractEventLoop] = None) -> None:
super().__init__(loop)
self.__log: Dict[str, List[str]] = \
self._connection: List[str] = [ ]
self._subscriptions: Dict[str, List[str]] = \
defaultdict(lambda: [ ])
def emit(self,
event: str,
*args: Any,
**kwargs: Any) -> bool:
if event in self.__targets:
subscription: "Subscription" = args[0]
if event in _ONCE_PER_CONNECTION:
if event in self._connection:
return self._has_listeners(event)
sub_id = subscription["subId"]
self._connection += [ event ]
if event in self.__log[sub_id]:
with self._lock:
listeners = self._events.get(event)
if event in _ONCE_PER_SUBSCRIPTION:
sub_id = args[0]["subId"]
return bool(listeners)
if event in self._subscriptions[sub_id]:
return self._has_listeners(event)
self.__log[sub_id] += [ event ]
self._subscriptions[sub_id] += [ event ]
return super().emit(event, *args, **kwargs)
def _add_event_handler(self, event: str, k: Callable, v: Callable):
if event not in BfxEventEmitter._EVENTS:
raise UnknownEventError(f"Can't register to unknown event: <{event}> " + \
"(to get a full list of available events see https://docs.bitfinex.com/).")
super()._add_event_handler(event, k, v)
def _has_listeners(self, event: str) -> bool:
with self._lock:
listeners = self._events.get(event)
return bool(listeners)

View File

@@ -1,2 +1,3 @@
from .public_channels_handler import PublicChannelsHandler
from .auth_events_handler import AuthEventsHandler

View File

@@ -12,35 +12,17 @@ if TYPE_CHECKING:
from pyee.base import EventEmitter
class AuthEventsHandler:
__ONCE_ABBREVIATIONS = {
"os": "order_snapshot", "ps": "position_snapshot", "fos": "funding_offer_snapshot",
"fcs": "funding_credit_snapshot", "fls": "funding_loan_snapshot", "ws": "wallet_snapshot"
}
__ON_ABBREVIATIONS = {
"on": "order_new", "ou": "order_update", "oc": "order_cancel",
"pn": "position_new", "pu": "position_update", "pc": "position_close",
"fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel",
"fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close",
"te": "trade_execution", "tu": "trade_execution_update", "wu": "wallet_update"
}
__ABBREVIATIONS = {
**__ONCE_ABBREVIATIONS,
**__ON_ABBREVIATIONS
"os": "order_snapshot", "on": "order_new", "ou": "order_update",
"oc": "order_cancel", "ps": "position_snapshot", "pn": "position_new",
"pu": "position_update", "pc": "position_close", "te": "trade_execution",
"tu": "trade_execution_update", "fos": "funding_offer_snapshot", "fon": "funding_offer_new",
"fou": "funding_offer_update", "foc": "funding_offer_cancel", "fcs": "funding_credit_snapshot",
"fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update",
"flc": "funding_loan_close", "ws": "wallet_snapshot", "wu": "wallet_update"
}
ONCE_EVENTS = [
*list(__ONCE_ABBREVIATIONS.values())
]
ON_EVENTS = [
*list(__ON_ABBREVIATIONS.values()),
"notification", "on-req-notification", "ou-req-notification",
"oc-req-notification", "fon-req-notification", "foc-req-notification"
]
def __init__(self, event_emitter: "EventEmitter") -> None:
self.__event_emitter = event_emitter

View File

@@ -15,23 +15,6 @@ if TYPE_CHECKING:
_CHECKSUM = "cs"
class PublicChannelsHandler:
ONCE_PER_SUBSCRIPTION = [
"t_trades_snapshot", "f_trades_snapshot", "t_book_snapshot",
"f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot",
"candles_snapshot"
]
EVENTS = [
*ONCE_PER_SUBSCRIPTION,
"t_ticker_update", "f_ticker_update", "t_trade_execution",
"t_trade_execution_update", "f_trade_execution", "f_trade_execution_update",
"t_book_update", "f_book_update", "t_raw_book_update",
"f_raw_book_update", "candles_update", "derivatives_status_update",
"liquidation_feed_update",
"checksum"
]
def __init__(self, event_emitter: "EventEmitter") -> None:
self.__event_emitter = event_emitter

View File

@@ -9,7 +9,7 @@ __all__ = [
"ReconnectionTimeoutError",
"ActionRequiresAuthentication",
"InvalidAuthenticationCredentials",
"EventNotSupported",
"UnknownEventError",
"OutdatedClientVersion"
]
@@ -48,7 +48,7 @@ class InvalidAuthenticationCredentials(BfxWebSocketException):
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
"""
class EventNotSupported(BfxWebSocketException):
class UnknownEventError(BfxWebSocketException):
"""
This error indicates a failed attempt to subscribe to an event not supported by the BfxWebSocketClient.
"""