Files
bitfinex-api-py/bfxapi/websocket/_client/bfx_websocket_client.py

352 lines
12 KiB
Python

import asyncio
import json
import random
import traceback
from asyncio import Task
from datetime import datetime
from logging import Logger
from socket import gaierror
from typing import Any, Dict, List, Optional, TypedDict
import websockets
import websockets.client
from websockets.exceptions import ConnectionClosedError, InvalidStatusCode
from bfxapi._utils.json_encoder import JSONEncoder
from bfxapi.exceptions import InvalidCredentialError
from bfxapi.websocket._connection import Connection
from bfxapi.websocket._event_emitter import BfxEventEmitter
from bfxapi.websocket._handlers import AuthEventsHandler
from bfxapi.websocket.exceptions import (
ReconnectionTimeoutError,
SubIdError,
UnknownChannelError,
UnknownSubscriptionError,
VersionMismatchError,
)
from .bfx_websocket_bucket import BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebSocketInputs
class _Credentials(TypedDict):
    """Authentication settings accepted by ``BfxWebSocketClient``.

    The client unpacks this mapping as keyword arguments into
    ``Connection._get_authentication_message``.
    """

    api_key: str
    api_secret: str
    filters: Optional[List[str]]


class _Reconnection(TypedDict):
    """State the client keeps while it is trying to reconnect."""

    # Ordinal of the current attempt (starts at 1, used in log messages).
    attempts: int
    # ``reason`` of the connection error that started the reconnection.
    reason: str
    # Moment the connection was lost; used to log how long the client is offline.
    timestamp: datetime


# Logger used when the caller does not supply one (level 0 is logging.NOTSET).
_DEFAULT_LOGGER = Logger("bfxapi.websocket._client", level=0)
class _Delay:
    """Generate the waiting times between consecutive reconnection attempts.

    The first delay handed out after construction (or after ``reset``) is a
    random value drawn once in ``__init__``. Every following delay is the
    previously returned one multiplied by ``backoff_factor``, capped at
    ``__BACKOFF_MAX`` seconds.
    """

    # Sentinel value of ``__backoff_delay`` meaning "no delay handed out yet".
    __BACKOFF_MIN = 1.92
    # Upper bound for any delay produced by the multiplication in ``next``.
    __BACKOFF_MAX = 60.0

    def __init__(self, backoff_factor: float) -> None:
        """Store the growth factor and draw the random initial delay.

        Args:
            backoff_factor: Multiplier applied to the current delay on every
                call to ``next``.
        """
        self.__backoff_factor = backoff_factor
        self.__backoff_delay = _Delay.__BACKOFF_MIN
        # Drawn once per instance; reused after every ``reset``.
        self.__initial_delay = random.uniform(1.0, 5.0)

    def next(self) -> float:
        """Return the current delay and advance the back-off.

        Returns:
            The same value ``peek`` would have returned before this call.
        """
        current_delay = self.peek()
        grown_delay = current_delay * self.__backoff_factor
        self.__backoff_delay = min(grown_delay, _Delay.__BACKOFF_MAX)
        return current_delay

    def peek(self) -> float:
        """Return the delay the next call to ``next`` will hand out."""
        # While the back-off still sits on its sentinel value no delay has been
        # consumed yet, so the random initial delay applies.
        if self.__backoff_delay == _Delay.__BACKOFF_MIN:
            return self.__initial_delay
        return self.__backoff_delay

    def reset(self) -> None:
        """Restart the sequence from the (unchanged) initial delay."""
        self.__backoff_delay = _Delay.__BACKOFF_MIN
