Fix and rewrite all logic in class BfxWebSocketBucket.

This commit is contained in:
Davide Casale
2023-10-08 05:37:10 +02:00
parent 628c3a0d66
commit 5ae576e36a
13 changed files with 132 additions and 145 deletions

View File

@@ -201,7 +201,7 @@ On each successful subscription, the client will emit the `subscribed` event:
@bfx.wss.on("subscribed") @bfx.wss.on("subscribed")
def on_subscribed(subscription: subscriptions.Subscription): def on_subscribed(subscription: subscriptions.Subscription):
if subscription["channel"] == "ticker": if subscription["channel"] == "ticker":
print(f"{subscription['symbol']}: {subscription['subId']}") # tBTCUSD: f2757df2-7e11-4244-9bb7-a53b7343bef8 print(f"{subscription['symbol']}: {subscription['sub_id']}") # tBTCUSD: f2757df2-7e11-4244-9bb7-a53b7343bef8
``` ```
### Unsubscribing from a public channel ### Unsubscribing from a public channel

View File

@@ -1 +1 @@
from ._client import BfxWebSocketClient, BfxWebSocketBucket, BfxWebSocketInputs from ._client import BfxWebSocketClient

View File

@@ -1,3 +1 @@
from .bfx_websocket_client import BfxWebSocketClient from .bfx_websocket_client import BfxWebSocketClient
from .bfx_websocket_bucket import BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebSocketInputs

View File

