From 5ae576e36ae98f3a6745272cfb2a87dcfdb20936 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Sun, 8 Oct 2023 05:37:10 +0200 Subject: [PATCH] Fix and rewrite all logic in class BfxWebSocketBucket. --- README.md | 2 +- bfxapi/websocket/__init__.py | 2 +- bfxapi/websocket/_client/__init__.py | 2 - .../websocket/_client/bfx_websocket_bucket.py | 135 ++++++++++-------- .../websocket/_client/bfx_websocket_client.py | 16 ++- bfxapi/websocket/_connection.py | 2 +- .../_event_emitter/bfx_event_emitter.py | 2 +- .../_handlers/public_channels_handler.py | 32 ++--- bfxapi/websocket/exceptions.py | 14 +- bfxapi/websocket/subscriptions.py | 64 +++------ examples/websocket/public/order_book.py | 2 +- examples/websocket/public/raw_order_book.py | 2 +- examples/websocket/public/ticker.py | 2 +- 13 files changed, 132 insertions(+), 145 deletions(-) diff --git a/README.md b/README.md index 3116e38..004b346 100644 --- a/README.md +++ b/README.md @@ -201,7 +201,7 @@ On each successful subscription, the client will emit the `subscribed` event: @bfx.wss.on("subscribed") def on_subscribed(subscription: subscriptions.Subscription): if subscription["channel"] == "ticker": - print(f"{subscription['symbol']}: {subscription['subId']}") # tBTCUSD: f2757df2-7e11-4244-9bb7-a53b7343bef8 + print(f"{subscription['symbol']}: {subscription['sub_id']}") # tBTCUSD: f2757df2-7e11-4244-9bb7-a53b7343bef8 ``` ### Unsubscribing from a public channel diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index f1ed659..ced8300 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1 +1 @@ -from ._client import BfxWebSocketClient, BfxWebSocketBucket, BfxWebSocketInputs +from ._client import BfxWebSocketClient diff --git a/bfxapi/websocket/_client/__init__.py b/bfxapi/websocket/_client/__init__.py index 05b843c..ebbd6d2 100644 --- a/bfxapi/websocket/_client/__init__.py +++ b/bfxapi/websocket/_client/__init__.py @@ -1,3 +1 @@ from .bfx_websocket_client import BfxWebSocketClient -from .bfx_websocket_bucket import BfxWebSocketBucket -from .bfx_websocket_inputs import BfxWebSocketInputs diff --git a/bfxapi/websocket/_client/bfx_websocket_bucket.py b/bfxapi/websocket/_client/bfx_websocket_bucket.py index 0130dfb..3c42fdf 100644 --- a/bfxapi/websocket/_client/bfx_websocket_bucket.py +++ b/bfxapi/websocket/_client/bfx_websocket_bucket.py @@ -1,32 +1,32 @@ from typing import \ - TYPE_CHECKING, Optional, Dict, List, Any, cast + List, Dict, Any, \ + Optional, cast import asyncio, json, uuid -from websockets.legacy.client import connect as _websockets__connect +import websockets.client +from pyee import EventEmitter from bfxapi.websocket._connection import Connection from bfxapi.websocket._handlers import PublicChannelsHandler -from bfxapi.websocket.exceptions import TooManySubscriptions +from bfxapi.websocket.subscriptions import Subscription -if TYPE_CHECKING: - from bfxapi.websocket.subscriptions import Subscription - from websockets.client import WebSocketClientProtocol - from pyee import EventEmitter +from bfxapi.websocket.exceptions import FullBucketError _CHECKSUM_FLAG_VALUE = 131_072 +def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]: + return { key: message[key] for key in message if not key in keys } + class BfxWebSocketBucket(Connection): - VERSION = 2 + __MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 - MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 - - def __init__(self, host: str, event_emitter: "EventEmitter") -> None: + def __init__(self, host: str, event_emitter: EventEmitter) -> None: super().__init__(host) self.__event_emitter = event_emitter self.__pendings: List[Dict[str, Any]] = [ ] - self.__subscriptions: Dict[int, "Subscription"] = { } + self.__subscriptions: Dict[int, Subscription] = { } self.__condition = asyncio.locks.Condition() @@ -34,118 +34,131 @@ class BfxWebSocketBucket(Connection): event_emitter=self.__event_emitter) @property - def pendings(self) -> List[Dict[str, Any]]: - return self.__pendings + def count(self) -> int: + return len(self.__pendings) + \ + len(self.__subscriptions) @property - def subscriptions(self) -> Dict[int, "Subscription"]: - return self.__subscriptions + def is_full(self) -> bool: + return self.count == \ + BfxWebSocketBucket.__MAXIMUM_SUBSCRIPTIONS_AMOUNT - async def connect(self) -> None: - async with _websockets__connect(self._host) as websocket: + async def start(self) -> None: + async with websockets.client.connect(self._host) as websocket: self._websocket = websocket await self.__recover_state() - await self.__set_conf(flags=_CHECKSUM_FLAG_VALUE) - async with self.__condition: self.__condition.notify(1) - async for message in self._websocket: - message = json.loads(message) + async for _message in self._websocket: + message = json.loads(_message) if isinstance(message, dict): - if message["event"] == "subscribed" and (chan_id := message["chanId"]): - self.__pendings = [ pending \ - for pending in self.__pendings \ - if pending["subId"] != message["subId"] ] + # I think there's a better way to do it... + if "chanId" in message: + message["chan_id"] = message.pop("chanId") - self.__subscriptions[chan_id] = cast("Subscription", message) + if "subId" in message: + message["sub_id"] = message.pop("subId") - self.__event_emitter.emit("subscribed", message) - elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]): + if message["event"] == "subscribed": + self.__on_subscribed(message) + elif message["event"] == "unsubscribed": if message["status"] == "OK": + chan_id = cast(int, message["chan_id"]) + del self.__subscriptions[chan_id] elif message["event"] == "error": - self.__event_emitter.emit( \ - "wss-error", message["code"], message["msg"]) + self.__event_emitter.emit("wss-error", \ + message["code"], message["msg"]) if isinstance(message, list): - if (chan_id := message[0]) and message[1] != Connection.HEARTBEAT: + if (chan_id := cast(int, message[0])) and \ + (message[1] != Connection._HEARTBEAT): self.__handler.handle(self.__subscriptions[chan_id], message[1:]) + def __on_subscribed(self, message: Dict[str, Any]) -> None: + chan_id = cast(int, message["chan_id"]) + + subscription = cast(Subscription, _strip(message, \ + keys=["event", "chan_id", "pair", "currency"])) + + self.__pendings = [ pending \ + for pending in self.__pendings \ + if pending["subId"] != message["sub_id"] ] + + self.__subscriptions[chan_id] = subscription + + self.__event_emitter.emit("subscribed", subscription) + async def __recover_state(self) -> None: for pending in self.__pendings: - await self._websocket.send( \ + await self._websocket.send(message = \ json.dumps(pending)) - for _, subscription in self.__subscriptions.items(): - _subscription = cast(Dict[str, Any], subscription) + for chan_id in list(self.__subscriptions.keys()): + subscription = self.__subscriptions.pop(chan_id) - await self.subscribe( \ - sub_id=_subscription.pop("subId"), - **_subscription) + await self.subscribe(**subscription) - self.__subscriptions.clear() + await self.__set_config([ _CHECKSUM_FLAG_VALUE ]) - async def __set_conf(self, flags: int) -> None: + async def __set_config(self, flags: List[int]) -> None: await self._websocket.send(json.dumps( \ - { "event": "conf", "flags": flags })) + { "event": "conf", "flags": sum(flags) })) @Connection.require_websocket_connection async def subscribe(self, channel: str, sub_id: Optional[str] = None, **kwargs: Any) -> None: - if len(self.__subscriptions) + len(self.__pendings) \ - == BfxWebSocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: - raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") + if self.is_full: + raise FullBucketError("The bucket is full: " + \ + "can't subscribe to any other channel.") - subscription = \ + subscription: Dict[str, Any] = \ { **kwargs, "event": "subscribe", "channel": channel } subscription["subId"] = sub_id or str(uuid.uuid4()) self.__pendings.append(subscription) - await self._websocket.send( \ + await self._websocket.send(message = \ json.dumps(subscription)) @Connection.require_websocket_connection async def unsubscribe(self, sub_id: str) -> None: for chan_id, subscription in self.__subscriptions.items(): - if subscription["subId"] == sub_id: - message = json.dumps({ + if subscription["sub_id"] == sub_id: + unsubscription = { "event": "unsubscribe", - "chanId": chan_id }) + "chanId": chan_id } - await self._websocket.send(message) + await self._websocket.send(message = \ + json.dumps(unsubscription)) @Connection.require_websocket_connection async def resubscribe(self, sub_id: str) -> None: for subscription in self.__subscriptions.values(): - if subscription["subId"] == sub_id: - _subscription = cast(Dict[str, Any], subscription) + if subscription["sub_id"] == sub_id: + await self.unsubscribe(sub_id) - await self.unsubscribe(sub_id=sub_id) - - await self.subscribe( \ - sub_id=_subscription.pop("subId"), - **_subscription) + await self.subscribe(**subscription) @Connection.require_websocket_connection async def close(self, code: int = 1000, reason: str = str()) -> None: - await self._websocket.close(code=code, reason=reason) + await self._websocket.close(code, reason) def has(self, sub_id: str) -> bool: for subscription in self.__subscriptions.values(): - if subscription["subId"] == sub_id: + if subscription["sub_id"] == sub_id: return True return False async def wait(self) -> None: async with self.__condition: - await self.__condition.wait_for( - lambda: self.open) + await self.__condition \ + .wait_for(lambda: self.open) diff --git a/bfxapi/websocket/_client/bfx_websocket_client.py b/bfxapi/websocket/_client/bfx_websocket_client.py index 249ba59..1518e35 100644 --- a/bfxapi/websocket/_client/bfx_websocket_client.py +++ b/bfxapi/websocket/_client/bfx_websocket_client.py @@ -29,7 +29,8 @@ from bfxapi.websocket.exceptions import \ InvalidAuthenticationCredentials, \ ReconnectionTimeoutError, \ OutdatedClientVersion, \ - ZeroConnectionsError + ZeroConnectionsError, \ + UnknownChannelError from bfxapi.websocket._client.bfx_websocket_bucket import BfxWebSocketBucket @@ -71,7 +72,7 @@ class _Delay: self.__backoff_delay = _Delay.__BACKOFF_MIN class BfxWebSocketClient(Connection): - VERSION = BfxWebSocketBucket.VERSION + VERSION = 2 MAXIMUM_CONNECTIONS_AMOUNT = 20 @@ -227,7 +228,7 @@ class BfxWebSocketClient(Connection): self.__buckets = { bucket: asyncio.create_task(_c) for bucket in self.__buckets - if (_c := bucket.connect()) + if (_c := bucket.start()) } if len(self.__buckets) == 0 or \ @@ -265,7 +266,7 @@ class BfxWebSocketClient(Connection): self.__event_emitter.emit("wss-error", message["code"], message["msg"]) if isinstance(message, list) and \ - message[0] == 0 and message[1] != Connection.HEARTBEAT: + message[0] == 0 and message[1] != Connection._HEARTBEAT: self.__handler.handle(message[1], message[2]) @Connection.require_websocket_connection @@ -277,10 +278,13 @@ class BfxWebSocketClient(Connection): raise ZeroConnectionsError("Unable to subscribe: " \ "the number of connections must be greater than 0.") + if not channel in ["ticker", "trades", "book", "candles", "status"]: + raise UnknownChannelError("Available channels are: " + \ + "ticker, trades, book, candles and status.") + _buckets = list(self.__buckets.keys()) - counters = [ len(bucket.pendings) + len(bucket.subscriptions) - for bucket in _buckets ] + counters = [ bucket.count for bucket in _buckets ] index = counters.index(min(counters)) diff --git a/bfxapi/websocket/_connection.py b/bfxapi/websocket/_connection.py index 4ef6fad..971bdd0 100644 --- a/bfxapi/websocket/_connection.py +++ b/bfxapi/websocket/_connection.py @@ -20,7 +20,7 @@ _R = TypeVar("_R") _P = ParamSpec("_P") class Connection(ABC): - HEARTBEAT = "hb" + _HEARTBEAT = "hb" def __init__(self, host: str) -> None: self._host = host diff --git a/bfxapi/websocket/_event_emitter/bfx_event_emitter.py b/bfxapi/websocket/_event_emitter/bfx_event_emitter.py index 3c11c08..e53eee8 100644 --- a/bfxapi/websocket/_event_emitter/bfx_event_emitter.py +++ b/bfxapi/websocket/_event_emitter/bfx_event_emitter.py @@ -60,7 +60,7 @@ class BfxEventEmitter(AsyncIOEventEmitter): self._connection += [ event ] if event in _ONCE_PER_SUBSCRIPTION: - sub_id = args[0]["subId"] + sub_id = args[0]["sub_id"] if event in self._subscriptions[sub_id]: return self._has_listeners(event) diff --git a/bfxapi/websocket/_handlers/public_channels_handler.py b/bfxapi/websocket/_handlers/public_channels_handler.py index 5bc22f2..597f16c 100644 --- a/bfxapi/websocket/_handlers/public_channels_handler.py +++ b/bfxapi/websocket/_handlers/public_channels_handler.py @@ -1,5 +1,6 @@ -from typing import TYPE_CHECKING, \ - Union, List, Any, cast +from typing import \ + TYPE_CHECKING, List, Any, \ + cast from bfxapi.types import serializers @@ -9,9 +10,6 @@ if TYPE_CHECKING: from pyee.base import EventEmitter - _NoHeaderSubscription = \ - Union[Ticker, Trades, Book, Candles, Status] - _CHECKSUM = "cs" class PublicChannelsHandler: @@ -19,30 +17,24 @@ class PublicChannelsHandler: self.__event_emitter = event_emitter def handle(self, subscription: "Subscription", stream: List[Any]) -> None: - def _strip(subscription: "Subscription", *args: str) -> "_NoHeaderSubscription": - return cast("_NoHeaderSubscription", \ - { key: value for key, value in subscription.items() if key not in args }) - - _subscription = _strip(subscription, "event", "channel", "chanId") - if subscription["channel"] == "ticker": - self.__ticker_channel_handler(cast("Ticker", _subscription), stream) + self.__ticker_channel_handler(cast("Ticker", subscription), stream) elif subscription["channel"] == "trades": - self.__trades_channel_handler(cast("Trades", _subscription), stream) + self.__trades_channel_handler(cast("Trades", subscription), stream) elif subscription["channel"] == "book": - _subscription = cast("Book", _subscription) + subscription = cast("Book", subscription) if stream[0] == _CHECKSUM: - self.__checksum_handler(_subscription, stream[1]) + self.__checksum_handler(subscription, stream[1]) else: - if _subscription["prec"] != "R0": - self.__book_channel_handler(_subscription, stream) + if subscription["prec"] != "R0": + self.__book_channel_handler(subscription, stream) else: - self.__raw_book_channel_handler(_subscription, stream) + self.__raw_book_channel_handler(subscription, stream) elif subscription["channel"] == "candles": - self.__candles_channel_handler(cast("Candles", _subscription), stream) + self.__candles_channel_handler(cast("Candles", subscription), stream) elif subscription["channel"] == "status": - self.__status_channel_handler(cast("Status", _subscription), stream) + self.__status_channel_handler(cast("Status", subscription), stream) def __ticker_channel_handler(self, subscription: "Ticker", stream: List[Any]): if subscription["symbol"].startswith("t"): diff --git a/bfxapi/websocket/exceptions.py b/bfxapi/websocket/exceptions.py index 6469723..c23da60 100644 --- a/bfxapi/websocket/exceptions.py +++ b/bfxapi/websocket/exceptions.py @@ -4,11 +4,12 @@ __all__ = [ "BfxWebSocketException", "ConnectionNotOpen", - "TooManySubscriptions", + "FullBucketError", "ZeroConnectionsError", "ReconnectionTimeoutError", "ActionRequiresAuthentication", "InvalidAuthenticationCredentials", + "UnknownChannelError", "UnknownEventError", "OutdatedClientVersion" ] @@ -23,9 +24,9 @@ class ConnectionNotOpen(BfxWebSocketException): This error indicates an attempt to communicate via websocket before starting the connection with the servers. """ -class TooManySubscriptions(BfxWebSocketException): +class FullBucketError(BfxWebSocketException): """ - This error indicates a subscription attempt after reaching the limit of simultaneous connections. + Thrown when a user attempts a subscription but all buckets are full. """ class ZeroConnectionsError(BfxWebSocketException): @@ -48,9 +49,14 @@ class InvalidAuthenticationCredentials(BfxWebSocketException): This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication. """ +class UnknownChannelError(BfxWebSocketException): + """ + Thrown when a user attempts to subscribe to an unknown channel. + """ + class UnknownEventError(BfxWebSocketException): """ - This error indicates a failed attempt to subscribe to an event not supported by the BfxWebSocketClient. + Thrown when a user attempts to add a listener for an unknown event. """ class OutdatedClientVersion(BfxWebSocketException): diff --git a/bfxapi/websocket/subscriptions.py b/bfxapi/websocket/subscriptions.py index d4ffd2a..db3a2c9 100644 --- a/bfxapi/websocket/subscriptions.py +++ b/bfxapi/websocket/subscriptions.py @@ -1,60 +1,34 @@ -from typing import TypedDict, \ - Union, Literal, Optional +from typing import \ + Union, Literal, TypedDict + +Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"] + +Channel = Literal["ticker", "trades", "book", "candles", "status"] class Ticker(TypedDict): - subId: str + channel: Literal["ticker"] + sub_id: str symbol: str - pair: Optional[str] - currency: Optional[str] class Trades(TypedDict): - subId: str + channel: Literal["trades"] + sub_id: str symbol: str - pair: Optional[str] - currency: Optional[str] class Book(TypedDict): - subId: str + channel: Literal["book"] + sub_id: str symbol: str - prec: str - freq: str - len: str - pair: str + prec: Literal["R0", "P0", "P1", "P2", "P3", "P4"] + freq: Literal["F0", "F1"] + len: Literal["1", "25", "100", "250"] class Candles(TypedDict): - subId: str + channel: Literal["candles"] + sub_id: str key: str class Status(TypedDict): - subId: str + channel: Literal["status"] + sub_id: str key: str - -Subscription = Union["_Ticker", "_Trades", "_Book", "_Candles", "_Status"] - -_Channel = Literal["ticker", "trades", "book", "candles", "status"] - -_Header = TypedDict("_Header", { - "event": Literal["subscribed"], - "channel": _Channel, - "chanId": int -}) - -#pylint: disable-next=inherit-non-class -class _Ticker(Ticker, _Header): - pass - -#pylint: disable-next=inherit-non-class -class _Trades(Trades, _Header): - pass - -#pylint: disable-next=inherit-non-class -class _Book(Book, _Header): - pass - -#pylint: disable-next=inherit-non-class -class _Candles(Candles, _Header): - pass - -#pylint: disable-next=inherit-non-class -class _Status(Status, _Header): - pass diff --git a/examples/websocket/public/order_book.py b/examples/websocket/public/order_book.py index 03d8c02..497e787 100644 --- a/examples/websocket/public/order_book.py +++ b/examples/websocket/public/order_book.py @@ -102,7 +102,7 @@ async def on_checksum(subscription: Book, value: int): print("Mismatch between local and remote checksums: " f"restarting book for symbol <{symbol}>...") - await bfx.wss.resubscribe(sub_id=subscription["subId"]) + await bfx.wss.resubscribe(sub_id=subscription["sub_id"]) order_book.cooldown[symbol] = True diff --git a/examples/websocket/public/raw_order_book.py b/examples/websocket/public/raw_order_book.py index c151dda..a08d9bb 100644 --- a/examples/websocket/public/raw_order_book.py +++ b/examples/websocket/public/raw_order_book.py @@ -102,7 +102,7 @@ async def on_checksum(subscription: Book, value: int): print("Mismatch between local and remote checksums: " f"restarting book for symbol <{symbol}>...") - await bfx.wss.resubscribe(sub_id=subscription["subId"]) + await bfx.wss.resubscribe(sub_id=subscription["sub_id"]) raw_order_book.cooldown[symbol] = True diff --git a/examples/websocket/public/ticker.py b/examples/websocket/public/ticker.py index 24c9463..253d467 100644 --- a/examples/websocket/public/ticker.py +++ b/examples/websocket/public/ticker.py @@ -10,7 +10,7 @@ bfx = Client(wss_host=PUB_WSS_HOST) @bfx.wss.on("t_ticker_update") def on_t_ticker_update(subscription: Ticker, data: TradingPairTicker): - print(f"Subscription with subId: {subscription['subId']}") + print(f"Subscription with sub_id: {subscription['sub_id']}") print(f"Data: {data}")