From c8290f144bf5f40203a0ce834a90340dcbc23689 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Fri, 19 May 2023 22:13:15 +0200 Subject: [PATCH] Upgrade to Mypy 1.3.0 (old: 0.991). Fix compatibility problems with Mypy. Add type hints to bfxapi.websocket.handlers. --- .../rest/endpoints/rest_public_endpoints.py | 6 +- bfxapi/types/serializers.py | 3 +- .../websocket/client/bfx_websocket_bucket.py | 2 +- .../websocket/handlers/auth_events_handler.py | 62 ++++--- .../handlers/public_channels_handler.py | 172 +++++++++--------- bfxapi/websocket/subscriptions.py | 49 +++-- dev-requirements.txt | Bin 600 -> 600 bytes 7 files changed, 161 insertions(+), 133 deletions(-) diff --git a/bfxapi/rest/endpoints/rest_public_endpoints.py b/bfxapi/rest/endpoints/rest_public_endpoints.py index 99cb725..e1c20ff 100644 --- a/bfxapi/rest/endpoints/rest_public_endpoints.py +++ b/bfxapi/rest/endpoints/rest_public_endpoints.py @@ -262,9 +262,9 @@ class RestPublicEndpoints(Middleware): limit: Optional[int] = None) -> List[PulseMessage]: messages = [] - for subdata in self._get("pulse/hist", params={ "end": end, "limit": limit }): - subdata[18] = subdata[18][0] - message = serializers.PulseMessage.parse(*subdata) + for sub_data in self._get("pulse/hist", params={ "end": end, "limit": limit }): + sub_data[18] = sub_data[18][0] + message = serializers.PulseMessage.parse(*sub_data) messages.append(message) return messages diff --git a/bfxapi/types/serializers.py b/bfxapi/types/serializers.py index f853ce4..f7838bf 100644 --- a/bfxapi/types/serializers.py +++ b/bfxapi/types/serializers.py @@ -1,6 +1,7 @@ from .import dataclasses -from .labeler import \ +#pylint: disable-next=unused-import +from .labeler import _Serializer, \ generate_labeler_serializer, generate_recursive_serializer #pylint: disable-next=unused-import diff --git a/bfxapi/websocket/client/bfx_websocket_bucket.py b/bfxapi/websocket/client/bfx_websocket_bucket.py index 500e9db..927a3e4 100644 --- a/bfxapi/websocket/client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/client/bfx_websocket_bucket.py @@ -62,7 +62,7 @@ class BfxWebSocketBucket: if isinstance(message, list): if (chan_id := message[0]) and message[1] != _HEARTBEAT: - self.handler.handle(self.subscriptions[chan_id], *message[1:]) + self.handler.handle(self.subscriptions[chan_id], message[1:]) try: await _connection() diff --git a/bfxapi/websocket/handlers/auth_events_handler.py b/bfxapi/websocket/handlers/auth_events_handler.py index 701e395..8c5cab3 100644 --- a/bfxapi/websocket/handlers/auth_events_handler.py +++ b/bfxapi/websocket/handlers/auth_events_handler.py @@ -1,6 +1,15 @@ -from ...types import serializers +from typing import TYPE_CHECKING, \ + Union, Dict, Tuple, Any -from ...types.serializers import _Notification +from bfxapi.types import serializers + +from bfxapi.types.serializers import _Notification + +if TYPE_CHECKING: + from bfxapi.types.dataclasses import \ + Order, FundingOffer + + from pyee.base import EventEmitter class AuthEventsHandler: __once_abbreviations = { @@ -22,16 +31,6 @@ class AuthEventsHandler: **__on_abbreviations } - __serializers = { - ("os", "on", "ou", "oc",): serializers.Order, - ("ps", "pn", "pu", "pc",): serializers.Position, - ("te", "tu"): serializers.Trade, - ("fos", "fon", "fou", "foc",): serializers.FundingOffer, - ("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit, - ("fls", "fln", "flu", "flc",): serializers.FundingLoan, - ("ws", "wu",): serializers.Wallet - } - ONCE_EVENTS = [ *list(__once_abbreviations.values()) ] @@ -42,29 +41,44 @@ class AuthEventsHandler: "oc-req-notification", "fon-req-notification", "foc-req-notification" ] - def __init__(self, event_emitter): - self.event_emitter = event_emitter + def __init__(self, event_emitter: "EventEmitter") -> None: + self.__event_emitter = event_emitter - def handle(self, abbrevation, stream): + self.__serializers: Dict[Tuple[str, ...], serializers._Serializer] = { + ("os", "on", "ou", "oc",): serializers.Order, + ("ps", "pn", "pu", "pc",): serializers.Position, + ("te", "tu"): serializers.Trade, + ("fos", "fon", "fou", "foc",): serializers.FundingOffer, + ("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit, + ("fls", "fln", "flu", "flc",): serializers.FundingLoan, + ("ws", "wu",): serializers.Wallet + } + + def handle(self, abbrevation: str, stream: Any) -> None: if abbrevation == "n": return self.__notification(stream) - for abbrevations, serializer in AuthEventsHandler.__serializers.items(): + for abbrevations, serializer in self.__serializers.items(): if abbrevation in abbrevations: event = AuthEventsHandler.__abbreviations[abbrevation] - if all(isinstance(substream, list) for substream in stream): - return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ]) + if all(isinstance(sub_stream, list) for sub_stream in stream): + data = [ serializer.parse(*sub_stream) for sub_stream in stream ] + else: data = serializer.parse(*stream) - return self.event_emitter.emit(event, serializer.parse(*stream)) + self.__event_emitter.emit(event, data) - def __notification(self, stream): - event, serializer = "notification", _Notification(serializer=None) + break + + def __notification(self, stream: Any) -> None: + _Types = Union[None, "Order", "FundingOffer"] + + event, serializer = "notification", _Notification[_Types](serializer=None) if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req": - event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.Order) + event, serializer = f"{stream[1]}-notification", _Notification[_Types](serializer=serializers.Order) if stream[1] == "fon-req" or stream[1] == "foc-req": - event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.FundingOffer) + event, serializer = f"{stream[1]}-notification", _Notification[_Types](serializer=serializers.FundingOffer) - return self.event_emitter.emit(event, serializer.parse(*stream)) + self.__event_emitter.emit(event, serializer.parse(*stream)) diff --git a/bfxapi/websocket/handlers/public_channels_handler.py b/bfxapi/websocket/handlers/public_channels_handler.py index f32fe14..456ecd8 100644 --- a/bfxapi/websocket/handlers/public_channels_handler.py +++ b/bfxapi/websocket/handlers/public_channels_handler.py @@ -1,4 +1,16 @@ -from ...types import serializers +from typing import TYPE_CHECKING, \ + Union, Dict, List, Any, cast + +from bfxapi.types import serializers + +if TYPE_CHECKING: + from bfxapi.websocket.subscriptions import Subscription, \ + Ticker, Trades, Book, Candles, Status + + from pyee.base import EventEmitter + + _NoHeaderSubscription = \ + Union[Ticker, Trades, Book, Candles, Status] class PublicChannelsHandler: ONCE_PER_SUBSCRIPTION_EVENTS = [ @@ -15,28 +27,32 @@ class PublicChannelsHandler: "f_raw_book_update", "candles_update", "derivatives_status_update" ] - def __init__(self, event_emitter, events_per_subscription): + def __init__(self, + event_emitter: "EventEmitter", + events_per_subscription: Dict[str, List[str]]) -> None: self.__event_emitter, self.__events_per_subscription = \ event_emitter, events_per_subscription - self.__handlers = { - "ticker": self.__ticker_channel_handler, - "trades": self.__trades_channel_handler, - "book": self.__book_channel_handler, - "candles": self.__candles_channel_handler, - "status": self.__status_channel_handler - } + def handle(self, subscription: "Subscription", stream: List[Any]) -> None: + def _strip(subscription: "Subscription", *args: str) -> "_NoHeaderSubscription": + return cast("_NoHeaderSubscription", \ + { key: value for key, value in subscription.items() if key not in args }) - def handle(self, subscription, *stream): - #pylint: disable-next=unnecessary-lambda-assignment - _clear = lambda dictionary, *args: { key: value for key, value in dictionary.items() if key not in args } + _subscription = _strip(subscription, "event", "channel", "chanId") - #pylint: disable-next=consider-iterating-dictionary - if (channel := subscription["channel"]) and channel in self.__handlers.keys(): - return self.__handlers[channel](_clear(subscription, "event", "channel", "chanId"), *stream) + if subscription["channel"] == "ticker": + self.__ticker_channel_handler(cast("Ticker", _subscription), stream) + elif subscription["channel"] == "trades": + self.__trades_channel_handler(cast("Trades", _subscription), stream) + elif subscription["channel"] == "book": + self.__book_channel_handler(cast("Book", _subscription), stream) + elif subscription["channel"] == "candles": + self.__candles_channel_handler(cast("Candles", _subscription), stream) + elif subscription["channel"] == "status": + self.__status_channel_handler(cast("Status", _subscription), stream) - def __emit(self, event, sub, data): - sub_id, should_emit_event = sub["subId"], True + def __emit(self, event: str, subscription: "_NoHeaderSubscription", data: Any) -> None: + sub_id, should_emit_event = subscription["subId"], True if event in PublicChannelsHandler.ONCE_PER_SUBSCRIPTION_EVENTS: if sub_id not in self.__events_per_subscription: @@ -46,94 +62,76 @@ class PublicChannelsHandler: else: should_emit_event = False if should_emit_event: - return self.__event_emitter.emit(event, sub, data) + self.__event_emitter.emit(event, subscription, data) - def __ticker_channel_handler(self, subscription, *stream): + def __ticker_channel_handler(self, subscription: "Ticker", stream: List[Any]) -> None: if subscription["symbol"].startswith("t"): - return self.__emit( - "t_ticker_update", - subscription, - serializers.TradingPairTicker.parse(*stream[0]) - ) + return self.__emit("t_ticker_update", subscription, \ + serializers.TradingPairTicker.parse(*stream[0])) if subscription["symbol"].startswith("f"): - return self.__emit( - "f_ticker_update", - subscription, - serializers.FundingCurrencyTicker.parse(*stream[0]) - ) + return self.__emit("f_ticker_update", subscription, \ + serializers.FundingCurrencyTicker.parse(*stream[0])) - def __trades_channel_handler(self, subscription, *stream): + def __trades_channel_handler(self, subscription: "Trades", stream: List[Any]) -> None: if (event := stream[0]) and event in [ "te", "tu", "fte", "ftu" ]: + events = { "te": "t_trade_execution", "tu": "t_trade_execution_update", \ + "fte": "f_trade_execution", "ftu": "f_trade_execution_update" } + if subscription["symbol"].startswith("t"): - return self.__emit( - { "te": "t_trade_execution", "tu": "t_trade_execution_update" }[event], - subscription, - serializers.TradingPairTrade.parse(*stream[1]) - ) + return self.__emit(events[event], subscription, \ + serializers.TradingPairTrade.parse(*stream[1])) if subscription["symbol"].startswith("f"): - return self.__emit( - { "fte": "f_trade_execution", "ftu": "f_trade_execution_update" }[event], - subscription, - serializers.FundingCurrencyTrade.parse(*stream[1]) - ) + return self.__emit(events[event], subscription, \ + serializers.FundingCurrencyTrade.parse(*stream[1])) if subscription["symbol"].startswith("t"): - return self.__emit( - "t_trades_snapshot", - subscription, - [ serializers.TradingPairTrade.parse(*substream) for substream in stream[0] ] - ) + return self.__emit("t_trades_snapshot", subscription, \ + [ serializers.TradingPairTrade.parse(*sub_stream) \ + for sub_stream in stream[0] ]) if subscription["symbol"].startswith("f"): - return self.__emit( - "f_trades_snapshot", - subscription, - [ serializers.FundingCurrencyTrade.parse(*substream) for substream in stream[0] ] - ) + return self.__emit("f_trades_snapshot", subscription, \ + [ serializers.FundingCurrencyTrade.parse(*sub_stream) \ + for sub_stream in stream[0] ]) - def __book_channel_handler(self, subscription, *stream): - event = subscription["symbol"][0] + def __book_channel_handler(self, subscription: "Book", stream: List[Any]) -> None: + t_or_f = subscription["symbol"][0] - if subscription["prec"] == "R0": - _trading_pair_serializer, _funding_currency_serializer, is_raw_book = \ - serializers.TradingPairRawBook, serializers.FundingCurrencyRawBook, True - else: _trading_pair_serializer, _funding_currency_serializer, is_raw_book = \ - serializers.TradingPairBook, serializers.FundingCurrencyBook, False + is_raw_book = subscription["prec"] == "R0" - if all(isinstance(substream, list) for substream in stream[0]): - return self.__emit( - event + "_" + (is_raw_book and "raw_book" or "book") + "_snapshot", - subscription, - [ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event] \ - .parse(*substream) for substream in stream[0] ] - ) + serializer = { + "t": is_raw_book and serializers.TradingPairRawBook \ + or serializers.TradingPairBook, + "f": is_raw_book and serializers.FundingCurrencyRawBook \ + or serializers.FundingCurrencyBook + }[t_or_f] - return self.__emit( - event + "_" + (is_raw_book and "raw_book" or "book") + "_update", - subscription, - { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event].parse(*stream[0]) - ) + if all(isinstance(sub_stream, list) for sub_stream in stream[0]): + event = t_or_f + "_" + \ + (is_raw_book and "raw_book" or "book") + "_snapshot" - def __candles_channel_handler(self, subscription, *stream): - if all(isinstance(substream, list) for substream in stream[0]): - return self.__emit( - "candles_snapshot", - subscription, - [ serializers.Candle.parse(*substream) for substream in stream[0] ] - ) + return self.__emit(event, subscription, \ + [ serializer.parse(*sub_stream) \ + for sub_stream in stream[0] ]) - return self.__emit( - "candles_update", - subscription, - serializers.Candle.parse(*stream[0]) - ) + event = t_or_f + "_" + \ + (is_raw_book and "raw_book" or "book") + "_update" - def __status_channel_handler(self, subscription, *stream): + return self.__emit(event, subscription, \ + serializer.parse(*stream[0])) + + def __candles_channel_handler(self, subscription: "Candles", stream: List[Any]) -> None: + if all(isinstance(sub_stream, list) for sub_stream in stream[0]): + return self.__emit("candles_snapshot", subscription, \ + [ serializers.Candle.parse(*sub_stream) \ + for sub_stream in stream[0] ]) + + return self.__emit("candles_update", subscription, \ + serializers.Candle.parse(*stream[0])) + + def __status_channel_handler(self, subscription: "Status", stream: List[Any]) -> None: if subscription["key"].startswith("deriv:"): - return self.__emit( - "derivatives_status_update", - subscription, - serializers.DerivativesStatus.parse(*stream[0]) - ) + return self.__emit("derivatives_status_update", subscription, \ + serializers.DerivativesStatus.parse(*stream[0])) diff --git a/bfxapi/websocket/subscriptions.py b/bfxapi/websocket/subscriptions.py index 72df007..d4ffd2a 100644 --- a/bfxapi/websocket/subscriptions.py +++ b/bfxapi/websocket/subscriptions.py @@ -1,20 +1,5 @@ -from typing import TypedDict, Union, Literal, Optional - -_Channel = Literal[ - "ticker", - "trades", - "book", - "candles", - "status" -] - -_Header = TypedDict("_Header", { - "event": Literal["subscribed"], - "channel": _Channel, - "chanId": int -}) - -Subscription = Union[_Header, "Ticker", "Trades", "Book", "Candles", "Status"] +from typing import TypedDict, \ + Union, Literal, Optional class Ticker(TypedDict): subId: str @@ -43,3 +28,33 @@ class Candles(TypedDict): class Status(TypedDict): subId: str key: str + +Subscription = Union["_Ticker", "_Trades", "_Book", "_Candles", "_Status"] + +_Channel = Literal["ticker", "trades", "book", "candles", "status"] + +_Header = TypedDict("_Header", { + "event": Literal["subscribed"], + "channel": _Channel, + "chanId": int +}) + +#pylint: disable-next=inherit-non-class +class _Ticker(Ticker, _Header): + pass + +#pylint: disable-next=inherit-non-class +class _Trades(Trades, _Header): + pass + +#pylint: disable-next=inherit-non-class +class _Book(Book, _Header): + pass + +#pylint: disable-next=inherit-non-class +class _Candles(Candles, _Header): + pass + +#pylint: disable-next=inherit-non-class +class _Status(Status, _Header): + pass diff --git a/dev-requirements.txt b/dev-requirements.txt index fff03cf052a26593c67097d8c3c4cd6ce90c4e5b..c7daa7862d8b787e838ecbbee8fcad38190de71d 100644 GIT binary patch delta 20 acmcb?a)V`p4W}W49)mFu8*FqHW&!{?