@@ -1,32 +1,32 @@
from typing import \ from typing import \
TYPE_CHECKING, Optional, Dict, List, Any, cast List, Dict, Any, \
Optional, cast
import asyncio, json, uuid import asyncio, json, uuid
from websockets.legacy.client import connect as _websockets__connect import websockets.client
from pyee import EventEmitter
from bfxapi.websocket._connection import Connection from bfxapi.websocket._connection import Connection
from bfxapi.websocket._handlers import PublicChannelsHandler from bfxapi.websocket._handlers import PublicChannelsHandler
from bfxapi.websocket.exceptions import TooManySubscriptions from bfxapi.websocket.subscriptions import Subscription
if TYPE_CHECKING: from bfxapi.websocket.exceptions import FullBucketError
from bfxapi.websocket.subscriptions import Subscription
from websockets.client import WebSocketClientProtocol
from pyee import EventEmitter
_CHECKSUM_FLAG_VALUE = 131_072 _CHECKSUM_FLAG_VALUE = 131_072
def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]:
return { key: message[key] for key in message if not key in keys }
class BfxWebSocketBucket(Connection): class BfxWebSocketBucket(Connection):
VERSION = 2 __MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 def __init__(self, host: str, event_emitter: EventEmitter) -> None:
def __init__(self, host: str, event_emitter: "EventEmitter") -> None:
super().__init__(host) super().__init__(host)
self.__event_emitter = event_emitter self.__event_emitter = event_emitter
self.__pendings: List[Dict[str, Any]] = [ ] self.__pendings: List[Dict[str, Any]] = [ ]
self.__subscriptions: Dict[int, "Subscription"] = { } self.__subscriptions: Dict[int, Subscription] = { }
self.__condition = asyncio.locks.Condition() self.__condition = asyncio.locks.Condition()
@@ -34,118 +34,131 @@ class BfxWebSocketBucket(Connection):
event_emitter=self.__event_emitter) event_emitter=self.__event_emitter)
@property @property
def pendings(self) -> List[Dict[str, Any]]: def count(self) -> int:
return self.__pendings return len(self.__pendings) + \
len(self.__subscriptions)
@property @property
def subscriptions(self) -> Dict[int, "Subscription"]: def is_full(self) -> bool:
return self.__subscriptions return self.count == \
BfxWebSocketBucket.__MAXIMUM_SUBSCRIPTIONS_AMOUNT
async def connect(self) -> None: async def start(self) -> None:
async with _websockets__connect(self._host) as websocket: async with websockets.client.connect(self._host) as websocket:
self._websocket = websocket self._websocket = websocket
await self.__recover_state() await self.__recover_state()
await self.__set_conf(flags=_CHECKSUM_FLAG_VALUE)
async with self.__condition: async with self.__condition:
self.__condition.notify(1) self.__condition.notify(1)
async for message in self._websocket: async for _message in self._websocket:
message = json.loads(message) message = json.loads(_message)
if isinstance(message, dict): if isinstance(message, dict):
if message["event"] == "subscribed" and (chan_id := message["chanId"]): # I think there's a better way to do it...
self.__pendings = [ pending \ if "chanId" in message:
for pending in self.__pendings \ message["chan_id"] = message.pop("chanId")
if pending["subId"] != message["subId"] ]
self.__subscriptions[chan_id] = cast("Subscription", message) if "subId" in message:
message["sub_id"] = message.pop("subId")
self.__event_emitter.emit("subscribed", message) if message["event"] == "subscribed":
elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]): self.__on_subscribed(message)
elif message["event"] == "unsubscribed":
if message["status"] == "OK": if message["status"] == "OK":
chan_id = cast(int, message["chan_id"])
del self.__subscriptions[chan_id] del self.__subscriptions[chan_id]
elif message["event"] == "error": elif message["event"] == "error":
self.__event_emitter.emit( \ self.__event_emitter.emit("wss-error", \
"wss-error", message["code"], message["msg"]) message["code"], message["msg"])
if isinstance(message, list): if isinstance(message, list):
if (chan_id := message[0]) and message[1] != Connection.HEARTBEAT: if (chan_id := cast(int, message[0])) and \
(message[1] != Connection._HEARTBEAT):
self.__handler.handle(self.__subscriptions[chan_id], message[1:]) self.__handler.handle(self.__subscriptions[chan_id], message[1:])
def __on_subscribed(self, message: Dict[str, Any]) -> None:
chan_id = cast(int, message["chan_id"])
subscription = cast(Subscription, _strip(message, \
keys=["event", "chan_id", "pair", "currency"]))
self.__pendings = [ pending \
for pending in self.__pendings \
if pending["subId"] != message["sub_id"] ]
self.__subscriptions[chan_id] = subscription
self.__event_emitter.emit("subscribed", subscription)
async def __recover_state(self) -> None: async def __recover_state(self) -> None:
for pending in self.__pendings: for pending in self.__pendings:
await self._websocket.send( \ await self._websocket.send(message = \
json.dumps(pending)) json.dumps(pending))
for _, subscription in self.__subscriptions.items(): for chan_id in list(self.__subscriptions.keys()):
_subscription = cast(Dict[str, Any], subscription) subscription = self.__subscriptions.pop(chan_id)
await self.subscribe( \ await self.subscribe(**subscription)
sub_id=_subscription.pop("subId"),
**_subscription)
self.__subscriptions.clear() await self.__set_config([ _CHECKSUM_FLAG_VALUE ])
async def __set_conf(self, flags: int) -> None: async def __set_config(self, flags: List[int]) -> None:
await self._websocket.send(json.dumps( \ await self._websocket.send(json.dumps( \
{ "event": "conf", "flags": flags })) { "event": "conf", "flags": sum(flags) }))
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def subscribe(self, async def subscribe(self,
channel: str, channel: str,
sub_id: Optional[str] = None, sub_id: Optional[str] = None,
**kwargs: Any) -> None: **kwargs: Any) -> None:
if len(self.__subscriptions) + len(self.__pendings) \ if self.is_full:
== BfxWebSocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: raise FullBucketError("The bucket is full: " + \
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") "can't subscribe to any other channel.")
subscription = \ subscription: Dict[str, Any] = \
{ **kwargs, "event": "subscribe", "channel": channel } { **kwargs, "event": "subscribe", "channel": channel }
subscription["subId"] = sub_id or str(uuid.uuid4()) subscription["subId"] = sub_id or str(uuid.uuid4())
self.__pendings.append(subscription) self.__pendings.append(subscription)
await self._websocket.send( \ await self._websocket.send(message = \
json.dumps(subscription)) json.dumps(subscription))
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None: async def unsubscribe(self, sub_id: str) -> None:
for chan_id, subscription in self.__subscriptions.items(): for chan_id, subscription in self.__subscriptions.items():
if subscription["subId"] == sub_id: if subscription["sub_id"] == sub_id:
message = json.dumps({ unsubscription = {
"event": "unsubscribe", "event": "unsubscribe",
"chanId": chan_id }) "chanId": chan_id }
await self._websocket.send(message) await self._websocket.send(message = \
json.dumps(unsubscription))
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def resubscribe(self, sub_id: str) -> None: async def resubscribe(self, sub_id: str) -> None:
for subscription in self.__subscriptions.values(): for subscription in self.__subscriptions.values():
if subscription["subId"] == sub_id: if subscription["sub_id"] == sub_id:
_subscription = cast(Dict[str, Any], subscription) await self.unsubscribe(sub_id)
await self.unsubscribe(sub_id=sub_id) await self.subscribe(**subscription)
await self.subscribe( \
sub_id=_subscription.pop("subId"),
**_subscription)
@Connection.require_websocket_connection @Connection.require_websocket_connection
async def close(self, code: int = 1000, reason: str = str()) -> None: async def close(self, code: int = 1000, reason: str = str()) -> None:
await self._websocket.close(code=code, reason=reason) await self._websocket.close(code, reason)
def has(self, sub_id: str) -> bool: def has(self, sub_id: str) -> bool:
for subscription in self.__subscriptions.values(): for subscription in self.__subscriptions.values():
if subscription["subId"] == sub_id: if subscription["sub_id"] == sub_id:
return True return True
return False return False
async def wait(self) -> None: async def wait(self) -> None:
async with self.__condition: async with self.__condition:
await self.__condition.wait_for( await self.__condition \
lambda: self.open) .wait_for(lambda: self.open)

View File

@@ -29,7 +29,8 @@ from bfxapi.websocket.exceptions import \
InvalidAuthenticationCredentials, \ InvalidAuthenticationCredentials, \
ReconnectionTimeoutError, \ ReconnectionTimeoutError, \
OutdatedClientVersion, \ OutdatedClientVersion, \
ZeroConnectionsError ZeroConnectionsError, \
UnknownChannelError
from bfxapi.websocket._client.bfx_websocket_bucket import BfxWebSocketBucket from bfxapi.websocket._client.bfx_websocket_bucket import BfxWebSocketBucket
@@ -71,7 +72,7 @@ class _Delay:
self.__backoff_delay = _Delay.__BACKOFF_MIN self.__backoff_delay = _Delay.__BACKOFF_MIN
class BfxWebSocketClient(Connection): class BfxWebSocketClient(Connection):
VERSION = BfxWebSocketBucket.VERSION VERSION = 2
MAXIMUM_CONNECTIONS_AMOUNT = 20 MAXIMUM_CONNECTIONS_AMOUNT = 20
@@ -227,7 +228,7 @@ class BfxWebSocketClient(Connection):
self.__buckets = { self.__buckets = {
bucket: asyncio.create_task(_c) bucket: asyncio.create_task(_c)
for bucket in self.__buckets for bucket in self.__buckets
if (_c := bucket.connect()) if (_c := bucket.start())
} }
if len(self.__buckets) == 0 or \ if len(self.__buckets) == 0 or \
@@ -265,7 +266,7 @@ class BfxWebSocketClient(Connection):
self.__event_emitter.emit("wss-error", message["code"], message["msg"]) self.__event_emitter.emit("wss-error", message["code"], message["msg"])
if isinstance(message, list) and \ if isinstance(message, list) and \
message[0] == 0 and message[1] != Connection.HEARTBEAT: message[0] == 0 and message[1] != Connection._HEARTBEAT:
self.__handler.handle(message[1], message[2]) self.__handler.handle(message[1], message[2])
@Connection.require_websocket_connection @Connection.require_websocket_connection
@@ -277,10 +278,13 @@ class BfxWebSocketClient(Connection):
raise ZeroConnectionsError("Unable to subscribe: " \ raise ZeroConnectionsError("Unable to subscribe: " \
"the number of connections must be greater than 0.") "the number of connections must be greater than 0.")
if not channel in ["ticker", "trades", "book", "candles", "status"]:
raise UnknownChannelError("Available channels are: " + \
"ticker, trades, book, candles and status.")
_buckets = list(self.__buckets.keys()) _buckets = list(self.__buckets.keys())
counters = [ len(bucket.pendings) + len(bucket.subscriptions) counters = [ bucket.count for bucket in _buckets ]
for bucket in _buckets ]
index = counters.index(min(counters)) index = counters.index(min(counters))

View File

@@ -20,7 +20,7 @@ _R = TypeVar("_R")
_P = ParamSpec("_P") _P = ParamSpec("_P")
class Connection(ABC): class Connection(ABC):
HEARTBEAT = "hb" _HEARTBEAT = "hb"
def __init__(self, host: str) -> None: def __init__(self, host: str) -> None:
self._host = host self._host = host

View File

@@ -60,7 +60,7 @@ class BfxEventEmitter(AsyncIOEventEmitter):
self._connection += [ event ] self._connection += [ event ]
if event in _ONCE_PER_SUBSCRIPTION: if event in _ONCE_PER_SUBSCRIPTION:
sub_id = args[0]["subId"] sub_id = args[0]["sub_id"]
if event in self._subscriptions[sub_id]: if event in self._subscriptions[sub_id]:
return self._has_listeners(event) return self._has_listeners(event)

View File

@@ -1,5 +1,6 @@
from typing import TYPE_CHECKING, \ from typing import \
Union, List, Any, cast TYPE_CHECKING, List, Any, \
cast
from bfxapi.types import serializers from bfxapi.types import serializers
@@ -9,9 +10,6 @@ if TYPE_CHECKING:
from pyee.base import EventEmitter from pyee.base import EventEmitter
_NoHeaderSubscription = \
Union[Ticker, Trades, Book, Candles, Status]
_CHECKSUM = "cs" _CHECKSUM = "cs"
class PublicChannelsHandler: class PublicChannelsHandler:
@@ -19,30 +17,24 @@ class PublicChannelsHandler:
self.__event_emitter = event_emitter self.__event_emitter = event_emitter
def handle(self, subscription: "Subscription", stream: List[Any]) -> None: def handle(self, subscription: "Subscription", stream: List[Any]) -> None:
def _strip(subscription: "Subscription", *args: str) -> "_NoHeaderSubscription":
return cast("_NoHeaderSubscription", \
{ key: value for key, value in subscription.items() if key not in args })
_subscription = _strip(subscription, "event", "channel", "chanId")
if subscription["channel"] == "ticker": if subscription["channel"] == "ticker":
self.__ticker_channel_handler(cast("Ticker", _subscription), stream) self.__ticker_channel_handler(cast("Ticker", subscription), stream)
elif subscription["channel"] == "trades": elif subscription["channel"] == "trades":
self.__trades_channel_handler(cast("Trades", _subscription), stream) self.__trades_channel_handler(cast("Trades", subscription), stream)
elif subscription["channel"] == "book": elif subscription["channel"] == "book":
_subscription = cast("Book", _subscription) subscription = cast("Book", subscription)
if stream[0] == _CHECKSUM: if stream[0] == _CHECKSUM:
self.__checksum_handler(_subscription, stream[1]) self.__checksum_handler(subscription, stream[1])
else: else:
if _subscription["prec"] != "R0": if subscription["prec"] != "R0":
self.__book_channel_handler(_subscription, stream) self.__book_channel_handler(subscription, stream)
else: else:
self.__raw_book_channel_handler(_subscription, stream) self.__raw_book_channel_handler(subscription, stream)
elif subscription["channel"] == "candles": elif subscription["channel"] == "candles":
self.__candles_channel_handler(cast("Candles", _subscription), stream) self.__candles_channel_handler(cast("Candles", subscription), stream)
elif subscription["channel"] == "status": elif subscription["channel"] == "status":
self.__status_channel_handler(cast("Status", _subscription), stream) self.__status_channel_handler(cast("Status", subscription), stream)
def __ticker_channel_handler(self, subscription: "Ticker", stream: List[Any]): def __ticker_channel_handler(self, subscription: "Ticker", stream: List[Any]):
if subscription["symbol"].startswith("t"): if subscription["symbol"].startswith("t"):

View File

@@ -4,11 +4,12 @@ __all__ = [
"BfxWebSocketException", "BfxWebSocketException",
"ConnectionNotOpen", "ConnectionNotOpen",
"TooManySubscriptions", "FullBucketError",
"ZeroConnectionsError", "ZeroConnectionsError",
"ReconnectionTimeoutError", "ReconnectionTimeoutError",
"ActionRequiresAuthentication", "ActionRequiresAuthentication",
"InvalidAuthenticationCredentials", "InvalidAuthenticationCredentials",
"UnknownChannelError",
"UnknownEventError", "UnknownEventError",
"OutdatedClientVersion" "OutdatedClientVersion"
] ]
@@ -23,9 +24,9 @@ class ConnectionNotOpen(BfxWebSocketException):
This error indicates an attempt to communicate via websocket before starting the connection with the servers. This error indicates an attempt to communicate via websocket before starting the connection with the servers.
""" """
class TooManySubscriptions(BfxWebSocketException): class FullBucketError(BfxWebSocketException):
""" """
This error indicates a subscription attempt after reaching the limit of simultaneous connections. Thrown when a user attempts a subscription but all buckets are full.
""" """
class ZeroConnectionsError(BfxWebSocketException): class ZeroConnectionsError(BfxWebSocketException):
@@ -48,9 +49,14 @@ class InvalidAuthenticationCredentials(BfxWebSocketException):
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication. This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
""" """
class UnknownChannelError(BfxWebSocketException):
"""
Thrown when a user attempts to subscribe to an unknown channel.
"""
class UnknownEventError(BfxWebSocketException): class UnknownEventError(BfxWebSocketException):
""" """
This error indicates a failed attempt to subscribe to an event not supported by the BfxWebSocketClient. Thrown when a user attempts to add a listener for an unknown event.
""" """
class OutdatedClientVersion(BfxWebSocketException): class OutdatedClientVersion(BfxWebSocketException):

View File

@@ -1,60 +1,34 @@
from typing import TypedDict, \ from typing import \
Union, Literal, Optional Union, Literal, TypedDict
Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"]
Channel = Literal["ticker", "trades", "book", "candles", "status"]
class Ticker(TypedDict): class Ticker(TypedDict):
subId: str channel: Literal["ticker"]
sub_id: str
symbol: str symbol: str
pair: Optional[str]
currency: Optional[str]
class Trades(TypedDict): class Trades(TypedDict):
subId: str channel: Literal["trades"]
sub_id: str
symbol: str symbol: str
pair: Optional[str]
currency: Optional[str]
class Book(TypedDict): class Book(TypedDict):
subId: str channel: Literal["book"]
sub_id: str
symbol: str symbol: str
prec: str prec: Literal["R0", "P0", "P1", "P2", "P3", "P4"]
freq: str freq: Literal["F0", "F1"]
len: str len: Literal["1", "25", "100", "250"]
pair: str
class Candles(TypedDict): class Candles(TypedDict):
subId: str channel: Literal["candles"]
sub_id: str
key: str key: str
class Status(TypedDict): class Status(TypedDict):
subId: str channel: Literal["status"]
sub_id: str
key: str key: str
Subscription = Union["_Ticker", "_Trades", "_Book", "_Candles", "_Status"]
_Channel = Literal["ticker", "trades", "book", "candles", "status"]
_Header = TypedDict("_Header", {
"event": Literal["subscribed"],
"channel": _Channel,
"chanId": int
})
#pylint: disable-next=inherit-non-class
class _Ticker(Ticker, _Header):
pass
#pylint: disable-next=inherit-non-class
class _Trades(Trades, _Header):
pass
#pylint: disable-next=inherit-non-class
class _Book(Book, _Header):
pass
#pylint: disable-next=inherit-non-class
class _Candles(Candles, _Header):
pass
#pylint: disable-next=inherit-non-class
class _Status(Status, _Header):
pass