class BfxWebSocketClient(Connection):
    """WebSocket client made of one main connection plus subscription buckets.

    The main connection (opened in ``__connect``) handles ``info`` / ``auth``
    events and the messages of channel ``0``, which are forwarded to an
    ``AuthEventsHandler``. Channel subscriptions are delegated to
    ``BfxWebSocketBucket`` instances, each running in its own asyncio task.
    ``start`` keeps the connection alive and reconnects with a back-off delay
    when the connection is lost (close code 1006) or the server announces a
    restart (close code 1012).
    """

    def __init__(
        self,
        host: str,
        *,
        credentials: Optional[_Credentials] = None,
        timeout: Optional[int] = 60 * 15,
        logger: Logger = _DEFAULT_LOGGER,
    ) -> None:
        """Set up the client state; no network activity happens here.

        Args:
            host: Address handed to ``Connection`` and to every bucket.
            credentials: When given, an authentication message is sent right
                after the main connection opens.
            timeout: Seconds after which a pending reconnection is aborted
                with ``ReconnectionTimeoutError``; a falsy value disables the
                timeout.
            logger: Logger receiving connection and error messages.
        """
        super().__init__(host)

        self.__credentials, self.__timeout, self.__logger = credentials, timeout, logger

        # Every bucket is mapped to the task running it (None while stopped).
        self.__buckets: Dict[BfxWebSocketBucket, Optional[Task]] = {}

        # Set while a reconnection is in progress, None otherwise.
        self.__reconnection: Optional[_Reconnection] = None

        self.__event_emitter = BfxEventEmitter(loop=None)

        self.__handler = AuthEventsHandler(event_emitter=self.__event_emitter)

        self.__inputs = BfxWebSocketInputs(
            handle_websocket_input=self.__handle_websocket_input
        )

        @self.__event_emitter.listens_to("error")
        def error(exception: Exception) -> None:
            # Log exceptions delivered through the "error" event together with
            # their full traceback (the trailing newline is stripped).
            header = f"{type(exception).__name__}: {str(exception)}"

            stack_trace = traceback.format_exception(
                type(exception), exception, exception.__traceback__
            )

            self.__logger.critical(f"{header}\n" + "".join(stack_trace)[:-1])

    @property
    def inputs(self) -> BfxWebSocketInputs:
        """Return the ``BfxWebSocketInputs`` helper bound to this client."""
        return self.__inputs

    def run(self) -> None:
        """Run ``start`` to completion on ``asyncio.get_event_loop()``."""
        return asyncio.get_event_loop().run_until_complete(self.start())

    async def start(self) -> None:
        """Connect and keep the client running until the connection ends.

        Each loop iteration waits for the back-off delay (only while a
        reconnection is in progress) and then calls ``__connect``. When the
        connection fails with close code 1006 or 1012 a reconnection is
        started; failed reconnection attempts (HTTP status 408 or a
        ``gaierror``) are logged and retried. Once ``__connect`` returns
        without a reconnection pending, the "disconnected" event is emitted
        and the method returns.

        Raises:
            ReconnectionTimeoutError: If the back-off sleep is cancelled by
                the timeout callback.
            ConnectionClosedError, InvalidStatusCode, gaierror: Any such error
                that does not match one of the retry conditions above.
        """
        _delay = _Delay(backoff_factor=1.618)

        _sleep: Optional[Task] = None

        def _on_timeout():
            # Abort the pending back-off sleep if the client is still offline
            # when the timeout elapses.
            if not self.open:
                if _sleep:
                    _sleep.cancel()

        while True:
            if self.__reconnection:
                _sleep = asyncio.create_task(asyncio.sleep(int(_delay.next())))

                try:
                    await _sleep
                except asyncio.CancelledError:
                    raise ReconnectionTimeoutError(
                        "Connection has been offline for too long "
                        f"without being able to reconnect (timeout: {self.__timeout}s)."
                    ) from None

            try:
                await self.__connect()
            except (ConnectionClosedError, InvalidStatusCode, gaierror) as error:

                async def _cancel(task: Task) -> None:
                    # Cancel a bucket task and wait for it to finish. A
                    # connection error equal to the one being handled (same
                    # type and args) is swallowed; a different one propagates.
                    task.cancel()

                    try:
                        await task
                    except (ConnectionClosedError, InvalidStatusCode, gaierror) as _e:
                        nonlocal error

                        if type(error) is not type(_e) or error.args != _e.args:
                            raise _e
                    except asyncio.CancelledError:
                        pass

                for bucket in self.__buckets:
                    if task := self.__buckets[bucket]:
                        self.__buckets[bucket] = None

                        await _cancel(task)

                if isinstance(error, ConnectionClosedError) and error.code in (
                    1006,
                    1012,
                ):
                    if error.code == 1006:
                        self.__logger.error("Connection lost: trying to reconnect...")

                    if error.code == 1012:
                        self.__logger.warning(
                            "WSS server is restarting: all "
                            "clients need to reconnect (server sent 20051)."
                        )

                    if self.__timeout:
                        # NOTE(review): the returned timer handle is discarded,
                        # so a timer from an earlier (already recovered) outage
                        # can still fire during a later one — confirm whether
                        # that is intended.
                        asyncio.get_event_loop().call_later(
                            self.__timeout, _on_timeout
                        )

                    self.__reconnection = {
                        "attempts": 1,
                        "reason": error.reason,
                        "timestamp": datetime.now(),
                    }

                    self._authentication = False

                    _delay.reset()
                elif (
                    (isinstance(error, InvalidStatusCode) and error.status_code == 408)
                    or isinstance(error, gaierror)
                ) and self.__reconnection:
                    # A reconnection attempt failed: log it and retry on the
                    # next loop iteration.
                    self.__logger.warning(
                        "Reconnection attempt unsuccessful (no."
                        f"{self.__reconnection['attempts']}): next attempt in "
                        f"~{int(_delay.peek())}.0s."
                    )

                    self.__logger.info(
                        f"The client has been offline for "
                        f"{datetime.now() - self.__reconnection['timestamp']}."
                    )

                    self.__reconnection["attempts"] += 1
                else:
                    raise error

            if not self.__reconnection:
                self.__event_emitter.emit(
                    "disconnected",
                    self._websocket.close_code,
                    self._websocket.close_reason,
                )

                break

    async def __connect(self) -> None:
        """Open the main connection and process its messages until it closes.

        After connecting, the pending reconnection state (if any) is cleared,
        every known bucket is (re)started, "open" is emitted once all buckets
        are ready and, when credentials are configured, the authentication
        message is sent.

        Raises:
            VersionMismatchError: If the server reports a version other than 2.
            ConnectionClosedError: With code 1012 when the server sends info
                code 20051.
            InvalidCredentialError: If the ``auth`` event status is not "OK".
        """
        async with websockets.client.connect(self._host) as websocket:
            if self.__reconnection:
                self.__logger.warning(
                    "Reconnection attempt successful (no."
                    f"{self.__reconnection['attempts']}): recovering "
                    "connection state..."
                )

                self.__reconnection = None

            self._websocket = websocket

            for bucket in self.__buckets:
                self.__buckets[bucket] = asyncio.create_task(bucket.start())

            if not self.__buckets or (
                await asyncio.gather(*[bucket.wait() for bucket in self.__buckets])
            ):
                self.__event_emitter.emit("open")

            if self.__credentials:
                authentication = Connection._get_authentication_message(
                    **self.__credentials
                )

                await self._websocket.send(authentication)

            async for _message in self._websocket:
                message = json.loads(_message)

                if isinstance(message, dict):
                    # ``.get`` keeps a dict without the expected keys from
                    # raising KeyError and tearing down the whole client.
                    event = message.get("event")

                    if event == "info" and "version" in message:
                        if message["version"] != 2:
                            raise VersionMismatchError(
                                "Mismatch between the client and the server version: "
                                "please update bitfinex-api-py to the latest version "
                                f"to resolve this error (client version: 2, server "
                                f"version: {message['version']})."
                            )
                    elif event == "info" and message.get("code") == 20051:
                        # Turn the restart notice into a close error so that
                        # ``start`` handles it through its 1012 branch.
                        rcvd = websockets.frames.Close(
                            1012, "Stop/Restart WebSocket Server (please reconnect)."
                        )

                        raise ConnectionClosedError(rcvd=rcvd, sent=None)
                    elif event == "auth":
                        if message.get("status") != "OK":
                            raise InvalidCredentialError(
                                "Can't authenticate with given API-KEY and API-SECRET."
                            )

                        self.__event_emitter.emit("authenticated", message)

                        self._authentication = True

                if (
                    isinstance(message, list)
                    and message[0] == 0
                    and message[1] != Connection._HEARTBEAT
                ):
                    self.__handler.handle(message[1], message[2])

    async def __new_bucket(self) -> BfxWebSocketBucket:
        """Create a bucket, start it in a new task and wait until it is ready."""
        bucket = BfxWebSocketBucket(self._host, self.__event_emitter)

        self.__buckets[bucket] = asyncio.create_task(bucket.start())

        await bucket.wait()

        return bucket

    @Connection._require_websocket_connection
    async def subscribe(
        self, channel: str, sub_id: Optional[str] = None, **kwargs: Any
    ) -> None:
        """Subscribe to a channel through the first bucket that is not full.

        A new bucket is created when every existing bucket is full.

        Args:
            channel: One of ticker, trades, book, candles or status.
            sub_id: Optional identifier of the subscription; must not already
                be in use by any bucket.
            **kwargs: Forwarded unchanged to the bucket's ``subscribe``.

        Raises:
            UnknownChannelError: If ``channel`` is not one of the names above.
            SubIdError: If ``sub_id`` is already present in a bucket.
        """
        if channel not in ["ticker", "trades", "book", "candles", "status"]:
            raise UnknownChannelError(
                "Available channels are: ticker, trades, book, candles and status."
            )

        for bucket in self.__buckets:
            if sub_id in bucket.ids:
                raise SubIdError("sub_id must be unique for all subscriptions.")

        for bucket in self.__buckets:
            if not bucket.is_full:
                return await bucket.subscribe(channel, sub_id, **kwargs)

        bucket = await self.__new_bucket()

        return await bucket.subscribe(channel, sub_id, **kwargs)

    @Connection._require_websocket_connection
    async def unsubscribe(self, sub_id: str) -> None:
        """Remove the subscription identified by ``sub_id``.

        When it is the only subscription of its bucket, the bucket is dropped
        and closed (code 1001) instead of sending an unsubscribe request.

        Raises:
            UnknownSubscriptionError: If no bucket holds ``sub_id``.
        """
        for bucket in self.__buckets:
            if bucket.has(sub_id):
                if bucket.count == 1:
                    # Safe although the dict is being iterated: the method
                    # returns right after the deletion.
                    del self.__buckets[bucket]

                    return await bucket.close(code=1001, reason="Going Away")

                return await bucket.unsubscribe(sub_id)

        raise UnknownSubscriptionError(
            f"Unable to find a subscription with sub_id <{sub_id}>."
        )

    @Connection._require_websocket_connection
    async def resubscribe(self, sub_id: str) -> None:
        """Ask the bucket holding ``sub_id`` to resubscribe it.

        Raises:
            UnknownSubscriptionError: If no bucket holds ``sub_id``.
        """
        for bucket in self.__buckets:
            if bucket.has(sub_id):
                return await bucket.resubscribe(sub_id)

        raise UnknownSubscriptionError(
            f"Unable to find a subscription with sub_id <{sub_id}>."
        )

    @Connection._require_websocket_connection
    async def close(self, code: int = 1000, reason: str = "") -> None:
        """Close every bucket and then the main connection, if still open.

        Args:
            code: Close code passed to each ``close`` call.
            reason: Close reason passed to each ``close`` call.
        """
        for bucket in self.__buckets:
            await bucket.close(code=code, reason=reason)

        if self._websocket.open:
            await self._websocket.close(code=code, reason=reason)

    @Connection._require_websocket_authentication
    async def notify(
        self, info: Any, message_id: Optional[int] = None, **kwargs: Any
    ) -> None:
        """Send an "n" message of type "ucm-test" over the main connection.

        Args:
            info: Value stored under the "info" key of the payload.
            message_id: Third element of the sent message.
            **kwargs: Extra keys merged into the payload.
        """
        await self._websocket.send(
            json.dumps(
                [0, "n", message_id, {"type": "ucm-test", "info": info, **kwargs}]
            )
        )

    @Connection._require_websocket_authentication
    async def __handle_websocket_input(self, event: str, data: Any) -> None:
        """Send ``[0, event, None, data]`` encoded with ``JSONEncoder``.

        This is the callback given to ``BfxWebSocketInputs`` in ``__init__``.
        """
        await self._websocket.send(json.dumps([0, event, None, data], cls=JSONEncoder))

    def on(self, event, callback=None):
        """Forward to the event emitter's ``on`` and return its result."""
        return self.__event_emitter.on(event, callback)