Apply pylint's linting rules to bfxapi/websocket/client/*.py.

This commit is contained in:
Davide Casale
2023-03-06 18:46:04 +01:00
parent 7e627dd239
commit 5c707d7929
10 changed files with 173 additions and 127 deletions

View File

@@ -2,4 +2,4 @@ from .bfx_websocket_client import BfxWebsocketClient
from .bfx_websocket_bucket import BfxWebsocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
NAME = "client"
NAME = "client"

View File

@@ -1,10 +1,10 @@
import json, uuid, websockets
from typing import Literal, TypeVar, Callable, cast
import json, uuid, websockets
from ..handlers import PublicChannelsHandler
from ..exceptions import ConnectionNotOpen, TooManySubscriptions, OutdatedClientVersion
from ..exceptions import ConnectionNotOpen, TooManySubscriptions
_HEARTBEAT = "hb"
@@ -12,14 +12,14 @@ F = TypeVar("F", bound=Callable[..., Literal[None]])
def _require_websocket_connection(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if self.websocket == None or self.websocket.open == False:
if self.websocket is None or not self.websocket.open:
raise ConnectionNotOpen("No open connection with the server.")
await function(self, *args, **kwargs)
return cast(F, wrapper)
class BfxWebsocketBucket(object):
class BfxWebsocketBucket:
VERSION = 2
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
@@ -27,11 +27,12 @@ class BfxWebsocketBucket(object):
def __init__(self, host, event_emitter, on_open_event):
self.host, self.event_emitter, self.on_open_event = host, event_emitter, on_open_event
self.websocket, self.subscriptions, self.pendings = None, dict(), list()
self.websocket, self.subscriptions, self.pendings = None, {}, []
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter)
async def _connect(self, index):
#pylint: disable-next=unused-argument
async def connect(self, index):
reconnection = False
async for websocket in websockets.connect(self.host):
@@ -39,12 +40,12 @@ class BfxWebsocketBucket(object):
self.on_open_event.set()
if reconnection == True or (reconnection := False):
if reconnection or (reconnection := False):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
for _, subscription in self.subscriptions.items():
await self._subscribe(**subscription)
await self.subscribe(**subscription)
self.subscriptions.clear()
@@ -52,21 +53,21 @@ class BfxWebsocketBucket(object):
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict) and message["event"] == "subscribed" and (chanId := message["chanId"]):
if isinstance(message, dict) and message["event"] == "subscribed" and (chan_id := message["chanId"]):
self.pendings = [ pending for pending in self.pendings if pending["subId"] != message["subId"] ]
self.subscriptions[chanId] = message
self.subscriptions[chan_id] = message
self.event_emitter.emit("subscribed", message)
elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chanId := message["chanId"]):
elif isinstance(message, dict) and message["event"] == "unsubscribed" and (chan_id := message["chanId"]):
if message["status"] == "OK":
del self.subscriptions[chanId]
del self.subscriptions[chan_id]
elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chanId], *message[1:])
except websockets.ConnectionClosedError as error:
elif isinstance(message, list) and (chan_id := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chan_id], *message[1:])
except websockets.ConnectionClosedError as error:
if error.code == 1006:
self.on_open_event.clear()
reconnection = True
reconnection = True
continue
raise error
@@ -74,7 +75,7 @@ class BfxWebsocketBucket(object):
break
@_require_websocket_connection
async def _subscribe(self, channel, subId=None, **kwargs):
async def subscribe(self, channel, sub_id=None, **kwargs):
if len(self.subscriptions) + len(self.pendings) == BfxWebsocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
@@ -83,7 +84,7 @@ class BfxWebsocketBucket(object):
"event": "subscribe",
"channel": channel,
"subId": subId or str(uuid.uuid4()),
"subId": sub_id or str(uuid.uuid4()),
}
self.pendings.append(subscription)
@@ -91,17 +92,17 @@ class BfxWebsocketBucket(object):
await self.websocket.send(json.dumps(subscription))
@_require_websocket_connection
async def _unsubscribe(self, chanId):
async def unsubscribe(self, chan_id):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chanId
"chanId": chan_id
}))
@_require_websocket_connection
async def _close(self, code=1000, reason=str()):
async def close(self, code=1000, reason=str()):
await self.websocket.close(code=code, reason=reason)
def _get_chan_id(self, subId):
def get_chan_id(self, sub_id):
for subscription in self.subscriptions.values():
if subscription["subId"] == subId:
return subscription["chanId"]
if subscription["subId"] == sub_id:
return subscription["chanId"]

View File

