diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index 91153ad..5347b0d 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -1,10 +1,12 @@ -import json, asyncio, hmac, hashlib, time, websockets +import json, asyncio, hmac, hashlib, time, uuid, websockets + +from enum import Enum from pyee.asyncio import AsyncIOEventEmitter from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler -from .errors import ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion +from .exceptions import ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion from ..utils.logger import CustomLogger @@ -19,94 +21,53 @@ def _require_websocket_connection(function): return wrapper -def _require_websocket_authentication(function): - @_require_websocket_connection - async def wrapper(self, *args, **kwargs): - if self.authentication == False: - raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.") - - await function(self, *args, **kwargs) - - return wrapper - class BfxWebsocketClient(object): VERSION = 2 EVENTS = [ - "open", "subscribed", "authenticated", "error", + "open", "subscribed", "authenticated", "wss-error", *PublicChannelsHandler.EVENTS, *AuthenticatedChannelsHandler.EVENTS ] - def __init__(self, host, log_level = "INFO", API_KEY=None, API_SECRET=None): - self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter() + def __init__(self, host, buckets=5, API_KEY=None, API_SECRET=None): + self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter() - self.websocket, self.API_KEY, self.API_SECRET = None, API_KEY, API_SECRET + self.API_KEY, self.API_SECRET, self.authentication = API_KEY, API_SECRET, False - self.authentication = False + self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - self.handlers = { - "public": PublicChannelsHandler(event_emitter=self.event_emitter), - "authenticated": AuthenticatedChannelsHandler(event_emitter=self.event_emitter) - } + self.buckets = [ _BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ] - self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level) + async def start(self): + tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ] + + if self.API_KEY != None and self.API_SECRET != None: + tasks.append(self.__connect(self.API_KEY, self.API_SECRET)) - async def connect(self): + await asyncio.gather(*tasks) + + async def __connect(self, API_KEY, API_SECRET, filter=None): async for websocket in websockets.connect(self.host): self.websocket = websocket + + await self.__authenticate(API_KEY, API_SECRET, filter) - try: - self.event_emitter.emit("open") - - if self.API_KEY != None and self.API_SECRET != None: - await self.authenticate(self.API_KEY, self.API_SECRET) - + try: async for message in websocket: message = json.loads(message) - if isinstance(message, dict) and message["event"] == "info" and "version" in message: - if BfxWebsocketClient.VERSION != message["version"]: - raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") - elif isinstance(message, dict) and message["event"] == "subscribed": - self.chanIds[message["chanId"]] = message - - self.event_emitter.emit("subscribed", message) - elif isinstance(message, dict) and message["event"] == "unsubscribed": + if isinstance(message, dict) and message["event"] == "auth": if message["status"] == "OK": - del self.chanIds[message["chanId"]] - elif isinstance(message, dict) and message["event"] == "auth": - if message["status"] == "OK": - self.event_emitter.emit("authenticated", message) - - self.authentication = True + self.event_emitter.emit("authenticated", message); self.authentication = True else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.") elif isinstance(message, dict) and message["event"] == "error": - self.event_emitter.emit("error", message["code"], message["msg"]) - elif isinstance(message, list) and message[1] != HEARTBEAT: - if ((chanId := message[0]) or True) and chanId == 0: - self.handlers["authenticated"].handle(message[1], message[2]) - else: self.handlers["public"].handle(self.chanIds[chanId], *message[1:]) - except websockets.ConnectionClosed: - continue + self.event_emitter.emit("wss-error", message["code"], message["msg"]) + elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != HEARTBEAT: + self.handler.handle(message[1], message[2]) + except websockets.ConnectionClosedError: continue - @_require_websocket_connection - async def subscribe(self, channel, **kwargs): - await self.websocket.send(json.dumps({ - "event": "subscribe", - "channel": channel, - **kwargs - })) - - @_require_websocket_connection - async def unsubscribe(self, chanId): - await self.websocket.send(json.dumps({ - "event": "unsubscribe", - "chanId": chanId - })) - - @_require_websocket_connection - async def authenticate(self, API_KEY, API_SECRET, filter=None): + async def __authenticate(self, API_KEY, API_SECRET, filter=None): data = { "event": "auth", "filter": filter, "apiKey": API_KEY } data["authNonce"] = int(time.time()) * 1000 @@ -121,13 +82,38 @@ class BfxWebsocketClient(object): await self.websocket.send(json.dumps(data)) - @_require_websocket_authentication - async def notify(self, MESSAGE_ID, info): - await self.websocket.send(json.dumps([ 0, "n", MESSAGE_ID, { "type": "ucm-test", "info": info } ])) + async def subscribe(self, channel, **kwargs): + counters = [ len(bucket.pendings) + len(bucket.chanIds) for bucket in self.buckets ] - async def clear(self): - for chanId in self.chanIds.keys(): - await self.unsubscribe(chanId) + index = counters.index(min(counters)) + + await self.buckets[index]._subscribe(channel, **kwargs) + + async def unsubscribe(self, chanId): + for bucket in self.buckets: + if chanId in bucket.chanIds.keys(): + await bucket._unsubscribe(chanId=chanId) + + async def close(self, code=1000, reason=str()): + if self.websocket != None and self.websocket.open == True: + await self.websocket.close(code=code, reason=reason) + + for bucket in self.buckets: + await bucket.close(code=code, reason=reason) + + def __require_websocket_authentication(function): + @_require_websocket_connection + async def wrapper(self, *args, **kwargs): + if self.authentication == False: + raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.") + + await function(self, *args, **kwargs) + + return wrapper + + def __bucket_open_signal(self, index): + if all(bucket.websocket != None and bucket.websocket.open == True for bucket in self.buckets): + self.event_emitter.emit("open") def on(self, event): if event not in BfxWebsocketClient.EVENTS: @@ -147,5 +133,83 @@ class BfxWebsocketClient(object): return handler - def run(self): - asyncio.run(self.connect()) \ No newline at end of file +class _BfxWebsocketBucket(object): + MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25 + + def __init__(self, host, event_emitter, __bucket_open_signal): + self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal + + self.websocket, self.chanIds, self.pendings = None, dict(), list() + + self.handler = PublicChannelsHandler(event_emitter=self.event_emitter) + + async def _connect(self, index): + async for websocket in websockets.connect(self.host): + self.websocket = websocket + + self.__bucket_open_signal(index) + + try: + async for message in websocket: + message = json.loads(message) + + if isinstance(message, dict) and message["event"] == "info" and "version" in message: + if BfxWebsocketClient.VERSION != message["version"]: + raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).") + elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]): + self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ] + self.chanIds[chanId] = message + self.event_emitter.emit("subscribed", message) + elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]): + if message["status"] == "OK": + del self.chanIds[chanId] + elif isinstance(message, dict) and message["event"] == "error": + self.event_emitter.emit("wss-error", message["code"], message["msg"]) + elif isinstance(message, list) and (chanId := message[0]) and message[1] != HEARTBEAT: + self.handler.handle(self.chanIds[chanId], *message[1:]) + except websockets.ConnectionClosedError: continue + + @_require_websocket_connection + async def _subscribe(self, channel, subId=None, **kwargs): + if len(self.chanIds) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT: + raise TooManySubscriptions("The client has reached the maximum number of subscriptions.") + + subscription = { + "event": "subscribe", + "channel": channel, + "subId": subId or str(uuid.uuid4()), + + **kwargs + } + + self.pendings.append(subscription) + + await self.websocket.send(json.dumps(subscription)) + + @_require_websocket_connection + async def _unsubscribe(self, chanId): + await self.websocket.send(json.dumps({ + "event": "unsubscribe", + "chanId": chanId + })) + + @_require_websocket_connection + async def close(self, code=1000, reason=str()): + await self.websocket.close(code=code, reason=reason) + +class Errors(int, Enum): + ERR_UNK = 10000 + ERR_GENERIC = 10001 + ERR_CONCURRENCY = 10008 + ERR_PARAMS = 10020 + ERR_CONF_FAIL = 10050 + ERR_AUTH_FAIL = 10100 + ERR_AUTH_PAYLOAD = 10111 + ERR_AUTH_SIG = 10112 + ERR_AUTH_HMAC = 10113 + ERR_AUTH_NONCE = 10114 + ERR_UNAUTH_FAIL = 10200 + ERR_SUB_FAIL = 10300 + ERR_SUB_MULTI = 10301 + ERR_UNSUB_FAIL = 10400 + ERR_READY = 11000 \ No newline at end of file diff --git a/bfxapi/websocket/__init__.py b/bfxapi/websocket/__init__.py index f36a1b0..4704773 100644 --- a/bfxapi/websocket/__init__.py +++ b/bfxapi/websocket/__init__.py @@ -1,3 +1,3 @@ -from .BfxWebsocketClient import BfxWebsocketClient +from .BfxWebsocketClient import BfxWebsocketClient, Errors from .handlers import Channels -from .errors import BfxWebsocketException, ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion \ No newline at end of file +from .exceptions import BfxWebsocketException, ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion \ No newline at end of file diff --git a/bfxapi/websocket/errors.py b/bfxapi/websocket/exceptions.py similarity index 75% rename from bfxapi/websocket/errors.py rename to bfxapi/websocket/exceptions.py index 7565bc9..3a1f900 100644 --- a/bfxapi/websocket/errors.py +++ b/bfxapi/websocket/exceptions.py @@ -1,7 +1,10 @@ __all__ = [ - "BfxWebsocketException", "ConnectionNotOpen", - "InvalidAuthenticationCredentials" + "TooManySubscriptions", + "WebsocketAuthenticationRequired", + "InvalidAuthenticationCredentials", + "EventNotSupported", + "OutdatedClientVersion" ] class BfxWebsocketException(Exception): @@ -18,6 +21,13 @@ class ConnectionNotOpen(BfxWebsocketException): pass +class TooManySubscriptions(BfxWebsocketException): + """ + This error indicates an attempt to subscribe to a public channel after reaching the limit of simultaneous connections. + """ + + pass + class WebsocketAuthenticationRequired(BfxWebsocketException): """ This error indicates an attempt to access a protected resource without logging in first. diff --git a/bfxapi/websocket/handlers.py b/bfxapi/websocket/handlers.py index aebe66c..dd85b76 100644 --- a/bfxapi/websocket/handlers.py +++ b/bfxapi/websocket/handlers.py @@ -2,7 +2,7 @@ from enum import Enum from . import serializers -from .errors import BfxWebsocketException +from .exceptions import BfxWebsocketException def _get_sub_dictionary(dictionary, keys): return { key: dictionary[key] for key in dictionary if key in keys } @@ -22,9 +22,9 @@ class Channels(str, Enum): class PublicChannelsHandler(object): EVENTS = [ - "tp_ticker_update", "fc_ticker_update", - "tp_trade_executed", "tp_trade_execution_update", "fc_trade_executed", "fc_trade_execution_update", "tp_trades_snapshot", "fc_trades_snapshot", - "tp_book_snapshot", "fc_book_snapshot", "tp_raw_book_snapshot", "fc_raw_book_snapshot", "tp_book_update", "fc_book_update", "tp_raw_book_update", "fc_raw_book_update", + "t_ticker_update", "f_ticker_update", + "t_trade_executed", "t_trade_execution_update", "f_trade_executed", "f_trade_execution_update", "t_trades_snapshot", "f_trades_snapshot", + "t_book_snapshot", "f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot", "t_book_update", "f_book_update", "t_raw_book_update", "f_raw_book_update", "candles_snapshot", "candles_update", "derivatives_status_update", ] @@ -47,14 +47,14 @@ class PublicChannelsHandler(object): def __ticker_channel_handler(self, subscription, *stream): if subscription["symbol"].startswith("t"): return self.event_emitter.emit( - "tp_ticker_update", + "t_ticker_update", _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), serializers.TradingPairTicker.parse(*stream[0]) ) if subscription["symbol"].startswith("f"): return self.event_emitter.emit( - "fc_ticker_update", + "f_ticker_update", _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), serializers.FundingCurrencyTicker.parse(*stream[0]) ) @@ -63,28 +63,28 @@ class PublicChannelsHandler(object): if type := stream[0] or type in [ "te", "tu", "fte", "ftu" ]: if subscription["symbol"].startswith("t"): return self.event_emitter.emit( - { "te": "tp_trade_executed", "tu": "tp_trade_execution_update" }[type], + { "te": "t_trade_executed", "tu": "t_trade_execution_update" }[type], _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), serializers.TradingPairTrade.parse(*stream[1]) ) if subscription["symbol"].startswith("f"): return self.event_emitter.emit( - { "fte": "fc_trade_executed", "ftu": "fc_trade_execution_update" }[type], + { "fte": "f_trade_executed", "ftu": "f_trade_execution_update" }[type], _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), serializers.FundingCurrencyTrade.parse(*stream[1]) ) if subscription["symbol"].startswith("t"): return self.event_emitter.emit( - "tp_trades_snapshot", + "t_trades_snapshot", _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), [ serializers.TradingPairTrade.parse(*substream) for substream in stream[0] ] ) if subscription["symbol"].startswith("f"): return self.event_emitter.emit( - "fc_trades_snapshot", + "f_trades_snapshot", _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), [ serializers.FundingCurrencyTrade.parse(*substream) for substream in stream[0] ] ) @@ -100,13 +100,13 @@ class PublicChannelsHandler(object): if all(isinstance(substream, list) for substream in stream[0]): return self.event_emitter.emit( - { "t": "tp_", "f": "fc_" }[type] + (IS_RAW_BOOK and "raw_book" or "book") + "_snapshot", + type + "_" + (IS_RAW_BOOK and "raw_book" or "book") + "_snapshot", subscription, [ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[type].parse(*substream) for substream in stream[0] ] ) return self.event_emitter.emit( - { "t": "tp_", "f": "fc_" }[type] + (IS_RAW_BOOK and "raw_book" or "book") + "_update", + type + "_" + (IS_RAW_BOOK and "raw_book" or "book") + "_update", subscription, { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[type].parse(*stream[0]) ) diff --git a/bfxapi/websocket/serializers.py b/bfxapi/websocket/serializers.py index 47cd770..b8796fd 100644 --- a/bfxapi/websocket/serializers.py +++ b/bfxapi/websocket/serializers.py @@ -1,4 +1,4 @@ -from .errors import BfxWebsocketException +from .exceptions import BfxWebsocketException class _Serializer(object): def __init__(self, name, labels): diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..9eff973 --- /dev/null +++ b/setup.py @@ -0,0 +1,22 @@ +from distutils.core import setup + +setup( + name="bitfinex-api-py", + version="3.0.0", + packages=[ "bfxapi", "bfxapi.websocket" ], + url="https://github.com/bitfinexcom/bitfinex-api-py", + license="OSI Approved :: Apache Software License", + author="Bitfinex", + author_email="support@bitfinex.com", + description="Official Bitfinex Python API", + keywords="bitfinex,api,trading", + install_requires=[ + "pyee~=9.0.4", + "typing_extensions~=4.4.0", + "websockets~=10.4", + ], + project_urls={ + "Bug Reports": "https://github.com/bitfinexcom/bitfinex-api-py/issues", + "Source": "https://github.com/bitfinexcom/bitfinex-api-py", + } +) \ No newline at end of file