Merge branch 'master' into add-logging-system

This commit is contained in:
Davide Casale
2022-11-22 18:49:55 +01:00
committed by GitHub
6 changed files with 188 additions and 92 deletions

View File

@@ -1,10 +1,12 @@
import json, asyncio, hmac, hashlib, time, websockets
import json, asyncio, hmac, hashlib, time, uuid, websockets
from enum import Enum
from pyee.asyncio import AsyncIOEventEmitter
from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler
from .errors import ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
from .exceptions import ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
from ..utils.logger import CustomLogger
@@ -19,94 +21,53 @@ def _require_websocket_connection(function):
return wrapper
def _require_websocket_authentication(function):
@_require_websocket_connection
async def wrapper(self, *args, **kwargs):
if self.authentication == False:
raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.")
await function(self, *args, **kwargs)
return wrapper
class BfxWebsocketClient(object):
VERSION = 2
EVENTS = [
"open", "subscribed", "authenticated", "error",
"open", "subscribed", "authenticated", "wss-error",
*PublicChannelsHandler.EVENTS,
*AuthenticatedChannelsHandler.EVENTS
]
def __init__(self, host, log_level = "INFO", API_KEY=None, API_SECRET=None):
self.host, self.chanIds, self.event_emitter = host, dict(), AsyncIOEventEmitter()
def __init__(self, host, buckets=5, API_KEY=None, API_SECRET=None):
self.host, self.websocket, self.event_emitter = host, None, AsyncIOEventEmitter()
self.websocket, self.API_KEY, self.API_SECRET = None, API_KEY, API_SECRET
self.API_KEY, self.API_SECRET, self.authentication = API_KEY, API_SECRET, False
self.authentication = False
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
self.handlers = {
"public": PublicChannelsHandler(event_emitter=self.event_emitter),
"authenticated": AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
}
self.buckets = [ _BfxWebsocketBucket(self.host, self.event_emitter, self.__bucket_open_signal) for _ in range(buckets) ]
self.logger = CustomLogger("BfxWebsocketClient", logLevel=log_level)
async def start(self):
tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ]
if self.API_KEY != None and self.API_SECRET != None:
tasks.append(self.__connect(self.API_KEY, self.API_SECRET))
async def connect(self):
await asyncio.gather(*tasks)
async def __connect(self, API_KEY, API_SECRET, filter=None):
async for websocket in websockets.connect(self.host):
self.websocket = websocket
await self.__authenticate(API_KEY, API_SECRET, filter)
try:
self.event_emitter.emit("open")
if self.API_KEY != None and self.API_SECRET != None:
await self.authenticate(self.API_KEY, self.API_SECRET)
try:
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "info" and "version" in message:
if BfxWebsocketClient.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "subscribed":
self.chanIds[message["chanId"]] = message
self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed":
if isinstance(message, dict) and message["event"] == "auth":
if message["status"] == "OK":
del self.chanIds[message["chanId"]]
elif isinstance(message, dict) and message["event"] == "auth":
if message["status"] == "OK":
self.event_emitter.emit("authenticated", message)
self.authentication = True
self.event_emitter.emit("authenticated", message); self.authentication = True
else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.")
elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("error", message["code"], message["msg"])
elif isinstance(message, list) and message[1] != HEARTBEAT:
if ((chanId := message[0]) or True) and chanId == 0:
self.handlers["authenticated"].handle(message[1], message[2])
else: self.handlers["public"].handle(self.chanIds[chanId], *message[1:])
except websockets.ConnectionClosed:
continue
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != HEARTBEAT:
self.handler.handle(message[1], message[2])
except websockets.ConnectionClosedError: continue
@_require_websocket_connection
async def subscribe(self, channel, **kwargs):
await self.websocket.send(json.dumps({
"event": "subscribe",
"channel": channel,
**kwargs
}))
@_require_websocket_connection
async def unsubscribe(self, chanId):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chanId
}))
@_require_websocket_connection
async def authenticate(self, API_KEY, API_SECRET, filter=None):
async def __authenticate(self, API_KEY, API_SECRET, filter=None):
data = { "event": "auth", "filter": filter, "apiKey": API_KEY }
data["authNonce"] = int(time.time()) * 1000
@@ -121,13 +82,38 @@ class BfxWebsocketClient(object):
await self.websocket.send(json.dumps(data))
@_require_websocket_authentication
async def notify(self, MESSAGE_ID, info):
await self.websocket.send(json.dumps([ 0, "n", MESSAGE_ID, { "type": "ucm-test", "info": info } ]))
async def subscribe(self, channel, **kwargs):
counters = [ len(bucket.pendings) + len(bucket.chanIds) for bucket in self.buckets ]
async def clear(self):
for chanId in self.chanIds.keys():
await self.unsubscribe(chanId)
index = counters.index(min(counters))
await self.buckets[index]._subscribe(channel, **kwargs)
async def unsubscribe(self, chanId):
for bucket in self.buckets:
if chanId in bucket.chanIds.keys():
await bucket._unsubscribe(chanId=chanId)
async def close(self, code=1000, reason=str()):
if self.websocket != None and self.websocket.open == True:
await self.websocket.close(code=code, reason=reason)
for bucket in self.buckets:
await bucket.close(code=code, reason=reason)
def __require_websocket_authentication(function):
@_require_websocket_connection
async def wrapper(self, *args, **kwargs):
if self.authentication == False:
raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.")
await function(self, *args, **kwargs)
return wrapper
def __bucket_open_signal(self, index):
if all(bucket.websocket != None and bucket.websocket.open == True for bucket in self.buckets):
self.event_emitter.emit("open")
def on(self, event):
if event not in BfxWebsocketClient.EVENTS:
@@ -147,5 +133,83 @@ class BfxWebsocketClient(object):
return handler
def run(self):
asyncio.run(self.connect())
class _BfxWebsocketBucket(object):
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
def __init__(self, host, event_emitter, __bucket_open_signal):
self.host, self.event_emitter, self.__bucket_open_signal = host, event_emitter, __bucket_open_signal
self.websocket, self.chanIds, self.pendings = None, dict(), list()
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
async def _connect(self, index):
async for websocket in websockets.connect(self.host):
self.websocket = websocket
self.__bucket_open_signal(index)
try:
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "info" and "version" in message:
if BfxWebsocketClient.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]):
self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.chanIds[chanId] = message
self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]):
if message["status"] == "OK":
del self.chanIds[chanId]
elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) and message[1] != HEARTBEAT:
self.handler.handle(self.chanIds[chanId], *message[1:])
except websockets.ConnectionClosedError: continue
@_require_websocket_connection
async def _subscribe(self, channel, subId=None, **kwargs):
if len(self.chanIds) + len(self.pendings) == _BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
subscription = {
"event": "subscribe",
"channel": channel,
"subId": subId or str(uuid.uuid4()),
**kwargs
}
self.pendings.append(subscription)
await self.websocket.send(json.dumps(subscription))
@_require_websocket_connection
async def _unsubscribe(self, chanId):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chanId
}))
@_require_websocket_connection
async def close(self, code=1000, reason=str()):
await self.websocket.close(code=code, reason=reason)
class Errors(int, Enum):
ERR_UNK = 10000
ERR_GENERIC = 10001
ERR_CONCURRENCY = 10008
ERR_PARAMS = 10020
ERR_CONF_FAIL = 10050
ERR_AUTH_FAIL = 10100
ERR_AUTH_PAYLOAD = 10111
ERR_AUTH_SIG = 10112
ERR_AUTH_HMAC = 10113
ERR_AUTH_NONCE = 10114
ERR_UNAUTH_FAIL = 10200
ERR_SUB_FAIL = 10300
ERR_SUB_MULTI = 10301
ERR_UNSUB_FAIL = 10400
ERR_READY = 11000

