Fix bug in module bfxapi.websocket._event_emitter.

This commit is contained in:
Davide Casale
2023-10-26 05:57:10 +02:00
parent 2734ff9e1a
commit c02d6d7bf8
2 changed files with 29 additions and 22 deletions

View File

@@ -335,4 +335,4 @@ class BfxWebSocketClient(Connection):
[ 0, event, None, data], cls=JSONEncoder)) [ 0, event, None, data], cls=JSONEncoder))
def on(self, event, f = None): def on(self, event, f = None):
return self.__event_emitter.on(event, f=f) return self.__event_emitter.on(event, f)

View File

@@ -1,6 +1,7 @@
from typing import \ from typing import \
Callable, List, Dict, \ TypeVar, Callable, List, \
Optional, Any Dict, Union, Optional, \
Any
from collections import defaultdict from collections import defaultdict
from asyncio import AbstractEventLoop from asyncio import AbstractEventLoop
@@ -8,6 +9,8 @@ from pyee.asyncio import AsyncIOEventEmitter
from bfxapi.websocket.exceptions import UnknownEventError from bfxapi.websocket.exceptions import UnknownEventError
_Handler = TypeVar("_Handler", bound=Callable[..., None])
_ONCE_PER_CONNECTION = [ _ONCE_PER_CONNECTION = [
"open", "authenticated", "order_snapshot", "open", "authenticated", "order_snapshot",
"position_snapshot", "funding_offer_snapshot", "funding_credit_snapshot", "position_snapshot", "funding_offer_snapshot", "funding_credit_snapshot",
@@ -21,19 +24,19 @@ _ONCE_PER_SUBSCRIPTION = [
] ]
_COMMON = [ _COMMON = [
"error", "disconnected", "t_ticker_update", "disconnected", "t_ticker_update", "f_ticker_update",
"f_ticker_update", "t_trade_execution", "t_trade_execution_update", "t_trade_execution", "t_trade_execution_update", "f_trade_execution",
"f_trade_execution", "f_trade_execution_update", "t_book_update", "f_trade_execution_update", "t_book_update", "f_book_update",
"f_book_update", "t_raw_book_update", "f_raw_book_update", "t_raw_book_update", "f_raw_book_update", "candles_update",
"candles_update", "derivatives_status_update", "liquidation_feed_update", "derivatives_status_update", "liquidation_feed_update", "order_new",
"order_new", "order_update", "order_cancel", "order_update", "order_cancel", "position_new",
"position_new", "position_update", "position_close", "position_update", "position_close", "funding_offer_new",
"funding_offer_new", "funding_offer_update", "funding_offer_cancel", "funding_offer_update", "funding_offer_cancel", "funding_credit_new",
"funding_credit_new", "funding_credit_update", "funding_credit_close", "funding_credit_update", "funding_credit_close", "funding_loan_new",
"funding_loan_new", "funding_loan_update", "funding_loan_close", "funding_loan_update", "funding_loan_close", "trade_execution",
"trade_execution", "trade_execution_update", "wallet_update", "trade_execution_update", "wallet_update", "notification",
"notification", "on-req-notification", "ou-req-notification", "on-req-notification", "ou-req-notification", "oc-req-notification",
"oc-req-notification", "fon-req-notification", "foc-req-notification" "fon-req-notification", "foc-req-notification"
] ]
class BfxEventEmitter(AsyncIOEventEmitter): class BfxEventEmitter(AsyncIOEventEmitter):
@@ -49,10 +52,12 @@ class BfxEventEmitter(AsyncIOEventEmitter):
self._subscriptions: Dict[str, List[str]] = \ self._subscriptions: Dict[str, List[str]] = \
defaultdict(lambda: [ ]) defaultdict(lambda: [ ])
def emit(self, def emit(
event: str, self,
*args: Any, event: str,
**kwargs: Any) -> bool: *args: Any,
**kwargs: Any
) -> bool:
if event in _ONCE_PER_CONNECTION: if event in _ONCE_PER_CONNECTION:
if event in self._connection: if event in self._connection:
return self._has_listeners(event) return self._has_listeners(event)
@@ -69,12 +74,14 @@ class BfxEventEmitter(AsyncIOEventEmitter):
return super().emit(event, *args, **kwargs) return super().emit(event, *args, **kwargs)
def _add_event_handler(self, event: str, k: Callable, v: Callable): def on(
self, event: str, f: Optional[_Handler] = None
) -> Union[_Handler, Callable[[_Handler], _Handler]]:
if event not in BfxEventEmitter._EVENTS: if event not in BfxEventEmitter._EVENTS:
raise UnknownEventError(f"Can't register to unknown event: <{event}> " + \ raise UnknownEventError(f"Can't register to unknown event: <{event}> " + \
"(to get a full list of available events see https://docs.bitfinex.com/).") "(to get a full list of available events see https://docs.bitfinex.com/).")
super()._add_event_handler(event, k, v) return super().on(event, f)
def _has_listeners(self, event: str) -> bool: def _has_listeners(self, event: str) -> bool:
with self._lock: with self._lock: