From 36725a183e7a04670016946ed9dc051e84f8aad5 Mon Sep 17 00:00:00 2001 From: Davide Casale Date: Thu, 19 Jan 2023 18:12:12 +0100 Subject: [PATCH] Move _BfxWebsocketBucket class in its own file bfxapi/websocket/_BfxWebsocketBucket.py. --- bfxapi/websocket/BfxWebsocketClient.py | 86 ++---------------------- bfxapi/websocket/_BfxWebsocketBucket.py | 87 +++++++++++++++++++++++++ bfxapi/websocket/subscriptions.py | 8 +++ 3 files changed, 100 insertions(+), 81 deletions(-) create mode 100644 bfxapi/websocket/_BfxWebsocketBucket.py diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index c5b2f31..e5c7efb 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -4,27 +4,16 @@ from typing import Literal, TypeVar, Callable, cast from pyee.asyncio import AsyncIOEventEmitter +from ._BfxWebsocketBucket import _HEARTBEAT, F, _require_websocket_connection, _BfxWebsocketBucket + from ._BfxWebsocketInputs import _BfxWebsocketInputs from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler -from .exceptions import ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion +from .exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported from ..utils.encoder import JSONEncoder from ..utils.logger import Formatter, CustomLogger -_HEARTBEAT = "hb" - -F = TypeVar("F", bound=Callable[..., Literal[None]]) - -def _require_websocket_connection(function: F) -> F: - async def wrapper(self, *args, **kwargs): - if self.websocket == None or self.websocket.open == False: - raise ConnectionNotOpen("No open connection with the server.") - - await function(self, *args, **kwargs) - - return cast(F, wrapper) - def _require_websocket_authentication(function: F) -> F: async def wrapper(self, *args, **kwargs): if self.authentication == False: @@ -35,7 +24,7 @@ def _require_websocket_authentication(function: F) -> F: return cast(F, wrapper) class BfxWebsocketClient(object): - VERSION = 2 + VERSION = _BfxWebsocketBucket.VERSION MAXIMUM_BUCKETS_AMOUNT = 20 @@ -163,69 +152,4 @@ class BfxWebsocketClient(object): def handler(function): self.event_emitter.once(event, function) - return handler - -class _BfxWebsocketBucket(object): - MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 - - def __init__(self, host, event_emitter, __bucket_open_signal): - self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal - - self.websocket, self.subscriptions, self.pendings = None, dict(), list() - - self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) - - async def _connect(self, index): - async for websocket in websockets.connect(self.host): - self.websocket = websocket - - self.__bucket_open_signal(index) - - try: - async for message in websocket: - message = json.loads(message) - - if isinstance(message, dict) and message["event"] == "info" and "version" in message: - if BfxWebsocketClient.VERSION != message["version"]: - raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") - elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): - self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] - self.subscriptions[chanId] = message - self.event_emitter.emit("subscribed", message) - elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): - if message["status"] == "OK": - del self.subscriptions[chanId] - elif isinstance(message, dict) and message["event"] == "error": - self.event_emitter.emit("wss-error", message["code"], message["msg"]) - elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: - self.handler.handle(self.subscriptions[chanId], *message[1:]) - except websockets.ConnectionClosedError: continue - finally: await self.websocket.wait_closed(); break - - @_require_websocket_connection - async def _subscribe(self, channel, subId=None, **kwargs): - if len(self.subscriptions) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: - raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") - - subscription = { - "event": "subscribe", - "channel": channel, - "subId": subId or str(uuid.uuid4()), - - **kwargs - } - - self.pendings.append(subscription) - - await self.websocket.send(json.dumps(subscription)) - - @_require_websocket_connection - async def _unsubscribe(self, chanId): - await self.websocket.send(json.dumps({ - "event": "unsubscribe", - "chanId": chanId - })) - - @_require_websocket_connection - async def _close(self, code=1000, reason=str()): - await self.websocket.close(code=code, reason=reason) \ No newline at end of file + return handler \ No newline at end of file diff --git a/bfxapi/websocket/_BfxWebsocketBucket.py b/bfxapi/websocket/_BfxWebsocketBucket.py new file mode 100644 index 0000000..2cfe48c --- /dev/null +++ b/bfxapi/websocket/_BfxWebsocketBucket.py @@ -0,0 +1,87 @@ +import json, uuid, websockets + +from typing import Literal, TypeVar, Callable, cast + +from .handlers import PublicChannelsHandler + +from .exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion + +_HEARTBEAT = "hb" + +F = TypeVar("F", bound=Callable[..., Literal[None]]) + +def _require_websocket_connection(function: F) -> F: + async def wrapper(self, *args, **kwargs): + if self.websocket == None or self.websocket.open == False: + raise ConnectionNotOpen("No open connection with the server.") + + await function(self, *args, **kwargs) + + return cast(F, wrapper) + +class _BfxWebsocketBucket(object): + VERSION = 2 + + MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 + + def __init__(self, host, event_emitter, __bucket_open_signal): + self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal + + self.websocket, self.subscriptions, self.pendings = None, dict(), list() + + self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) + + async def _connect(self, index): + async for websocket in websockets.connect(self.host): + self.websocket = websocket + + self.__bucket_open_signal(index) + + try: + async for message in websocket: + message = json.loads(message) + + if isinstance(message, dict) and message["event"] == "info" and "version" in message: + if _BfxWebsocketBucket.VERSION != message["version"]: + raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {_BfxWebsocketBucket.VERSION}, server version: {message['version']}).") + elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): + self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] + self.subscriptions[chanId] = message + self.event_emitter.emit("subscribed", message) + elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): + if message["status"] == "OK": + del self.subscriptions[chanId] + elif isinstance(message, dict) and message["event"] == "error": + self.event_emitter.emit("wss-error", message["code"], message["msg"]) + elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT: + self.handler.handle(self.subscriptions[chanId], *message[1:]) + except websockets.ConnectionClosedError: continue + finally: await self.websocket.wait_closed(); break + + @_require_websocket_connection + async def _subscribe(self, channel, subId=None, **kwargs): + if len(self.subscriptions) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: + raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") + + subscription = { + "event": "subscribe", + "channel": channel, + "subId": subId or str(uuid.uuid4()), + + **kwargs + } + + self.pendings.append(subscription) + + await self.websocket.send(json.dumps(subscription)) + + @_require_websocket_connection + async def _unsubscribe(self, chanId): + await self.websocket.send(json.dumps({ + "event": "unsubscribe", + "chanId": chanId + })) + + @_require_websocket_connection + async def _close(self, code=1000, reason=str()): + await self.websocket.close(code=code, reason=reason) \ No newline at end of file diff --git a/bfxapi/websocket/subscriptions.py b/bfxapi/websocket/subscriptions.py index 08b7b21..2acc1af 100644 --- a/bfxapi/websocket/subscriptions.py +++ b/bfxapi/websocket/subscriptions.py @@ -1,5 +1,13 @@ from typing import TypedDict, Optional +__all__ = [ + "Ticker", + "Trades", + "Book", + "Candles", + "Status" +] + class Ticker(TypedDict): chanId: int; symbol: str pair: Optional[str]