View File

@@ -1,3 +1,3 @@
from .BfxWebsocketClient import BfxWebsocketClient
from .BfxWebsocketClient import BfxWebsocketClient, Errors
from .handlers import Channels
from .errors import BfxWebsocketException, ConnectionNotOpen, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
from .exceptions import BfxWebsocketException, ConnectionNotOpen, TooManySubscriptions, WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion

View File

@@ -1,7 +1,10 @@
__all__ = [
"BfxWebsocketException",
"ConnectionNotOpen",
"InvalidAuthenticationCredentials"
"TooManySubscriptions",
"WebsocketAuthenticationRequired",
"InvalidAuthenticationCredentials",
"EventNotSupported",
"OutdatedClientVersion"
]
class BfxWebsocketException(Exception):
@@ -18,6 +21,13 @@ class ConnectionNotOpen(BfxWebsocketException):
pass
class TooManySubscriptions(BfxWebsocketException):
"""
This error indicates an attempt to subscribe to a public channel after reaching the limit of simultaneous connections.
"""
pass
class WebsocketAuthenticationRequired(BfxWebsocketException):
"""
This error indicates an attempt to access a protected resource without logging in first.

View File

@@ -2,7 +2,7 @@ from enum import Enum
from . import serializers
from .errors import BfxWebsocketException
from .exceptions import BfxWebsocketException
def _get_sub_dictionary(dictionary, keys):
return { key: dictionary[key] for key in dictionary if key in keys }
@@ -22,9 +22,9 @@ class Channels(str, Enum):
class PublicChannelsHandler(object):
EVENTS = [
"tp_ticker_update", "fc_ticker_update",
"tp_trade_executed", "tp_trade_execution_update", "fc_trade_executed", "fc_trade_execution_update", "tp_trades_snapshot", "fc_trades_snapshot",
"tp_book_snapshot", "fc_book_snapshot", "tp_raw_book_snapshot", "fc_raw_book_snapshot", "tp_book_update", "fc_book_update", "tp_raw_book_update", "fc_raw_book_update",
"t_ticker_update", "f_ticker_update",
"t_trade_executed", "t_trade_execution_update", "f_trade_executed", "f_trade_execution_update", "t_trades_snapshot", "f_trades_snapshot",
"t_book_snapshot", "f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot", "t_book_update", "f_book_update", "t_raw_book_update", "f_raw_book_update",
"candles_snapshot", "candles_update",
"derivatives_status_update",
]
@@ -47,14 +47,14 @@ class PublicChannelsHandler(object):
def __ticker_channel_handler(self, subscription, *stream):
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
"tp_ticker_update",
"t_ticker_update",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
serializers.TradingPairTicker.parse(*stream[0])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
"fc_ticker_update",
"f_ticker_update",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
serializers.FundingCurrencyTicker.parse(*stream[0])
)
@@ -63,28 +63,28 @@ class PublicChannelsHandler(object):
if type := stream[0] or type in [ "te", "tu", "fte", "ftu" ]:
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
{ "te": "tp_trade_executed", "tu": "tp_trade_execution_update" }[type],
{ "te": "t_trade_executed", "tu": "t_trade_execution_update" }[type],
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
serializers.TradingPairTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
{ "fte": "fc_trade_executed", "ftu": "fc_trade_execution_update" }[type],
{ "fte": "f_trade_executed", "ftu": "f_trade_execution_update" }[type],
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
serializers.FundingCurrencyTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
"tp_trades_snapshot",
"t_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
[ serializers.TradingPairTrade.parse(*substream) for substream in stream[0] ]
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
"fc_trades_snapshot",
"f_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
[ serializers.FundingCurrencyTrade.parse(*substream) for substream in stream[0] ]
)
@@ -100,13 +100,13 @@ class PublicChannelsHandler(object):
if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit(
{ "t": "tp_", "f": "fc_" }[type] + (IS_RAW_BOOK and "raw_book" or "book") + "_snapshot",
type + "_" + (IS_RAW_BOOK and "raw_book" or "book") + "_snapshot",
subscription,
[ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[type].parse(*substream) for substream in stream[0] ]
)
return self.event_emitter.emit(
{ "t": "tp_", "f": "fc_" }[type] + (IS_RAW_BOOK and "raw_book" or "book") + "_update",
type + "_" + (IS_RAW_BOOK and "raw_book" or "book") + "_update",
subscription,
{ "t": _trading_pair_serializer, "f": _funding_currency_serializer }[type].parse(*stream[0])
)

View File

@@ -1,4 +1,4 @@
from .errors import BfxWebsocketException
from .exceptions import BfxWebsocketException
class _Serializer(object):
def __init__(self, name, labels):

22
setup.py Normal file
View File

@@ -0,0 +1,22 @@
from distutils.core import setup
setup(
name="bitfinex-api-py",
version="3.0.0",
packages=[ "bfxapi", "bfxapi.websocket" ],
url="https://github.com/bitfinexcom/bitfinex-api-py",
license="OSI Approved :: Apache Software License",
author="Bitfinex",
author_email="support@bitfinex.com",
description="Official Bitfinex Python API",
keywords="bitfinex,api,trading",
install_requires=[
"pyee~=9.0.4",
"typing_extensions~=4.4.0",
"websockets~=10.4",
],
project_urls={
"Bug Reports": "https://github.com/bitfinexcom/bitfinex-api-py/issues",
"Source": "https://github.com/bitfinexcom/bitfinex-api-py",
}
)