@@ -1,18 +1,19 @@
import traceback, json, asyncio, hmac, hashlib, time, websockets, socket, random
from typing import cast
from collections import namedtuple
from datetime import datetime
import traceback, json, asyncio, hmac, hashlib, time, socket, random, websockets
from pyee.asyncio import AsyncIOEventEmitter
from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebsocketBucket
from .bfx_websocket_inputs import BfxWebsocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedChannelsHandler
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, OutdatedClientVersion
from ..exceptions import WebsocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \
OutdatedClientVersion
from ...utils.json_encoder import JSONEncoder
@@ -20,14 +21,15 @@ from ...utils.logger import ColorLogger, FileLogger
def _require_websocket_authentication(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if hasattr(self, "authentication") and self.authentication == False:
raise WebsocketAuthenticationRequired("To perform this action you need to authenticate using your API_KEY and API_SECRET.")
if hasattr(self, "authentication") and not self.authentication:
raise WebsocketAuthenticationRequired("To perform this action you need to authenticate " +
"using your API_KEY and API_SECRET.")
await _require_websocket_connection(function)(self, *args, **kwargs)
return cast(F, wrapper)
class BfxWebsocketClient(object):
class BfxWebsocketClient:
VERSION = BfxWebsocketBucket.VERSION
MAXIMUM_CONNECTIONS_AMOUNT = 20
@@ -43,16 +45,18 @@ class BfxWebsocketClient(object):
self.host, self.credentials, self.event_emitter = host, credentials, AsyncIOEventEmitter()
self.on_open_events, self.buckets, self.authentication = [], [], False
self.inputs = BfxWebsocketInputs(handle_websocket_input=self.__handle_websocket_input)
self.handler = AuthenticatedChannelsHandler(event_emitter=self.event_emitter)
if log_filename == None:
if log_filename is None:
self.logger = ColorLogger("BfxWebsocketClient", level=log_level)
else: self.logger = FileLogger("BfxWebsocketClient", level=log_level, filename=log_filename)
self.event_emitter.add_listener("error",
lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" +
self.event_emitter.add_listener("error",
lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" +
str().join(traceback.format_exception(type(exception), exception, exception.__traceback__))[:-1])
)
@@ -61,23 +65,24 @@ class BfxWebsocketClient(object):
async def start(self, connections = 5):
if connections > BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT:
self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT} buckets from the same " +
f"connection ({connections} in use), the server could momentarily block the client with <429 Too Many Requests>.")
self.logger.warning(f"It is not safe to use more than {BfxWebsocketClient.MAXIMUM_CONNECTIONS_AMOUNT} "
+ f"buckets from the same connection ({connections} in use), the server could momentarily block the "
+ "client with <429 Too Many Requests>.")
self.on_open_events = [ asyncio.Event() for _ in range(connections) ]
for _ in range(connections):
self.on_open_events.append(asyncio.Event())
self.buckets = [
BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index])
for index in range(connections)
]
for index in range(connections):
self.buckets += [BfxWebsocketBucket(self.host, self.event_emitter, self.on_open_events[index])]
tasks = [ bucket._connect(index) for index, bucket in enumerate(self.buckets) ]
tasks.append(self.__connect(self.credentials))
tasks = [ bucket.connect(index) for index, bucket in enumerate(self.buckets) ]
tasks.append(self.__connect())
await asyncio.gather(*tasks)
async def __connect(self, credentials = None):
#pylint: disable-next=too-many-statements
async def __connect(self):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection, delay = Reconnection(status=False, attempts=0, timestamp=None), None
@@ -86,10 +91,10 @@ class BfxWebsocketClient(object):
nonlocal reconnection
async with websockets.connect(self.host) as websocket:
if reconnection.status == True:
self.logger.info(f"Reconnect attempt successful (attempt no.{reconnection.attempts}): The " +
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " +
f"(connection lost at: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
if reconnection.status:
self.logger.info(f"Reconnect attempt successful (attempt no.{reconnection.attempts}): The "
+ f"client has been offline for a total of {datetime.now() - reconnection.timestamp} "
+ f"(connection lost at: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
@@ -106,20 +111,22 @@ class BfxWebsocketClient(object):
if isinstance(message, dict) and message["event"] == "info" and "version" in message:
if BfxWebsocketClient.VERSION != message["version"]:
raise OutdatedClientVersion(f"Mismatch between the client version and the server version. " +
f"Update the library to the latest version to continue (client version: {BfxWebsocketClient.VERSION}, " +
f"server version: {message['version']}).")
raise OutdatedClientVersion("Mismatch between the client version and the server version. "
+ "Update the library to the latest version to continue (client version: "
+ f"{BfxWebsocketClient.VERSION}, server version: {message['version']}).")
elif isinstance(message, dict) and message["event"] == "info" and message["code"] == 20051:
rcvd = websockets.frames.Close(code=1012, reason="Stop/Restart Websocket Server (please reconnect).")
raise websockets.ConnectionClosedError(rcvd=rcvd, sent=None)
elif isinstance(message, dict) and message["event"] == "auth":
if message["status"] == "OK":
self.event_emitter.emit("authenticated", message); self.authentication = True
self.event_emitter.emit("authenticated", message)
self.authentication = True
else: raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.")
elif isinstance(message, dict) and message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
elif isinstance(message, list) and (chanId := message[0]) == 0 and message[1] != _HEARTBEAT:
elif isinstance(message, list) and message[0] == 0 and message[1] != _HEARTBEAT:
self.handler.handle(message[1], message[2])
class _Delay:
@@ -138,52 +145,52 @@ class BfxWebsocketClient(object):
self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX)
return backoff_delay
def peek(self):
return (self.__backoff_delay == _Delay.BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
while True:
if reconnection.status == True:
if reconnection.status:
await asyncio.sleep(delay.next())
try:
await _connection()
except (websockets.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.ConnectionClosedError) and (error.code == 1006 or error.code == 1012):
if isinstance(error, websockets.ConnectionClosedError) and error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received "
self.logger.error("Connection lost: no close frame received "
+ "or sent (1006). Attempting to reconnect...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, reconnection "
+ "required (client received 20051). Attempt in progress...")
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now());
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
delay = _Delay(backoff_factor=1.618)
elif isinstance(error, socket.gaierror) and reconnection.status == True:
elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt no.{reconnection.attempts} has failed. "
+ f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds."
+ f"Next reconnection attempt in ~{round(delay.peek()):.1f} seconds."
+ f"(at the moment the client has been offline for {datetime.now() - reconnection.timestamp})")
reconnection = reconnection._replace(attempts=reconnection.attempts + 1)
else: raise error
if reconnection.status == False:
if not reconnection.status:
break
async def __authenticate(self, API_KEY, API_SECRET, filters=None):
data = { "event": "auth", "filter": filters, "apiKey": API_KEY }
async def __authenticate(self, api_key, api_secret, filters=None):
data = { "event": "auth", "filter": filters, "apiKey": api_key }
data["authNonce"] = str(round(time.time() * 1_000_000))
data["authPayload"] = "AUTH" + data["authNonce"]
data["authSig"] = hmac.new(
API_SECRET.encode("utf8"),
api_secret.encode("utf8"),
data["authPayload"].encode("utf8"),
hashlib.sha384
hashlib.sha384
).hexdigest()
await self.websocket.send(json.dumps(data))
@@ -193,56 +200,58 @@ class BfxWebsocketClient(object):
index = counters.index(min(counters))
await self.buckets[index]._subscribe(channel, **kwargs)
await self.buckets[index].subscribe(channel, **kwargs)
async def unsubscribe(self, subId):
async def unsubscribe(self, sub_id):
for bucket in self.buckets:
if (chanId := bucket._get_chan_id(subId)):
await bucket._unsubscribe(chanId=chanId)
if (chan_id := bucket.get_chan_id(sub_id)):
await bucket.unsubscribe(chan_id=chan_id)
async def close(self, code=1000, reason=str()):
if self.websocket != None and self.websocket.open == True:
if self.websocket is not None and self.websocket.open:
await self.websocket.close(code=code, reason=reason)
for bucket in self.buckets:
await bucket._close(code=code, reason=reason)
await bucket.close(code=code, reason=reason)
@_require_websocket_authentication
async def notify(self, info, MESSAGE_ID=None, **kwargs):
await self.websocket.send(json.dumps([ 0, "n", MESSAGE_ID, { "type": "ucm-test", "info": info, **kwargs } ]))
async def notify(self, info, message_id=None, **kwargs):
await self.websocket.send(json.dumps([ 0, "n", message_id, { "type": "ucm-test", "info": info, **kwargs } ]))
@_require_websocket_authentication
async def __handle_websocket_input(self, input, data):
await self.websocket.send(json.dumps([ 0, input, None, data], cls=JSONEncoder))
async def __handle_websocket_input(self, event, data):
await self.websocket.send(json.dumps([ 0, event, None, data], cls=JSONEncoder))
def on(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS")
raise EventNotSupported(f"Event <{event}> is not supported. To get a list "
+ "of available events print BfxWebsocketClient.EVENTS")
if callback != None:
if callback is not None:
for event in events:
self.event_emitter.on(event, callback)
if callback == None:
if callback is None:
def handler(function):
for event in events:
self.event_emitter.on(event, function)
return handler
return handler
def once(self, *events, callback = None):
for event in events:
if event not in BfxWebsocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list of available events print BfxWebsocketClient.EVENTS")
raise EventNotSupported(f"Event <{event}> is not supported. To get a list "
+ "of available events print BfxWebsocketClient.EVENTS")
if callback != None:
if callback is not None:
for event in events:
self.event_emitter.once(event, callback)
if callback == None:
if callback is None:
def handler(function):
for event in events:
self.event_emitter.once(event, function)
return handler
return handler

View File

@@ -1,3 +1,5 @@
#pylint: disable=invalid-name,redefined-builtin,too-many-arguments
from decimal import Decimal
from datetime import datetime
@@ -5,56 +7,84 @@ from typing import Union, Optional, List, Tuple
from .. enums import OrderType, FundingOfferType
from ...utils.json_encoder import JSON
class BfxWebsocketInputs(object):
class BfxWebsocketInputs:
def __init__(self, handle_websocket_input):
self.handle_websocket_input = handle_websocket_input
self.__handle_websocket_input = handle_websocket_input
async def submit_order(self, type: OrderType, symbol: str, amount: Union[Decimal, float, str],
price: Optional[Union[Decimal, float, str]] = None, lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None, price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None, cid: Optional[int] = None,
flags: Optional[int] = 0, tif: Optional[Union[datetime, str]] = None, meta: Optional[JSON] = None):
await self.handle_websocket_input("on", {
async def submit_order(self,
type: OrderType,
symbol: str,
amount: Union[Decimal, float, str],
price: Optional[Union[Decimal, float, str]] = None,
lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None,
cid: Optional[int] = None,
flags: Optional[int] = 0,
tif: Optional[Union[datetime, str]] = None,
meta: Optional[JSON] = None):
await self.__handle_websocket_input("on", {
"type": type, "symbol": symbol, "amount": amount,
"price": price, "lev": lev,
"price_trailing": price_trailing, "price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop,
"gid": gid, "cid": cid,
"flags": flags, "tif": tif, "meta": meta
"price": price, "lev": lev, "price_trailing": price_trailing,
"price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, "gid": gid,
"cid": cid, "flags": flags, "tif": tif,
"meta": meta
})
async def update_order(self, id: int, amount: Optional[Union[Decimal, float, str]] = None, price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None, cid_date: Optional[str] = None, gid: Optional[int] = None,
flags: Optional[int] = 0, lev: Optional[int] = None, delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None, price_trailing: Optional[Union[Decimal, float, str]] = None, tif: Optional[Union[datetime, str]] = None):
await self.handle_websocket_input("ou", {
async def update_order(self,
id: int,
amount: Optional[Union[Decimal, float, str]] = None,
price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None,
gid: Optional[int] = None,
flags: Optional[int] = 0,
lev: Optional[int] = None,
delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
tif: Optional[Union[datetime, str]] = None):
await self.__handle_websocket_input("ou", {
"id": id, "amount": amount, "price": price,
"cid": cid, "cid_date": cid_date, "gid": gid,
"flags": flags, "lev": lev, "delta": delta,
"price_aux_limit": price_aux_limit, "price_trailing": price_trailing, "tif": tif
})
async def cancel_order(self, id: Optional[int] = None, cid: Optional[int] = None, cid_date: Optional[str] = None):
await self.handle_websocket_input("oc", {
async def cancel_order(self,
id: Optional[int] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None):
await self.__handle_websocket_input("oc", {
"id": id, "cid": cid, "cid_date": cid_date
})
async def cancel_order_multi(self, ids: Optional[List[int]] = None, cids: Optional[List[Tuple[int, str]]] = None, gids: Optional[List[int]] = None, all: bool = False):
await self.handle_websocket_input("oc_multi", {
async def cancel_order_multi(self,
ids: Optional[List[int]] = None,
cids: Optional[List[Tuple[int, str]]] = None,
gids: Optional[List[int]] = None,
all: bool = False):
await self.__handle_websocket_input("oc_multi", {
"ids": ids, "cids": cids, "gids": gids,
"all": int(all)
})
async def submit_funding_offer(self, type: FundingOfferType, symbol: str, amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str], period: int,
flags: Optional[int] = 0):
await self.handle_websocket_input("fon", {
async def submit_funding_offer(self,
type: FundingOfferType,
symbol: str,
amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str],
period: int,
flags: Optional[int] = 0):
await self.__handle_websocket_input("fon", {
"type": type, "symbol": symbol, "amount": amount,
"rate": rate, "period": period,
"flags": flags
"rate": rate, "period": period, "flags": flags
})
async def cancel_funding_offer(self, id: int):
await self.handle_websocket_input("foc", { "id": id })
await self.__handle_websocket_input("foc", { "id": id })
async def calc(self, *args: str):
await self.handle_websocket_input("calc", list(map(lambda arg: [arg], args)))
await self.__handle_websocket_input("calc", list(map(lambda arg: [arg], args)))

View File

@@ -1,5 +1,3 @@
#pylint: disable=inconsistent-return-statements
from .. import serializers
from .. exceptions import HandlerNotFound