View File

@@ -102,7 +102,7 @@ async def on_checksum(subscription: Book, value: int):
print("Mismatch between local and remote checksums: " print("Mismatch between local and remote checksums: "
f"restarting book for symbol <{symbol}>...") f"restarting book for symbol <{symbol}>...")
await bfx.wss.resubscribe(sub_id=subscription["subId"]) await bfx.wss.resubscribe(sub_id=subscription["sub_id"])
order_book.cooldown[symbol] = True order_book.cooldown[symbol] = True

View File

@@ -102,7 +102,7 @@ async def on_checksum(subscription: Book, value: int):
print("Mismatch between local and remote checksums: " print("Mismatch between local and remote checksums: "
f"restarting book for symbol <{symbol}>...") f"restarting book for symbol <{symbol}>...")
await bfx.wss.resubscribe(sub_id=subscription["subId"]) await bfx.wss.resubscribe(sub_id=subscription["sub_id"])
raw_order_book.cooldown[symbol] = True raw_order_book.cooldown[symbol] = True

View File

@@ -10,7 +10,7 @@ bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("t_ticker_update") @bfx.wss.on("t_ticker_update")
def on_t_ticker_update(subscription: Ticker, data: TradingPairTicker): def on_t_ticker_update(subscription: Ticker, data: TradingPairTicker):
print(f"Subscription with subId: {subscription['subId']}") print(f"Subscription with sub_id: {subscription['sub_id']}")
print(f"Data: {data}") print(f"Data: {data}")