Merge pull request #228 from Davi0kProgramsThings/v3.0.0b3

Merge branch `Davi0kProgramsThings:v3.0.0b3` into branch `bitfinexcom:master`.
This commit is contained in:
Vigan Abdurrahmani
2023-11-23 14:38:41 +01:00
committed by GitHub
83 changed files with 1758 additions and 1689 deletions

View File

@@ -26,10 +26,5 @@ A possible solution could be...
### Python version
<!-- Indicate your python version here -->
<!-- You can print it using `python3 --version`-->
<!-- You can print it using `python3 --version` -->
Python 3.10.6 x64
### Mypy version
<!-- Indicate your mypy version here -->
<!-- You can print it using `python3 -m mypy --version`-->
mypy 0.991 (compiled: yes)

View File

@@ -25,8 +25,5 @@ PR fixes the following issue:
- [ ] I have commented my code, particularly in hard-to-understand areas;
- [ ] I have made corresponding changes to the documentation;
- [ ] My changes generate no new warnings;
- [ ] I have added tests that prove my fix is effective or that my feature works;
- [ ] New and existing unit tests pass locally with my changes;
- [ ] Mypy returns no errors or warnings when run on the root package;
- [ ] Pylint returns a score of 10.00/10.00 when run on the root package;
- [ ] I have updated the library version and updated the CHANGELOG;

View File

@@ -27,5 +27,3 @@ jobs:
run: python -m pylint bfxapi
- name: Run mypy to check the correctness of type hinting (and fail if any error or warning is found)
run: python -m mypy bfxapi
- name: Execute project's unit tests (unittest)
run: python -m unittest bfxapi.tests

View File

@@ -3,28 +3,20 @@ py-version=3.8.0
[MESSAGES CONTROL]
disable=
multiple-imports,
missing-docstring,
logging-not-lazy,
logging-fstring-interpolation,
multiple-imports,
too-few-public-methods,
too-many-public-methods,
too-many-instance-attributes,
dangerous-default-value,
inconsistent-return-statements,
[SIMILARITIES]
min-similarity-lines=6
too-many-instance-attributes
[VARIABLES]
allowed-redefined-builtins=type,dir,id,all,format,len
allowed-redefined-builtins=all,dir,format,id,len,type
[FORMAT]
max-line-length=120
expected-line-ending-format=LF
[BASIC]
good-names=id,on,pl,t,ip,tf,A,B,C,D,E,F
good-names=f,t,id,ip,on,pl,tf,to,A,B,C,D,E,F
[TYPECHECK]
generated-members=websockets

View File

@@ -1,11 +0,0 @@
language: python
python:
- "3.8.0"
before_install:
- python -m pip install --upgrade pip
install:
- pip install -r dev-requirements.txt
script:
- python -m pylint bfxapi
- python -m mypy bfxapi
- python -m unittest bfxapi.tests

View File

@@ -97,7 +97,6 @@ _Revoke your API-KEYs and API-SECRETs immediately if you think they might have b
### Advanced features
* [Using custom notifications](#using-custom-notifications)
* [Setting up connection multiplexing](#setting-up-connection-multiplexing)
### Examples
* [Creating a new order](#creating-a-new-order)
@@ -181,10 +180,10 @@ A custom [close code number](https://www.iana.org/assignments/websocket/websocke
await bfx.wss.close(code=1001, reason="Going Away")
```
After closing the connection, the client will emit the `disconnection` event:
After closing the connection, the client will emit the `disconnected` event:
```python
@bfx.wss.on("disconnection")
def on_disconnection(code: int, reason: str):
@bfx.wss.on("disconnected")
def on_disconnected(code: int, reason: str):
if code == 1000 or code == 1001:
print("Closing the connection without errors!")
```
@@ -201,7 +200,7 @@ On each successful subscription, the client will emit the `subscribed` event:
@bfx.wss.on("subscribed")
def on_subscribed(subscription: subscriptions.Subscription):
if subscription["channel"] == "ticker":
print(f"{subscription['symbol']}: {subscription['subId']}") # tBTCUSD: f2757df2-7e11-4244-9bb7-a53b7343bef8
print(f"{subscription['symbol']}: {subscription['sub_id']}") # tBTCUSD: f2757df2-7e11-4244-9bb7-a53b7343bef8
```
### Unsubscribing from a public channel
@@ -242,11 +241,6 @@ The same can be done without using decorators:
bfx.wss.on("candles_update", callback=on_candles_update)
```
You can pass any number of events to register for the same callback function:
```python
bfx.wss.on("t_ticker_update", "f_ticker_update", callback=on_ticker_update)
```
# Advanced features
## Using custom notifications
@@ -269,27 +263,6 @@ def on_notification(notification: Notification[Any]):
print(notification.data) # { "foo": 1 }
```
## Setting up connection multiplexing
`BfxWebSocketClient::run` and `BfxWebSocketClient::start` accept a `connections` argument:
```python
bfx.wss.run(connections=3)
```
`connections` indicates the number of connections to run concurrently (through connection multiplexing).
Each of these connections can handle up to 25 subscriptions to public channels. \
So, using `N` connections will allow the client to handle at most `N * 25` subscriptions. \
You should always use the minimum number of connections necessary to handle all the subscriptions that will be made.
For example, if you know that your application will subscribe to 75 public channels, 75 / 25 = 3 connections will be enough to handle all the subscriptions.
The default number of connections is 5; therefore, if the `connections` argument is not given, the client will be able to handle a maximum of 25 * 5 = 125 subscriptions.
Keep in mind that using a large number of connections could slow down the client performance.
The use of more than 20 connections is not recommended.
# Examples
## Creating a new order
@@ -340,7 +313,6 @@ Contributors must uphold the [Contributor Covenant code of conduct](https://gith
* [Cloning the repository](#cloning-the-repository)
* [Installing the dependencies](#installing-the-dependencies)
2. [Before opening a PR](#before-opening-a-pr)
* [Running the unit tests](#running-the-unit-tests)
3. [License](#license)
## Installation and setup
@@ -376,24 +348,9 @@ Wheter you're submitting a bug fix, a new feature or a documentation change, you
All PRs must follow this [PULL_REQUEST_TEMPLATE](https://github.com/bitfinexcom/bitfinex-api-py/blob/v3-beta/.github/PULL_REQUEST_TEMPLATE.md) and include an exhaustive description.
Before opening a pull request, you should also make sure that:
- [ ] all unit tests pass (see [Running the unit tests](#running-the-unit-tests)).
- [ ] [`pylint`](https://github.com/pylint-dev/pylint) returns a score of 10.00/10.00 when run against your code.
- [ ] [`mypy`](https://github.com/python/mypy) doesn't throw any error code when run on the project (excluding notes).
### Running the unit tests
`bitfinex-api-py` comes with a set of unit tests (written using the [`unittest`](https://docs.python.org/3.8/library/unittest.html) unit testing framework). \
Contributors must ensure that each unit test passes before opening a pull request. \
You can run all project's unit tests by calling `unittest` on `bfxapi.tests`:
```console
python3 -m unittest -v bfxapi.tests
```
A single unit test can be run as follows:
```console
python3 -m unittest -v bfxapi.tests.test_notification
```
## License
```

View File

@@ -1,6 +1,6 @@
from .client import Client
from .urls import REST_HOST, PUB_REST_HOST, \
WSS_HOST, PUB_WSS_HOST
from .version import __version__
from ._client import \
Client, \
REST_HOST, \
WSS_HOST, \
PUB_REST_HOST, \
PUB_WSS_HOST

52
bfxapi/_client.py Normal file
View File

@@ -0,0 +1,52 @@
from typing import \
TYPE_CHECKING, List, Optional
from bfxapi._utils.logging import ColorLogger
from bfxapi.rest import BfxRestInterface
from bfxapi.websocket import BfxWebSocketClient
from bfxapi.exceptions import IncompleteCredentialError
if TYPE_CHECKING:
from bfxapi.websocket._client.bfx_websocket_client import \
_Credentials
REST_HOST = "https://api.bitfinex.com/v2"
WSS_HOST = "wss://api.bitfinex.com/ws/2"
PUB_REST_HOST = "https://api-pub.bitfinex.com/v2"
PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2"
class Client:
def __init__(
self,
api_key: Optional[str] = None,
api_secret: Optional[str] = None,
*,
rest_host: str = REST_HOST,
wss_host: str = WSS_HOST,
filters: Optional[List[str]] = None,
timeout: Optional[int] = 60 * 15,
log_filename: Optional[str] = None
) -> None:
credentials: Optional["_Credentials"] = None
if api_key and api_secret:
credentials = \
{ "api_key": api_key, "api_secret": api_secret, "filters": filters }
elif api_key:
raise IncompleteCredentialError( \
"You must provide both an API-KEY and an API-SECRET (missing API-KEY).")
elif api_secret:
raise IncompleteCredentialError( \
"You must provide both an API-KEY and an API-SECRET (missing API-SECRET).")
self.rest = BfxRestInterface(rest_host, api_key, api_secret)
logger = ColorLogger("bfxapi", level="INFO")
if log_filename:
logger.register(filename=log_filename)
self.wss = BfxWebSocketClient(wss_host, \
credentials=credentials, timeout=timeout, logger=logger)

View File

@@ -0,0 +1,13 @@
from typing import Dict, Any
import re, json
def _to_snake_case(string: str) -> str:
return re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
def _object_hook(data: Dict[str, Any]) -> Any:
return { _to_snake_case(key): value for key, value in data.items() }
class JSONDecoder(json.JSONDecoder):
def __init__(self, *args: Any, **kwargs: Any) -> None:
super().__init__(object_hook=_object_hook, *args, **kwargs)

View File

@@ -0,0 +1,36 @@
from typing import \
Union, List, Dict, \
Any
import json
from decimal import Decimal
_ExtJSON = Union[Dict[str, "_ExtJSON"], List["_ExtJSON"], \
bool, int, float, str, Decimal, None]
_StrictJSON = Union[Dict[str, "_StrictJSON"], List["_StrictJSON"], \
int, str, None]
def _clear(dictionary: Dict[str, Any]) -> Dict[str, Any]:
return { key: value for key, value in dictionary.items() \
if value is not None }
def _adapter(data: _ExtJSON) -> _StrictJSON:
if isinstance(data, bool):
return int(data)
if isinstance(data, float):
return format(Decimal(repr(data)), "f")
if isinstance(data, Decimal):
return format(data, "f")
if isinstance(data, list):
return [ _adapter(sub_data) for sub_data in data ]
if isinstance(data, dict):
return _clear({ key: _adapter(value) for key, value in data.items() })
return data
class JSONEncoder(json.JSONEncoder):
def encode(self, o: _ExtJSON) -> str:
return super().encode(_adapter(o))

67
bfxapi/_utils/logging.py Normal file
View File

@@ -0,0 +1,67 @@
from typing import \
TYPE_CHECKING, Literal, Optional
#pylint: disable-next=wildcard-import,unused-wildcard-import
from logging import *
from copy import copy
import sys
if TYPE_CHECKING:
_Level = Literal["NOTSET", "DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
_BLACK, _RED, _GREEN, _YELLOW, \
_BLUE, _MAGENTA, _CYAN, _WHITE = \
[ f"\033[0;{90 + i}m" for i in range(8) ]
_BOLD_BLACK, _BOLD_RED, _BOLD_GREEN, _BOLD_YELLOW, \
_BOLD_BLUE, _BOLD_MAGENTA, _BOLD_CYAN, _BOLD_WHITE = \
[ f"\033[1;{90 + i}m" for i in range(8) ]
_NC = "\033[0m"
class _ColorFormatter(Formatter):
__LEVELS = {
"INFO": _BLUE,
"WARNING": _YELLOW,
"ERROR": _RED,
"CRITICAL": _BOLD_RED,
"DEBUG": _BOLD_WHITE
}
def format(self, record: LogRecord) -> str:
_record = copy(record)
_record.name = _MAGENTA + record.name + _NC
_record.levelname = _ColorFormatter.__format_level(record.levelname)
return super().format(_record)
#pylint: disable-next=invalid-name
def formatTime(self, record: LogRecord, datefmt: Optional[str] = None) -> str:
return _GREEN + super().formatTime(record, datefmt) + _NC
@staticmethod
def __format_level(level: str) -> str:
return _ColorFormatter.__LEVELS[level] + level + _NC
_FORMAT = "%(asctime)s %(name)s %(levelname)s %(message)s"
_DATE_FORMAT = "%d-%m-%Y %H:%M:%S"
class ColorLogger(Logger):
__FORMATTER = Formatter(_FORMAT,_DATE_FORMAT)
def __init__(self, name: str, level: "_Level" = "NOTSET") -> None:
super().__init__(name, level)
formatter = _ColorFormatter(_FORMAT, _DATE_FORMAT)
handler = StreamHandler(stream=sys.stderr)
handler.setFormatter(fmt=formatter)
self.addHandler(hdlr=handler)
def register(self, filename: str) -> None:
handler = FileHandler(filename=filename)
handler.setFormatter(fmt=ColorLogger.__FORMATTER)
self.addHandler(hdlr=handler)

1
bfxapi/_version.py Normal file
View File

@@ -0,0 +1 @@
__version__ = "3.0.0b3"

View File

@@ -1,37 +0,0 @@
from typing import List, Literal, Optional
from .rest import BfxRestInterface
from .websocket import BfxWebSocketClient
from .urls import REST_HOST, WSS_HOST
class Client:
def __init__(
self,
api_key: Optional[str] = None,
api_secret: Optional[str] = None,
filters: Optional[List[str]] = None,
*,
rest_host: str = REST_HOST,
wss_host: str = WSS_HOST,
wss_timeout: Optional[float] = 60 * 15,
log_filename: Optional[str] = None,
log_level: Literal["ERROR", "WARNING", "INFO", "DEBUG"] = "INFO"
):
credentials = None
if api_key and api_secret:
credentials = { "api_key": api_key, "api_secret": api_secret, "filters": filters }
self.rest = BfxRestInterface(
host=rest_host,
credentials=credentials
)
self.wss = BfxWebSocketClient(
host=wss_host,
credentials=credentials,
wss_timeout=wss_timeout,
log_filename=log_filename,
log_level=log_level
)

View File

@@ -1,50 +0,0 @@
from enum import Enum
class OrderType(str, Enum):
LIMIT = "LIMIT"
EXCHANGE_LIMIT = "EXCHANGE LIMIT"
MARKET = "MARKET"
EXCHANGE_MARKET = "EXCHANGE MARKET"
STOP = "STOP"
EXCHANGE_STOP = "EXCHANGE STOP"
STOP_LIMIT = "STOP LIMIT"
EXCHANGE_STOP_LIMIT = "EXCHANGE STOP LIMIT"
TRAILING_STOP = "TRAILING STOP"
EXCHANGE_TRAILING_STOP = "EXCHANGE TRAILING STOP"
FOK = "FOK"
EXCHANGE_FOK = "EXCHANGE FOK"
IOC = "IOC"
EXCHANGE_IOC = "EXCHANGE IOC"
class FundingOfferType(str, Enum):
LIMIT = "LIMIT"
FRR_DELTA_FIX = "FRRDELTAFIX"
FRR_DELTA_VAR = "FRRDELTAVAR"
class Flag(int, Enum):
HIDDEN = 64
CLOSE = 512
REDUCE_ONLY = 1024
POST_ONLY = 4096
OCO = 16384
NO_VAR_RATES = 524288
class Error(int, Enum):
ERR_UNK = 10000
ERR_GENERIC = 10001
ERR_CONCURRENCY = 10008
ERR_PARAMS = 10020
ERR_CONF_FAIL = 10050
ERR_AUTH_FAIL = 10100
ERR_AUTH_PAYLOAD = 10111
ERR_AUTH_SIG = 10112
ERR_AUTH_HMAC = 10113
ERR_AUTH_NONCE = 10114
ERR_UNAUTH_FAIL = 10200
ERR_SUB_FAIL = 10300
ERR_SUB_MULTI = 10301
ERR_SUB_UNK = 10302
ERR_SUB_LIMIT = 10305
ERR_UNSUB_FAIL = 10400
ERR_UNSUB_NOT = 10401
ERR_READY = 11000

View File

@@ -1,8 +1,10 @@
__all__ = [
"BfxBaseException",
]
class BfxBaseException(Exception):
"""
Base class for every custom exception in bfxapi/rest/exceptions.py and bfxapi/websocket/exceptions.py.
Base class for every custom exception thrown by bitfinex-api-py.
"""
class IncompleteCredentialError(BfxBaseException):
pass
class InvalidCredentialError(BfxBaseException):
pass

View File

@@ -1,2 +1,2 @@
from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthenticatedEndpoints, \
from .endpoints import BfxRestInterface, RestPublicEndpoints, RestAuthEndpoints, \
RestMerchantEndpoints

View File

@@ -1,5 +1,5 @@
from .bfx_rest_interface import BfxRestInterface
from .rest_public_endpoints import RestPublicEndpoints
from .rest_authenticated_endpoints import RestAuthenticatedEndpoints
from .rest_auth_endpoints import RestAuthEndpoints
from .rest_merchant_endpoints import RestMerchantEndpoints

View File

@@ -1,13 +1,11 @@
from .rest_public_endpoints import RestPublicEndpoints
from .rest_authenticated_endpoints import RestAuthenticatedEndpoints
from .rest_auth_endpoints import RestAuthEndpoints
from .rest_merchant_endpoints import RestMerchantEndpoints
class BfxRestInterface:
VERSION = 2
def __init__(self, host, credentials = None):
api_key, api_secret = (credentials['api_key'], credentials['api_secret']) if credentials else (None, None)
def __init__(self, host, api_key = None, api_secret = None):
self.public = RestPublicEndpoints(host=host)
self.auth = RestAuthenticatedEndpoints(host=host, api_key=api_key, api_secret=api_secret)
self.auth = RestAuthEndpoints(host=host, api_key=api_key, api_secret=api_secret)
self.merchant = RestMerchantEndpoints(host=host, api_key=api_key, api_secret=api_secret)

View File

@@ -1,12 +1,10 @@
from typing import Dict, List, Tuple, Union, Literal, Optional
from decimal import Decimal
from datetime import datetime
from ..middleware import Middleware
from ..enums import Sort, OrderType, FundingOfferType
from ...types import JSON, Notification, \
from ...types import Notification, \
UserInfo, LoginHistory, BalanceAvailable, \
Order, Position, Trade, \
FundingTrade, OrderTrade, Ledger, \
@@ -22,7 +20,8 @@ from ...types import serializers
from ...types.serializers import _Notification
class RestAuthenticatedEndpoints(Middleware):
#pylint: disable-next=too-many-public-methods
class RestAuthEndpoints(Middleware):
def get_user_info(self) -> UserInfo:
return serializers.UserInfo \
.parse(*self._post("auth/r/info/user"))
@@ -62,26 +61,24 @@ class RestAuthenticatedEndpoints(Middleware):
for sub_data in self._post(endpoint, body={ "id": ids }) ]
def submit_order(self,
type: OrderType,
type: str,
symbol: str,
amount: Union[Decimal, float, str],
amount: Union[str, float, Decimal],
price: Union[str, float, Decimal],
*,
price: Optional[Union[Decimal, float, str]] = None,
lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_oco_stop: Optional[Union[Decimal, float, str]] = None,
price_trailing: Optional[Union[str, float, Decimal]] = None,
price_aux_limit: Optional[Union[str, float, Decimal]] = None,
price_oco_stop: Optional[Union[str, float, Decimal]] = None,
gid: Optional[int] = None,
cid: Optional[int] = None,
flags: Optional[int] = 0,
tif: Optional[Union[datetime, str]] = None,
meta: Optional[JSON] = None) -> Notification[Order]:
flags: Optional[int] = None,
tif: Optional[str] = None) -> Notification[Order]:
body = {
"type": type, "symbol": symbol, "amount": amount,
"price": price, "lev": lev, "price_trailing": price_trailing,
"price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, "gid": gid,
"cid": cid, "flags": flags, "tif": tif,
"meta": meta
"cid": cid, "flags": flags, "tif": tif
}
return _Notification[Order](serializers.Order) \
@@ -90,17 +87,17 @@ class RestAuthenticatedEndpoints(Middleware):
def update_order(self,
id: int,
*,
amount: Optional[Union[Decimal, float, str]] = None,
price: Optional[Union[Decimal, float, str]] = None,
amount: Optional[Union[str, float, Decimal]] = None,
price: Optional[Union[str, float, Decimal]] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None,
gid: Optional[int] = None,
flags: Optional[int] = 0,
flags: Optional[int] = None,
lev: Optional[int] = None,
delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
tif: Optional[Union[datetime, str]] = None) -> Notification[Order]:
delta: Optional[Union[str, float, Decimal]] = None,
price_aux_limit: Optional[Union[str, float, Decimal]] = None,
price_trailing: Optional[Union[str, float, Decimal]] = None,
tif: Optional[str] = None) -> Notification[Order]:
body = {
"id": id, "amount": amount, "price": price,
"cid": cid, "cid_date": cid_date, "gid": gid,
@@ -122,13 +119,13 @@ class RestAuthenticatedEndpoints(Middleware):
def cancel_order_multi(self,
*,
ids: Optional[List[int]] = None,
cids: Optional[List[Tuple[int, str]]] = None,
gids: Optional[List[int]] = None,
all: bool = False) -> Notification[List[Order]]:
id: Optional[List[int]] = None,
cid: Optional[List[Tuple[int, str]]] = None,
gid: Optional[List[int]] = None,
all: Optional[bool] = None) -> Notification[List[Order]]:
body = {
"ids": ids, "cids": cids, "gids": gids,
"all": int(all)
"id": id, "cid": cid, "gid": gid,
"all": all
}
return _Notification[List[Order]](serializers.Order, is_iterable=True) \
@@ -162,7 +159,7 @@ class RestAuthenticatedEndpoints(Middleware):
def get_trades_history(self,
*,
symbol: Optional[str] = None,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Trade]:
@@ -212,21 +209,21 @@ class RestAuthenticatedEndpoints(Middleware):
def claim_position(self,
id: int,
*,
amount: Optional[Union[Decimal, float, str]] = None) -> Notification[PositionClaim]:
amount: Optional[Union[str, float, Decimal]] = None) -> Notification[PositionClaim]:
return _Notification[PositionClaim](serializers.PositionClaim) \
.parse(*self._post("auth/w/position/claim", \
body={ "id": id, "amount": amount }))
def increase_position(self,
symbol: str,
amount: Union[Decimal, float, str]) -> Notification[PositionIncrease]:
amount: Union[str, float, Decimal]) -> Notification[PositionIncrease]:
return _Notification[PositionIncrease](serializers.PositionIncrease) \
.parse(*self._post("auth/w/position/increase", \
body={ "symbol": symbol, "amount": amount }))
def get_increase_position_info(self,
symbol: str,
amount: Union[Decimal, float, str]) -> PositionIncreaseInfo:
amount: Union[str, float, Decimal]) -> PositionIncreaseInfo:
return serializers.PositionIncreaseInfo \
.parse(*self._post("auth/r/position/increase/info", \
body={ "symbol": symbol, "amount": amount }))
@@ -265,7 +262,7 @@ class RestAuthenticatedEndpoints(Middleware):
def set_derivative_position_collateral(self,
symbol: str,
collateral: Union[Decimal, float, str]) -> DerivativePositionCollateral:
collateral: Union[str, float, Decimal]) -> DerivativePositionCollateral:
return serializers.DerivativePositionCollateral \
.parse(*(self._post("auth/w/deriv/collateral/set", \
body={ "symbol": symbol, "collateral": collateral })[0]))
@@ -284,13 +281,13 @@ class RestAuthenticatedEndpoints(Middleware):
#pylint: disable-next=too-many-arguments
def submit_funding_offer(self,
type: FundingOfferType,
type: str,
symbol: str,
amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str],
amount: Union[str, float, Decimal],
rate: Union[str, float, Decimal],
period: int,
*,
flags: Optional[int] = 0) -> Notification[FundingOffer]:
flags: Optional[int] = None) -> Notification[FundingOffer]:
body = {
"type": type, "symbol": symbol, "amount": amount,
"rate": rate, "period": period, "flags": flags
@@ -319,7 +316,7 @@ class RestAuthenticatedEndpoints(Middleware):
rate: Optional[int] = None,
period: Optional[int] = None) -> Notification[FundingAutoRenew]:
body = {
"status": int(status), "currency": currency, "amount": amount,
"status": status, "currency": currency, "amount": amount,
"rate": rate, "period": period
}
@@ -396,7 +393,7 @@ class RestAuthenticatedEndpoints(Middleware):
def get_funding_trades_history(self,
*,
symbol: Optional[str] = None,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[FundingTrade]:
@@ -421,7 +418,7 @@ class RestAuthenticatedEndpoints(Middleware):
to_wallet: str,
currency: str,
currency_to: str,
amount: Union[Decimal, float, str]) -> Notification[Transfer]:
amount: Union[str, float, Decimal]) -> Notification[Transfer]:
body = {
"from": from_wallet, "to": to_wallet, "currency": currency,
"currency_to": currency_to, "amount": amount
@@ -434,7 +431,7 @@ class RestAuthenticatedEndpoints(Middleware):
wallet: str,
method: str,
address: str,
amount: Union[Decimal, float, str]) -> Notification[Withdrawal]:
amount: Union[str, float, Decimal]) -> Notification[Withdrawal]:
body = {
"wallet": wallet, "method": method, "address": address,
"amount": amount
@@ -446,15 +443,15 @@ class RestAuthenticatedEndpoints(Middleware):
def get_deposit_address(self,
wallet: str,
method: str,
renew: bool = False) -> Notification[DepositAddress]:
op_renew: bool = False) -> Notification[DepositAddress]:
return _Notification[DepositAddress](serializers.DepositAddress) \
.parse(*self._post("auth/w/deposit/address", \
body={ "wallet": wallet, "method": method, "renew": int(renew) }))
body={ "wallet": wallet, "method": method, "op_renew": op_renew }))
def generate_deposit_invoice(self,
wallet: str,
currency: str,
amount: Union[Decimal, float, str]) -> LightningNetworkInvoice:
amount: Union[str, float, Decimal]) -> LightningNetworkInvoice:
return serializers.LightningNetworkInvoice \
.parse(*self._post("auth/w/deposit/invoice", \
body={ "wallet": wallet, "currency": currency, "amount": amount }))

View File

@@ -1,52 +1,36 @@
import re
from typing import Callable, TypeVar, cast, \
TypedDict, Dict, List, Union, Literal, Optional, Any
from typing import \
TypedDict, Dict, List, \
Union, Literal, Optional, \
Any
from decimal import Decimal
from ..middleware import Middleware
from bfxapi.rest.middleware import Middleware
from ..enums import MerchantSettingsKey
from ...types import \
InvoiceSubmission, InvoicePage, InvoiceStats, \
CurrencyConversion, MerchantDeposit, MerchantUnlinkedDeposit
#region Defining methods to convert dictionary keys to snake_case and camelCase.
T = TypeVar("T")
_to_snake_case: Callable[[str], str] = lambda string: re.sub(r"(?<!^)(?=[A-Z])", "_", string).lower()
_to_camel_case: Callable[[str], str] = lambda string: \
(components := string.split("_"))[0] + str().join(c.title() for c in components[1:])
def _scheme(data: T, adapter: Callable[[str], str]) -> T:
if isinstance(data, list):
return cast(T, [ _scheme(sub_data, adapter) for sub_data in data ])
if isinstance(data, dict):
return cast(T, { adapter(key): _scheme(value, adapter) for key, value in data.items() })
return data
def _to_snake_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_snake_case)
def _to_camel_case_keys(dictionary: T) -> T:
return _scheme(dictionary, _to_camel_case)
#endregion
from bfxapi.types import \
InvoiceSubmission, \
InvoicePage, \
InvoiceStats, \
CurrencyConversion, \
MerchantDeposit, \
MerchantUnlinkedDeposit
_CustomerInfo = TypedDict("_CustomerInfo", {
"nationality": str, "resid_country": str, "resid_city": str,
"resid_zip_code": str, "resid_street": str, "resid_building_no": str,
"full_name": str, "email": str, "tos_accepted": bool
"nationality": str,
"resid_country": str,
"resid_city": str,
"resid_zip_code": str,
"resid_street": str,
"resid_building_no": str,
"full_name": str,
"email": str,
"tos_accepted": bool
})
class RestMerchantEndpoints(Middleware):
#pylint: disable-next=too-many-arguments
def submit_invoice(self,
amount: Union[Decimal, float, str],
amount: Union[str, float, Decimal],
currency: str,
order_id: str,
customer_info: _CustomerInfo,
@@ -55,13 +39,13 @@ class RestMerchantEndpoints(Middleware):
duration: Optional[int] = None,
webhook: Optional[str] = None,
redirect_url: Optional[str] = None) -> InvoiceSubmission:
body = _to_camel_case_keys({
"amount": amount, "currency": currency, "order_id": order_id,
"customer_info": customer_info, "pay_currencies": pay_currencies, "duration": duration,
"webhook": webhook, "redirect_url": redirect_url
})
body = {
"amount": amount, "currency": currency, "orderId": order_id,
"customerInfo": customer_info, "payCurrencies": pay_currencies, "duration": duration,
"webhook": webhook, "redirectUrl": redirect_url
}
data = _to_snake_case_keys(self._post("auth/w/ext/pay/invoice/create", body=body))
data = self._post("auth/w/ext/pay/invoice/create", body=body)
return InvoiceSubmission.parse(data)
@@ -76,9 +60,9 @@ class RestMerchantEndpoints(Middleware):
"limit": limit
}
response = self._post("auth/r/ext/pay/invoices", body=body)
data = self._post("auth/r/ext/pay/invoices", body=body)
return [ InvoiceSubmission.parse(sub_data) for sub_data in _to_snake_case_keys(response) ]
return [ InvoiceSubmission.parse(sub_data) for sub_data in data ]
def get_invoices_paginated(self,
page: int = 1,
@@ -91,13 +75,13 @@ class RestMerchantEndpoints(Middleware):
crypto: Optional[List[str]] = None,
id: Optional[str] = None,
order_id: Optional[str] = None) -> InvoicePage:
body = _to_camel_case_keys({
"page": page, "page_size": page_size, "sort": sort,
"sort_field": sort_field, "status": status, "fiat": fiat,
"crypto": crypto, "id": id, "order_id": order_id
})
body = {
"page": page, "pageSize": page_size, "sort": sort,
"sortField": sort_field, "status": status, "fiat": fiat,
"crypto": crypto, "id": id, "orderId": order_id
}
data = _to_snake_case_keys(self._post("auth/r/ext/pay/invoices/paginated", body=body))
data = self._post("auth/r/ext/pay/invoices/paginated", body=body)
return InvoicePage.parse(data)
@@ -105,13 +89,15 @@ class RestMerchantEndpoints(Middleware):
status: Literal["CREATED", "PENDING", "COMPLETED", "EXPIRED"],
format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in \
self._post("auth/r/ext/pay/invoice/stats/count", body={ "status": status, "format": format }) ]
self._post("auth/r/ext/pay/invoice/stats/count", \
body={ "status": status, "format": format }) ]
def get_invoice_earning_stats(self,
currency: str,
format: str) -> List[InvoiceStats]:
return [ InvoiceStats(**sub_data) for sub_data in \
self._post("auth/r/ext/pay/invoice/stats/earning", body={ "currency": currency, "format": format }) ]
self._post("auth/r/ext/pay/invoice/stats/earning", \
body={ "currency": currency, "format": format }) ]
def complete_invoice(self,
id: str,
@@ -119,67 +105,75 @@ class RestMerchantEndpoints(Middleware):
*,
deposit_id: Optional[int] = None,
ledger_id: Optional[int] = None) -> InvoiceSubmission:
return InvoiceSubmission.parse(_to_snake_case_keys(self._post("auth/w/ext/pay/invoice/complete", body={
body = {
"id": id, "payCcy": pay_currency, "depositId": deposit_id,
"ledgerId": ledger_id
})))
}
data = self._post("auth/w/ext/pay/invoice/complete", body=body)
return InvoiceSubmission.parse(data)
def expire_invoice(self, id: str) -> InvoiceSubmission:
body = { "id": id }
response = self._post("auth/w/ext/pay/invoice/expire", body=body)
return InvoiceSubmission.parse(_to_snake_case_keys(response))
data = self._post("auth/w/ext/pay/invoice/expire", body=body)
return InvoiceSubmission.parse(data)
def get_currency_conversion_list(self) -> List[CurrencyConversion]:
return [
CurrencyConversion(
base_currency=sub_data["baseCcy"],
convert_currency=sub_data["convertCcy"],
created=sub_data["created"]
) for sub_data in self._post("auth/r/ext/pay/settings/convert/list")
]
return [ CurrencyConversion(**sub_data) \
for sub_data in self._post("auth/r/ext/pay/settings/convert/list") ]
def add_currency_conversion(self,
base_currency: str,
convert_currency: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/create", body={
"baseCcy": base_currency,
"convertCcy": convert_currency
}))
base_ccy: str,
convert_ccy: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/create", \
body={ "baseCcy": base_ccy, "convertCcy": convert_ccy }))
def remove_currency_conversion(self,
base_currency: str,
convert_currency: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/remove", body={
"baseCcy": base_currency,
"convertCcy": convert_currency
}))
base_ccy: str,
convert_ccy: str) -> bool:
return bool(self._post("auth/w/ext/pay/settings/convert/remove", \
body={ "baseCcy": base_ccy, "convertCcy": convert_ccy }))
def set_merchant_settings(self,
key: MerchantSettingsKey,
key: str,
val: Any) -> bool:
return bool(self._post("auth/w/ext/pay/settings/set", body={ "key": key, "val": val }))
return bool(self._post("auth/w/ext/pay/settings/set", \
body={ "key": key, "val": val }))
def get_merchant_settings(self, key: MerchantSettingsKey) -> Any:
def get_merchant_settings(self, key: str) -> Any:
return self._post("auth/r/ext/pay/settings/get", body={ "key": key })
def list_merchant_settings(self, keys: List[MerchantSettingsKey] = []) -> Dict[MerchantSettingsKey, Any]:
#pylint: disable-next=dangerous-default-value
def list_merchant_settings(self, keys: List[str] = []) -> Dict[str, Any]:
return self._post("auth/r/ext/pay/settings/list", body={ "keys": keys })
def get_deposits(self,
start: int,
end: int,
to: int,
*,
ccy: Optional[str] = None,
unlinked: Optional[bool] = None) -> List[MerchantDeposit]:
body = { "from": start, "to": end, "ccy": ccy, "unlinked": unlinked }
response = self._post("auth/r/ext/pay/deposits", body=body)
return [ MerchantDeposit(**sub_data) for sub_data in _to_snake_case_keys(response) ]
body = {
"from": start, "to": to, "ccy": ccy,
"unlinked": unlinked
}
data = self._post("auth/r/ext/pay/deposits", body=body)
return [ MerchantDeposit(**sub_data) for sub_data in data ]
def get_unlinked_deposits(self,
ccy: str,
*,
start: Optional[int] = None,
end: Optional[int] = None) -> List[MerchantUnlinkedDeposit]:
body = { "ccy": ccy, "start": start, "end": end }
response = self._post("/auth/r/ext/pay/deposits/unlinked", body=body)
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in _to_snake_case_keys(response) ]
body = {
"ccy": ccy, "start": start, "end": end
}
data = self._post("/auth/r/ext/pay/deposits/unlinked", body=body)
return [ MerchantUnlinkedDeposit(**sub_data) for sub_data in data ]

View File

@@ -4,8 +4,6 @@ from decimal import Decimal
from ..middleware import Middleware
from ..enums import Config, Sort
from ...types import \
PlatformStatus, TradingPairTicker, FundingCurrencyTicker, \
TickersHistory, TradingPairTrade, FundingCurrencyTrade, \
@@ -17,8 +15,9 @@ from ...types import \
from ...types import serializers
#pylint: disable-next=too-many-public-methods
class RestPublicEndpoints(Middleware):
def conf(self, config: Config) -> Any:
def conf(self, config: str) -> Any:
return self._get(f"conf/{config}")[0]
def get_platform_status(self) -> PlatformStatus:
@@ -83,7 +82,7 @@ class RestPublicEndpoints(Middleware):
limit: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
sort: Optional[Sort] = None) -> List[TradingPairTrade]:
sort: Optional[int] = None) -> List[TradingPairTrade]:
params = { "limit": limit, "start": start, "end": end, "sort": sort }
data = self._get(f"trades/{pair}/hist", params=params)
return [ serializers.TradingPairTrade.parse(*sub_data) for sub_data in data ]
@@ -94,7 +93,7 @@ class RestPublicEndpoints(Middleware):
limit: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
sort: Optional[Sort] = None) -> List[FundingCurrencyTrade]:
sort: Optional[int] = None) -> List[FundingCurrencyTrade]:
params = { "limit": limit, "start": start, "end": end, "sort": sort }
data = self._get(f"trades/{currency}/hist", params=params)
return [ serializers.FundingCurrencyTrade.parse(*sub_data) for sub_data in data ]
@@ -132,7 +131,7 @@ class RestPublicEndpoints(Middleware):
def get_stats_hist(self,
resource: str,
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Statistic]:
@@ -143,7 +142,7 @@ class RestPublicEndpoints(Middleware):
def get_stats_last(self,
resource: str,
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> Statistic:
@@ -155,7 +154,7 @@ class RestPublicEndpoints(Middleware):
symbol: str,
tf: str = "1m",
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Candle]:
@@ -167,7 +166,7 @@ class RestPublicEndpoints(Middleware):
symbol: str,
tf: str = "1m",
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> Candle:
@@ -191,7 +190,7 @@ class RestPublicEndpoints(Middleware):
def get_derivatives_status_history(self,
key: str,
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[DerivativesStatus]:
@@ -201,7 +200,7 @@ class RestPublicEndpoints(Middleware):
def get_liquidations(self,
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Liquidation]:
@@ -213,7 +212,7 @@ class RestPublicEndpoints(Middleware):
symbol: str,
tf: str = "1m",
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Candle]:
@@ -224,7 +223,7 @@ class RestPublicEndpoints(Middleware):
def get_leaderboards_hist(self,
resource: str,
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> List[Leaderboard]:
@@ -235,7 +234,7 @@ class RestPublicEndpoints(Middleware):
def get_leaderboards_last(self,
resource: str,
*,
sort: Optional[Sort] = None,
sort: Optional[int] = None,
start: Optional[str] = None,
end: Optional[str] = None,
limit: Optional[int] = None) -> Leaderboard:
@@ -262,18 +261,18 @@ class RestPublicEndpoints(Middleware):
limit: Optional[int] = None) -> List[PulseMessage]:
messages = []
for subdata in self._get("pulse/hist", params={ "end": end, "limit": limit }):
subdata[18] = subdata[18][0]
message = serializers.PulseMessage.parse(*subdata)
for sub_data in self._get("pulse/hist", params={ "end": end, "limit": limit }):
sub_data[18] = sub_data[18][0]
message = serializers.PulseMessage.parse(*sub_data)
messages.append(message)
return messages
def get_trading_market_average_price(self,
symbol: str,
amount: Union[Decimal, float, str],
amount: Union[str, float, Decimal],
*,
price_limit: Optional[Union[Decimal, float, str]] = None
price_limit: Optional[Union[str, float, Decimal]] = None
) -> TradingMarketAveragePrice:
return serializers.TradingMarketAveragePrice.parse(*self._post("calc/trade/avg", body={
"symbol": symbol, "amount": amount, "price_limit": price_limit
@@ -281,10 +280,10 @@ class RestPublicEndpoints(Middleware):
def get_funding_market_average_price(self,
symbol: str,
amount: Union[Decimal, float, str],
amount: Union[str, float, Decimal],
period: int,
*,
rate_limit: Optional[Union[Decimal, float, str]] = None
rate_limit: Optional[Union[str, float, Decimal]] = None
) -> FundingMarketAveragePrice:
return serializers.FundingMarketAveragePrice.parse(*self._post("calc/trade/avg", body={
"symbol": symbol, "amount": amount, "period": period, "rate_limit": rate_limit

View File

@@ -1,47 +0,0 @@
#pylint: disable-next=wildcard-import,unused-wildcard-import
from ..enums import *
class Config(str, Enum):
MAP_CURRENCY_SYM = "pub:map:currency:sym"
MAP_CURRENCY_LABEL = "pub:map:currency:label"
MAP_CURRENCY_UNIT = "pub:map:currency:unit"
MAP_CURRENCY_UNDL = "pub:map:currency:undl"
MAP_CURRENCY_POOL = "pub:map:currency:pool"
MAP_CURRENCY_EXPLORER = "pub:map:currency:explorer"
MAP_CURRENCY_TX_FEE = "pub:map:currency:tx:fee"
MAP_TX_METHOD = "pub:map:tx:method"
LIST_PAIR_EXCHANGE = "pub:list:pair:exchange"
LIST_PAIR_MARGIN = "pub:list:pair:margin"
LIST_PAIR_FUTURES = "pub:list:pair:futures"
LIST_PAIR_SECURITIES = "pub:list:pair:securities"
LIST_CURRENCY = "pub:list:currency"
LIST_COMPETITIONS = "pub:list:competitions"
INFO_PAIR = "pub:info:pair"
INFO_PAIR_FUTURES = "pub:info:pair:futures"
INFO_TX_STATUS = "pub:info:tx:status"
SPEC_MARGIN = "pub:spec:margin"
FEES = "pub:fees"
class Precision(str, Enum):
P0 = "P0"
P1 = "P1"
P2 = "P2"
P3 = "P3"
P4 = "P4"
class Sort(int, Enum):
ASCENDING = +1
DESCENDING = -1
class MerchantSettingsKey(str, Enum):
PREFERRED_FIAT = "bfx_pay_preferred_fiat"
RECOMMEND_STORE = "bfx_pay_recommend_store"
NOTIFY_PAYMENT_COMPLETED = "bfx_pay_notify_payment_completed"
NOTIFY_PAYMENT_COMPLETED_EMAIL = "bfx_pay_notify_payment_completed_email"
NOTIFY_AUTOCONVERT_EXECUTED = "bfx_pay_notify_autoconvert_executed"
DUST_BALANCE_UI = "bfx_pay_dust_balance_ui"
MERCHANT_CUSTOMER_SUPPORT_URL = "bfx_pay_merchant_customer_support_url"
MERCHANT_UNDERPAID_THRESHOLD = "bfx_pay_merchant_underpaid_threshold"

View File

@@ -1,35 +1,10 @@
from ..exceptions import BfxBaseException
from bfxapi.exceptions import BfxBaseException
__all__ = [
"BfxRestException",
class NotFoundError(BfxBaseException):
pass
"ResourceNotFound",
"RequestParametersError",
"ResourceNotFound",
"InvalidAuthenticationCredentials"
]
class RequestParametersError(BfxBaseException):
pass
class BfxRestException(BfxBaseException):
"""
Base class for all custom exceptions in bfxapi/rest/exceptions.py.
"""
class ResourceNotFound(BfxRestException):
"""
This error indicates a failed HTTP request to a non-existent resource.
"""
class RequestParametersError(BfxRestException):
"""
This error indicates that there are some invalid parameters sent along with an HTTP request.
"""
class InvalidAuthenticationCredentials(BfxRestException):
"""
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
"""
class UnknownGenericError(BfxRestException):
"""
This error indicates an undefined problem processing an HTTP request sent to the APIs.
"""
class UnknownGenericError(BfxBaseException):
pass

View File

@@ -1,16 +1,26 @@
from typing import TYPE_CHECKING, Optional, Any
from enum import IntEnum
from http import HTTPStatus
import time, hmac, hashlib, json, requests
from ..enums import Error
from ..exceptions import ResourceNotFound, RequestParametersError, InvalidAuthenticationCredentials, UnknownGenericError
from ...utils.json_encoder import JSONEncoder
from ..exceptions import NotFoundError, RequestParametersError, UnknownGenericError
from ...exceptions import InvalidCredentialError
from ..._utils.json_encoder import JSONEncoder
from ..._utils.json_decoder import JSONDecoder
if TYPE_CHECKING:
from requests.sessions import _Params
class _Error(IntEnum):
ERR_UNK = 10000
ERR_GENERIC = 10001
ERR_PARAMS = 10020
ERR_AUTH_FAIL = 10100
class Middleware:
TIMEOUT = 30
@@ -47,16 +57,16 @@ class Middleware:
)
if response.status_code == HTTPStatus.NOT_FOUND:
raise ResourceNotFound(f"No resources found at endpoint <{endpoint}>.")
raise NotFoundError(f"No resources found at endpoint <{endpoint}>.")
data = response.json()
data = response.json(cls=JSONDecoder)
if len(data) and data[0] == "error":
if data[1] == Error.ERR_PARAMS:
if data[1] == _Error.ERR_PARAMS:
raise RequestParametersError("The request was rejected with the " \
f"following parameter error: <{data[2]}>")
if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
if data[1] is None or data[1] == _Error.ERR_UNK or data[1] == _Error.ERR_GENERIC:
raise UnknownGenericError("The server replied to the request with " \
f"a generic error with message: <{data[2]}>.")
@@ -80,19 +90,19 @@ class Middleware:
)
if response.status_code == HTTPStatus.NOT_FOUND:
raise ResourceNotFound(f"No resources found at endpoint <{endpoint}>.")
raise NotFoundError(f"No resources found at endpoint <{endpoint}>.")
data = response.json()
data = response.json(cls=JSONDecoder)
if isinstance(data, list) and len(data) and data[0] == "error":
if data[1] == Error.ERR_PARAMS:
if data[1] == _Error.ERR_PARAMS:
raise RequestParametersError("The request was rejected with the " \
f"following parameter error: <{data[2]}>")
if data[1] == Error.ERR_AUTH_FAIL:
raise InvalidAuthenticationCredentials("Cannot authenticate with given API-KEY and API-SECRET.")
if data[1] == _Error.ERR_AUTH_FAIL:
raise InvalidCredentialError("Cannot authenticate with given API-KEY and API-SECRET.")
if data[1] is None or data[1] == Error.ERR_UNK or data[1] == Error.ERR_GENERIC:
if data[1] is None or data[1] == _Error.ERR_UNK or data[1] == _Error.ERR_GENERIC:
raise UnknownGenericError("The server replied to the request with " \
f"a generic error with message: <{data[2]}>.")

View File

@@ -1,15 +0,0 @@
import unittest
from .test_types_labeler import TestTypesLabeler
from .test_types_notification import TestTypesNotification
from .test_types_serializers import TestTypesSerializers
def suite():
return unittest.TestSuite([
unittest.makeSuite(TestTypesLabeler),
unittest.makeSuite(TestTypesNotification),
unittest.makeSuite(TestTypesSerializers),
])
if __name__ == "__main__":
unittest.TextTestRunner().run(suite())

View File

@@ -1,56 +0,0 @@
import unittest
from typing import Optional
from dataclasses import dataclass
from ..types.labeler import _Type, generate_labeler_serializer, generate_recursive_serializer
class TestTypesLabeler(unittest.TestCase):
def test_generate_labeler_serializer(self):
@dataclass
class Test(_Type):
A: Optional[int]
B: float
C: str
labels = [ "A", "_PLACEHOLDER", "B", "_PLACEHOLDER", "C" ]
serializer = generate_labeler_serializer("Test", Test, labels)
self.assertEqual(serializer.parse(5, None, 65.0, None, "X"), Test(5, 65.0, "X"),
msg="_Serializer should produce the right result.")
self.assertListEqual(serializer.get_labels(), [ "A", "B", "C" ],
msg="_Serializer::get_labels() should return the right list of labels.")
with self.assertRaises(AssertionError,
msg="_Serializer should raise an AssertionError if given " \
"fewer arguments than the serializer labels."):
serializer.parse(5, 65.0, "X")
def test_generate_recursive_serializer(self):
@dataclass
class Outer(_Type):
A: int
B: float
C: "Middle"
@dataclass
class Middle(_Type):
D: str
E: "Inner"
@dataclass
class Inner(_Type):
F: bool
inner = generate_labeler_serializer("Inner", Inner, ["F"])
middle = generate_recursive_serializer("Middle", Middle, ["D", "E"], serializers={ "E": inner })
outer = generate_recursive_serializer("Outer", Outer, ["A", "B", "C"], serializers={ "C": middle })
self.assertEqual(outer.parse(10, 45.5, [ "Y", [ True ] ]), Outer(10, 45.5, Middle("Y", Inner(True))),
msg="_RecursiveSerializer should produce the right result.")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,29 +0,0 @@
import unittest
from dataclasses import dataclass
from ..types.labeler import generate_labeler_serializer
from ..types.notification import _Type, _Notification, Notification
class TestTypesNotification(unittest.TestCase):
def test_types_notification(self):
@dataclass
class Test(_Type):
A: int
B: float
C: str
test = generate_labeler_serializer("Test", Test,
[ "A", "_PLACEHOLDER", "B", "_PLACEHOLDER", "C" ])
notification = _Notification[Test](test)
actual = notification.parse(*[ 1675787861506, "test", None, None, [ 5, None, 65.0, None, "X" ], \
0, "SUCCESS", "This is just a test notification." ])
expected = Notification[Test](1675787861506, "test", None, Test(5, 65.0, "X"),
0, "SUCCESS", "This is just a test notification.")
self.assertEqual(actual, expected, msg="_Notification should produce the right notification.")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,17 +0,0 @@
import unittest
from ..types import serializers
from ..types.labeler import _Type
class TestTypesSerializers(unittest.TestCase):
def test_types_serializers(self):
for serializer in map(serializers.__dict__.get, serializers.__serializers__):
self.assertTrue(issubclass(serializer.klass, _Type),
f"_Serializer <{serializer.name}>: .klass field must be a subclass " \
f"of _Type (got {serializer.klass}).")
self.assertListEqual(serializer.get_labels(), list(serializer.klass.__annotations__),
f"_Serializer <{serializer.name}> and _Type <{serializer.klass.__name__}> " \
"must have matching labels and fields.")
if __name__ == "__main__":
unittest.main()

View File

@@ -1,4 +1,4 @@
from .dataclasses import JSON, \
from .dataclasses import \
PlatformStatus, TradingPairTicker, FundingCurrencyTicker, \
TickersHistory, TradingPairTrade, FundingCurrencyTrade, \
TradingPairBook, FundingCurrencyBook, TradingPairRawBook, \

View File

@@ -1,12 +1,10 @@
from typing import Union, Type, \
from typing import \
List, Dict, Literal, Optional, Any
from dataclasses import dataclass
from .labeler import _Type, partial, compose
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
#region Dataclass definitions for types of public use
@dataclass
@@ -129,7 +127,7 @@ class Liquidation(_Type):
base_price: float
is_match: int
is_market_sold: int
price_acquired: float
liquidation_price: float
@dataclass
class Leaderboard(_Type):
@@ -172,7 +170,7 @@ class PulseMessage(_Type):
comments_disabled: int
tags: List[str]
attachments: List[str]
meta: List[JSON]
meta: List[Dict[str, Any]]
likes: int
profile: PulseProfile
comments: int
@@ -231,7 +229,7 @@ class LoginHistory(_Type):
id: int
time: int
ip: str
extra_info: JSON
extra_info: Dict[str, Any]
@dataclass
class BalanceAvailable(_Type):
@@ -260,7 +258,7 @@ class Order(_Type):
hidden: int
placed_id: int
routing: str
meta: JSON
meta: Dict[str, Any]
@dataclass
class Position(_Type):
@@ -280,7 +278,7 @@ class Position(_Type):
type: int
collateral: float
collateral_min: float
meta: JSON
meta: Dict[str, Any]
@dataclass
class Trade(_Type):
@@ -409,7 +407,7 @@ class Wallet(_Type):
unsettled_interest: float
available_balance: float
last_change: str
trade_details: JSON
trade_details: Dict[str, Any]
@dataclass
class Transfer(_Type):
@@ -486,7 +484,7 @@ class PositionClaim(_Type):
pos_type: int
collateral: str
min_collateral: str
meta: JSON
meta: Dict[str, Any]
@dataclass
class PositionIncreaseInfo(_Type):
@@ -547,7 +545,7 @@ class PositionAudit(_Type):
type: int
collateral: float
collateral_min: float
meta: JSON
meta: Dict[str, Any]
@dataclass
class DerivativePositionCollateral(_Type):
@@ -618,7 +616,7 @@ class InvoiceSubmission(_Type):
pay_currency: str
pool_currency: str
address: str
ext: JSON
ext: Dict[str, Any]
@compose(dataclass, partial)
class Payment:
@@ -659,8 +657,8 @@ class InvoiceStats(_Type):
@dataclass
class CurrencyConversion(_Type):
base_currency: str
convert_currency: str
base_ccy: str
convert_ccy: str
created: int
@dataclass

View File

@@ -34,8 +34,8 @@ class _Type:
class _Serializer(Generic[T]):
def __init__(self, name: str, klass: Type[_Type], labels: List[str],
*, flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]):
self.name, self.klass, self.__labels, self.__flat, self.__ignore = name, klass, labels, flat, ignore
*, flat: bool = False):
self.name, self.klass, self.__labels, self.__flat = name, klass, labels, flat
def _serialize(self, *args: Any) -> Iterable[Tuple[str, Any]]:
if self.__flat:
@@ -46,14 +46,14 @@ class _Serializer(Generic[T]):
"arguments should contain the same amount of elements.")
for index, label in enumerate(self.__labels):
if label not in self.__ignore:
if label != "_PLACEHOLDER":
yield label, args[index]
def parse(self, *values: Any) -> T:
return cast(T, self.klass(**dict(self._serialize(*values))))
def get_labels(self) -> List[str]:
return [ label for label in self.__labels if label not in self.__ignore ]
return [ label for label in self.__labels if label != "_PLACEHOLDER" ]
@classmethod
def __flatten(cls, array: List[Any]) -> List[Any]:
@@ -68,8 +68,8 @@ class _Serializer(Generic[T]):
class _RecursiveSerializer(_Serializer, Generic[T]):
def __init__(self, name: str, klass: Type[_Type], labels: List[str],
*, serializers: Dict[str, _Serializer[Any]],
flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]):
super().__init__(name, klass, labels, flat=flat, ignore=ignore)
flat: bool = False):
super().__init__(name, klass, labels, flat=flat)
self.serializers = serializers
@@ -83,14 +83,14 @@ class _RecursiveSerializer(_Serializer, Generic[T]):
return cast(T, self.klass(**serialization))
def generate_labeler_serializer(name: str, klass: Type[T], labels: List[str],
*, flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]
*, flat: bool = False
) -> _Serializer[T]:
return _Serializer[T](name, klass, labels, \
flat=flat, ignore=ignore)
flat=flat)
def generate_recursive_serializer(name: str, klass: Type[T], labels: List[str],
*, serializers: Dict[str, _Serializer[Any]],
flat: bool = False, ignore: List[str] = [ "_PLACEHOLDER" ]
flat: bool = False
) -> _RecursiveSerializer[T]:
return _RecursiveSerializer[T](name, klass, labels, \
serializers=serializers, flat=flat, ignore=ignore)
serializers=serializers, flat=flat)

View File

@@ -18,7 +18,7 @@ class _Notification(_Serializer, Generic[T]):
__LABELS = [ "mts", "type", "message_id", "_PLACEHOLDER", "data", "code", "status", "text" ]
def __init__(self, serializer: Optional[_Serializer] = None, is_iterable: bool = False):
super().__init__("Notification", Notification, _Notification.__LABELS, ignore = [ "_PLACEHOLDER" ])
super().__init__("Notification", Notification, _Notification.__LABELS)
self.serializer, self.is_iterable = serializer, is_iterable

View File

@@ -1,6 +1,7 @@
from .import dataclasses
from .labeler import \
#pylint: disable-next=unused-import
from .labeler import _Serializer, \
generate_labeler_serializer, generate_recursive_serializer
#pylint: disable-next=unused-import
@@ -229,7 +230,7 @@ Liquidation = generate_labeler_serializer(
"is_match",
"is_market_sold",
"_PLACEHOLDER",
"price_acquired"
"liquidation_price"
]
)

View File

@@ -1,5 +0,0 @@
REST_HOST = "https://api.bitfinex.com/v2"
PUB_REST_HOST = "https://api-pub.bitfinex.com/v2"
WSS_HOST = "wss://api.bitfinex.com/ws/2"
PUB_WSS_HOST = "wss://api-pub.bitfinex.com/ws/2"

View File

@@ -1,31 +0,0 @@
import json
from decimal import Decimal
from datetime import datetime
from typing import Type, List, Dict, Union, Any
JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]]
def _strip(dictionary: Dict) -> Dict:
return { key: value for key, value in dictionary.items() if value is not None }
def _convert_float_to_str(data: JSON) -> JSON:
if isinstance(data, float):
return format(Decimal(repr(data)), "f")
if isinstance(data, list):
return [ _convert_float_to_str(sub_data) for sub_data in data ]
if isinstance(data, dict):
return _strip({ key: _convert_float_to_str(value) for key, value in data.items() })
return data
class JSONEncoder(json.JSONEncoder):
def encode(self, o: JSON) -> str:
return json.JSONEncoder.encode(self, _convert_float_to_str(o))
def default(self, o: Any) -> Any:
if isinstance(o, Decimal):
return format(o, "f")
if isinstance(o, datetime):
return str(o)
return json.JSONEncoder.default(self, o)

View File

@@ -1,51 +0,0 @@
import logging, sys
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = range(8)
COLOR_SEQ, ITALIC_COLOR_SEQ = "\033[1;%dm", "\033[3;%dm"
COLORS = {
"DEBUG": CYAN,
"INFO": BLUE,
"WARNING": YELLOW,
"ERROR": RED
}
RESET_SEQ = "\033[0m"
class _ColorFormatter(logging.Formatter):
def __init__(self, msg, use_color = True):
logging.Formatter.__init__(self, msg, "%d-%m-%Y %H:%M:%S")
self.use_color = use_color
def format(self, record):
levelname = record.levelname
if self.use_color and levelname in COLORS:
record.name = ITALIC_COLOR_SEQ % (30 + BLACK) + record.name + RESET_SEQ
record.levelname = COLOR_SEQ % (30 + COLORS[levelname]) + levelname + RESET_SEQ
return logging.Formatter.format(self, record)
class ColorLogger(logging.Logger):
FORMAT = "[%(name)s] [%(levelname)s] [%(asctime)s] %(message)s"
def __init__(self, name, level):
logging.Logger.__init__(self, name, level)
colored_formatter = _ColorFormatter(self.FORMAT, use_color=True)
handler = logging.StreamHandler(stream=sys.stderr)
handler.setFormatter(fmt=colored_formatter)
self.addHandler(hdlr=handler)
class FileLogger(logging.Logger):
FORMAT = "[%(name)s] [%(levelname)s] [%(asctime)s] %(message)s"
def __init__(self, name, level, filename):
logging.Logger.__init__(self, name, level)
formatter = logging.Formatter(self.FORMAT)
handler = logging.FileHandler(filename=filename)
handler.setFormatter(fmt=formatter)
self.addHandler(hdlr=handler)

View File

@@ -1 +0,0 @@
__version__ = "3.0.0b2"

View File

@@ -1 +1 @@
from .client import BfxWebSocketClient, BfxWebSocketBucket, BfxWebSocketInputs
from ._client import BfxWebSocketClient

View File

@@ -0,0 +1 @@
from .bfx_websocket_client import BfxWebSocketClient

View File

@@ -0,0 +1,153 @@
from typing import \
List, Dict, Any, \
Optional, cast
import asyncio, json, uuid
import websockets.client
from pyee import EventEmitter
from bfxapi._utils.json_decoder import JSONDecoder
from bfxapi.websocket._connection import Connection
from bfxapi.websocket._handlers import PublicChannelsHandler
from bfxapi.websocket.subscriptions import Subscription
_CHECKSUM_FLAG_VALUE = 131_072
def _strip(message: Dict[str, Any], keys: List[str]) -> Dict[str, Any]:
return { key: value for key, value in message.items() \
if not key in keys }
class BfxWebSocketBucket(Connection):
__MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
def __init__(self, host: str, event_emitter: EventEmitter) -> None:
super().__init__(host)
self.__event_emitter = event_emitter
self.__pendings: List[Dict[str, Any]] = [ ]
self.__subscriptions: Dict[int, Subscription] = { }
self.__condition = asyncio.locks.Condition()
self.__handler = PublicChannelsHandler( \
event_emitter=self.__event_emitter)
@property
def count(self) -> int:
return len(self.__pendings) + \
len(self.__subscriptions)
@property
def is_full(self) -> bool:
return self.count == \
BfxWebSocketBucket.__MAXIMUM_SUBSCRIPTIONS_AMOUNT
@property
def ids(self) -> List[str]:
return [ pending["subId"] for pending in self.__pendings ] + \
[ subscription["sub_id"] for subscription in self.__subscriptions.values() ]
async def start(self) -> None:
async with websockets.client.connect(self._host) as websocket:
self._websocket = websocket
await self.__recover_state()
async with self.__condition:
self.__condition.notify(1)
async for _message in self._websocket:
message = json.loads(_message, cls=JSONDecoder)
if isinstance(message, dict):
if message["event"] == "subscribed":
self.__on_subscribed(message)
if isinstance(message, list):
if (chan_id := cast(int, message[0])) and \
(message[1] != Connection._HEARTBEAT):
self.__handler.handle(self.__subscriptions[chan_id], message[1:])
def __on_subscribed(self, message: Dict[str, Any]) -> None:
chan_id = cast(int, message["chan_id"])
subscription = cast(Subscription, _strip(message, \
keys=["chan_id", "event", "pair", "currency"]))
self.__pendings = [ pending \
for pending in self.__pendings \
if pending["subId"] != message["sub_id"] ]
self.__subscriptions[chan_id] = subscription
self.__event_emitter.emit("subscribed", subscription)
async def __recover_state(self) -> None:
for pending in self.__pendings:
await self._websocket.send(message = \
json.dumps(pending))
for chan_id in list(self.__subscriptions.keys()):
subscription = self.__subscriptions.pop(chan_id)
await self.subscribe(**subscription)
await self.__set_config([ _CHECKSUM_FLAG_VALUE ])
async def __set_config(self, flags: List[int]) -> None:
await self._websocket.send(json.dumps( \
{ "event": "conf", "flags": sum(flags) }))
@Connection._require_websocket_connection
async def subscribe(self,
channel: str,
sub_id: Optional[str] = None,
**kwargs: Any) -> None:
subscription: Dict[str, Any] = \
{ **kwargs, "event": "subscribe", "channel": channel }
subscription["subId"] = sub_id or str(uuid.uuid4())
self.__pendings.append(subscription)
await self._websocket.send(message = \
json.dumps(subscription))
@Connection._require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None:
for chan_id, subscription in list(self.__subscriptions.items()):
if subscription["sub_id"] == sub_id:
unsubscription = {
"event": "unsubscribe",
"chanId": chan_id }
del self.__subscriptions[chan_id]
await self._websocket.send(message = \
json.dumps(unsubscription))
@Connection._require_websocket_connection
async def resubscribe(self, sub_id: str) -> None:
for subscription in self.__subscriptions.values():
if subscription["sub_id"] == sub_id:
await self.unsubscribe(sub_id)
await self.subscribe(**subscription)
@Connection._require_websocket_connection
async def close(self, code: int = 1000, reason: str = str()) -> None:
await self._websocket.close(code, reason)
def has(self, sub_id: str) -> bool:
for subscription in self.__subscriptions.values():
if subscription["sub_id"] == sub_id:
return True
return False
async def wait(self) -> None:
async with self.__condition:
await self.__condition \
.wait_for(lambda: self.open)

View File

@@ -0,0 +1,340 @@
from typing import \
TypedDict, List, Dict, \
Optional, Any
from logging import Logger
from datetime import datetime
from socket import gaierror
from asyncio import Task
import \
traceback, json, asyncio, \
random, websockets
import websockets.client
from websockets.exceptions import \
ConnectionClosedError, \
InvalidStatusCode
from bfxapi._utils.json_encoder import JSONEncoder
from bfxapi.websocket._connection import Connection
from bfxapi.websocket._handlers import AuthEventsHandler
from bfxapi.websocket._event_emitter import BfxEventEmitter
from bfxapi.exceptions import \
InvalidCredentialError
from bfxapi.websocket.exceptions import \
ReconnectionTimeoutError, \
VersionMismatchError, \
UnknownChannelError, \
UnknownSubscriptionError, \
SubIdError
from .bfx_websocket_bucket import BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebSocketInputs
_Credentials = TypedDict("_Credentials", \
{ "api_key": str, "api_secret": str, "filters": Optional[List[str]] })
_Reconnection = TypedDict("_Reconnection",
{ "attempts": int, "reason": str, "timestamp": datetime })
_DEFAULT_LOGGER = Logger("bfxapi.websocket._client", level=0)
class _Delay:
__BACKOFF_MIN = 1.92
__BACKOFF_MAX = 60.0
def __init__(self, backoff_factor: float) -> None:
self.__backoff_factor = backoff_factor
self.__backoff_delay = _Delay.__BACKOFF_MIN
self.__initial_delay = random.uniform(1.0, 5.0)
def next(self) -> float:
_backoff_delay = self.peek()
__backoff_delay = _backoff_delay * self.__backoff_factor
self.__backoff_delay = min(__backoff_delay, _Delay.__BACKOFF_MAX)
return _backoff_delay
def peek(self) -> float:
return (self.__backoff_delay == _Delay.__BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
def reset(self) -> None:
self.__backoff_delay = _Delay.__BACKOFF_MIN
#pylint: disable-next=too-many-instance-attributes
class BfxWebSocketClient(Connection):
def __init__(self,
host: str,
*,
credentials: Optional[_Credentials] = None,
timeout: Optional[int] = 60 * 15,
logger: Logger = _DEFAULT_LOGGER) -> None:
super().__init__(host)
self.__credentials, self.__timeout, self.__logger = \
credentials, \
timeout, \
logger
self.__buckets: Dict[BfxWebSocketBucket, Optional[Task]] = { }
self.__reconnection: Optional[_Reconnection] = None
self.__event_emitter = BfxEventEmitter(loop=None)
self.__handler = AuthEventsHandler( \
event_emitter=self.__event_emitter)
self.__inputs = BfxWebSocketInputs( \
handle_websocket_input=self.__handle_websocket_input)
@self.__event_emitter.listens_to("error")
def error(exception: Exception) -> None:
header = f"{type(exception).__name__}: {str(exception)}"
stack_trace = traceback.format_exception( \
type(exception), exception, exception.__traceback__)
#pylint: disable-next=logging-not-lazy
self.__logger.critical(header + "\n" + \
str().join(stack_trace)[:-1])
@property
def inputs(self) -> BfxWebSocketInputs:
return self.__inputs
def run(self) -> None:
return asyncio.get_event_loop() \
.run_until_complete(self.start())
#pylint: disable-next=too-many-branches
async def start(self) -> None:
_delay = _Delay(backoff_factor=1.618)
_sleep: Optional[Task] = None
def _on_timeout():
if not self.open:
if _sleep:
_sleep.cancel()
while True:
if self.__reconnection:
_sleep = asyncio.create_task( \
asyncio.sleep(int(_delay.next())))
try:
await _sleep
except asyncio.CancelledError:
raise ReconnectionTimeoutError("Connection has been offline for too long " \
f"without being able to reconnect (timeout: {self.__timeout}s).") \
from None
try:
await self.__connect()
except (ConnectionClosedError, InvalidStatusCode, gaierror) as error:
async def _cancel(task: Task) -> None:
task.cancel()
try:
await task
except (ConnectionClosedError, InvalidStatusCode, gaierror) as _e:
if type(error) is not type(_e) or error.args != _e.args:
raise _e
except asyncio.CancelledError:
pass
# pylint: disable-next=consider-using-dict-items
for bucket in self.__buckets:
if task := self.__buckets[bucket]:
self.__buckets[bucket] = None
await _cancel(task)
if isinstance(error, ConnectionClosedError) and error.code in (1006, 1012):
if error.code == 1006:
self.__logger.error("Connection lost: trying to reconnect...")
if error.code == 1012:
self.__logger.warning("WSS server is restarting: all " \
"clients need to reconnect (server sent 20051).")
if self.__timeout:
asyncio.get_event_loop().call_later(
self.__timeout, _on_timeout)
self.__reconnection = \
{ "attempts": 1, "reason": error.reason, "timestamp": datetime.now() }
self._authentication = False
_delay.reset()
elif ((isinstance(error, InvalidStatusCode) and error.status_code == 408) or \
isinstance(error, gaierror)) and self.__reconnection:
#pylint: disable-next=logging-fstring-interpolation
self.__logger.warning("Reconnection attempt unsuccessful (no." \
f"{self.__reconnection['attempts']}): next attempt in " \
f"~{int(_delay.peek())}.0s.")
#pylint: disable-next=logging-fstring-interpolation
self.__logger.info(f"The client has been offline for " \
f"{datetime.now() - self.__reconnection['timestamp']}.")
self.__reconnection["attempts"] += 1
else:
raise error
if not self.__reconnection:
self.__event_emitter.emit("disconnected",
self._websocket.close_code, \
self._websocket.close_reason)
break
async def __connect(self) -> None:
async with websockets.client.connect(self._host) as websocket:
if self.__reconnection:
#pylint: disable-next=logging-fstring-interpolation
self.__logger.warning("Reconnection attempt successful (no." \
f"{self.__reconnection['attempts']}): recovering " \
"connection state...")
self.__reconnection = None
self._websocket = websocket
for bucket in self.__buckets:
self.__buckets[bucket] = \
asyncio.create_task(bucket.start())
if len(self.__buckets) == 0 or \
(await asyncio.gather(*[bucket.wait() for bucket in self.__buckets])):
self.__event_emitter.emit("open")
if self.__credentials:
authentication = Connection. \
_get_authentication_message(**self.__credentials)
await self._websocket.send(authentication)
async for _message in self._websocket:
message = json.loads(_message)
if isinstance(message, dict):
if message["event"] == "info" and "version" in message:
if message["version"] != 2:
raise VersionMismatchError("Mismatch between the client and the server version: " + \
"please update bitfinex-api-py to the latest version to resolve this error " + \
f"(client version: 2, server version: {message['version']}).")
elif message["event"] == "info" and message["code"] == 20051:
rcvd = websockets.frames.Close( \
1012, "Stop/Restart WebSocket Server (please reconnect).")
raise ConnectionClosedError(rcvd=rcvd, sent=None)
elif message["event"] == "auth":
if message["status"] != "OK":
raise InvalidCredentialError("Can't authenticate " + \
"with given API-KEY and API-SECRET.")
self.__event_emitter.emit("authenticated", message)
self._authentication = True
if isinstance(message, list) and \
message[0] == 0 and message[1] != Connection._HEARTBEAT:
self.__handler.handle(message[1], message[2])
async def __new_bucket(self) -> BfxWebSocketBucket:
bucket = BfxWebSocketBucket( \
self._host, self.__event_emitter)
self.__buckets[bucket] = asyncio \
.create_task(bucket.start())
await bucket.wait()
return bucket
@Connection._require_websocket_connection
async def subscribe(self,
channel: str,
sub_id: Optional[str] = None,
**kwargs: Any) -> None:
if not channel in ["ticker", "trades", "book", "candles", "status"]:
raise UnknownChannelError("Available channels are: " + \
"ticker, trades, book, candles and status.")
for bucket in self.__buckets:
if sub_id in bucket.ids:
raise SubIdError("sub_id must be " + \
"unique for all subscriptions.")
for bucket in self.__buckets:
if not bucket.is_full:
return await bucket.subscribe( \
channel, sub_id, **kwargs)
bucket = await self.__new_bucket()
return await bucket.subscribe( \
channel, sub_id, **kwargs)
@Connection._require_websocket_connection
async def unsubscribe(self, sub_id: str) -> None:
# pylint: disable-next=consider-using-dict-items
for bucket in self.__buckets:
if bucket.has(sub_id):
if bucket.count == 1:
del self.__buckets[bucket]
return await bucket.close( \
code=1001, reason="Going Away")
return await bucket.unsubscribe(sub_id)
raise UnknownSubscriptionError("Unable to find " + \
f"a subscription with sub_id <{sub_id}>.")
@Connection._require_websocket_connection
async def resubscribe(self, sub_id: str) -> None:
for bucket in self.__buckets:
if bucket.has(sub_id):
return await bucket.resubscribe(sub_id)
raise UnknownSubscriptionError("Unable to find " + \
f"a subscription with sub_id <{sub_id}>.")
@Connection._require_websocket_connection
async def close(self, code: int = 1000, reason: str = str()) -> None:
for bucket in self.__buckets:
await bucket.close(code=code, reason=reason)
if self._websocket.open:
await self._websocket.close( \
code=code, reason=reason)
@Connection._require_websocket_authentication
async def notify(self,
info: Any,
message_id: Optional[int] = None,
**kwargs: Any) -> None:
await self._websocket.send(
json.dumps([ 0, "n", message_id,
{ "type": "ucm-test", "info": info, **kwargs } ]))
@Connection._require_websocket_authentication
async def __handle_websocket_input(self, event: str, data: Any) -> None:
await self._websocket.send(json.dumps( \
[ 0, event, None, data], cls=JSONEncoder))
def on(self, event, callback = None):
return self.__event_emitter.on(event, callback)

View File

@@ -0,0 +1,95 @@
from typing import \
Callable, Awaitable, Tuple, \
List, Union, Optional, \
Any
from decimal import Decimal
_Handler = Callable[[str, Any], Awaitable[None]]
class BfxWebSocketInputs:
def __init__(self, handle_websocket_input: _Handler) -> None:
self.__handle_websocket_input = handle_websocket_input
async def submit_order(self,
type: str,
symbol: str,
amount: Union[str, float, Decimal],
price: Union[str, float, Decimal],
*,
lev: Optional[int] = None,
price_trailing: Optional[Union[str, float, Decimal]] = None,
price_aux_limit: Optional[Union[str, float, Decimal]] = None,
price_oco_stop: Optional[Union[str, float, Decimal]] = None,
gid: Optional[int] = None,
cid: Optional[int] = None,
flags: Optional[int] = None,
tif: Optional[str] = None) -> None:
await self.__handle_websocket_input("on", {
"type": type, "symbol": symbol, "amount": amount,
"price": price, "lev": lev, "price_trailing": price_trailing,
"price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, "gid": gid,
"cid": cid, "flags": flags, "tif": tif
})
async def update_order(self,
id: int,
*,
amount: Optional[Union[str, float, Decimal]] = None,
price: Optional[Union[str, float, Decimal]] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None,
gid: Optional[int] = None,
flags: Optional[int] = None,
lev: Optional[int] = None,
delta: Optional[Union[str, float, Decimal]] = None,
price_aux_limit: Optional[Union[str, float, Decimal]] = None,
price_trailing: Optional[Union[str, float, Decimal]] = None,
tif: Optional[str] = None) -> None:
await self.__handle_websocket_input("ou", {
"id": id, "amount": amount, "price": price,
"cid": cid, "cid_date": cid_date, "gid": gid,
"flags": flags, "lev": lev, "delta": delta,
"price_aux_limit": price_aux_limit, "price_trailing": price_trailing, "tif": tif
})
async def cancel_order(self,
*,
id: Optional[int] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None) -> None:
await self.__handle_websocket_input("oc", {
"id": id, "cid": cid, "cid_date": cid_date
})
async def cancel_order_multi(self,
*,
id: Optional[List[int]] = None,
cid: Optional[List[Tuple[int, str]]] = None,
gid: Optional[List[int]] = None,
all: Optional[bool] = None) -> None:
await self.__handle_websocket_input("oc_multi", {
"id": id, "cid": cid, "gid": gid,
"all": all
})
#pylint: disable-next=too-many-arguments
async def submit_funding_offer(self,
type: str,
symbol: str,
amount: Union[str, float, Decimal],
rate: Union[str, float, Decimal],
period: int,
*,
flags: Optional[int] = None) -> None:
await self.__handle_websocket_input("fon", {
"type": type, "symbol": symbol, "amount": amount,
"rate": rate, "period": period, "flags": flags
})
async def cancel_funding_offer(self, id: int) -> None:
await self.__handle_websocket_input("foc", { "id": id })
async def calc(self, *args: str) -> None:
await self.__handle_websocket_input("calc",
list(map(lambda arg: [arg], args)))

View File

@@ -0,0 +1,111 @@
from typing import \
TypeVar, Callable, Awaitable, \
List, Dict, Optional, \
Any, cast
# pylint: disable-next=wrong-import-order
from typing_extensions import \
ParamSpec, Concatenate
from abc import \
ABC, abstractmethod
from functools import wraps
from datetime import datetime
import hmac, hashlib, json
from websockets.client import WebSocketClientProtocol
from bfxapi.websocket.exceptions import \
ConnectionNotOpen, ActionRequiresAuthentication
_S = TypeVar("_S", bound="Connection")
_R = TypeVar("_R")
_P = ParamSpec("_P")
class Connection(ABC):
_HEARTBEAT = "hb"
def __init__(self, host: str) -> None:
self._host = host
self._authentication: bool = False
self.__protocol: Optional[WebSocketClientProtocol] = None
@property
def open(self) -> bool:
return self.__protocol is not None and \
self.__protocol.open
@property
def authentication(self) -> bool:
return self._authentication
@property
def _websocket(self) -> WebSocketClientProtocol:
return cast(WebSocketClientProtocol, self.__protocol)
@_websocket.setter
def _websocket(self, protocol: WebSocketClientProtocol) -> None:
self.__protocol = protocol
@abstractmethod
async def start(self) -> None:
...
@staticmethod
def _require_websocket_connection(
function: Callable[Concatenate[_S, _P], Awaitable[_R]]
) -> Callable[Concatenate[_S, _P], Awaitable[_R]]:
@wraps(function)
async def wrapper(self: _S, *args: Any, **kwargs: Any) -> _R:
if self.open:
return await function(self, *args, **kwargs)
raise ConnectionNotOpen("No open connection with the server.")
return wrapper
@staticmethod
def _require_websocket_authentication(
function: Callable[Concatenate[_S, _P], Awaitable[_R]]
) -> Callable[Concatenate[_S, _P], Awaitable[_R]]:
@wraps(function)
async def wrapper(self: _S, *args: Any, **kwargs: Any) -> _R:
if not self.authentication:
raise ActionRequiresAuthentication("To perform this action you need to " \
"authenticate using your API_KEY and API_SECRET.")
internal = Connection._require_websocket_connection(function)
return await internal(self, *args, **kwargs)
return wrapper
@staticmethod
def _get_authentication_message(
api_key: str,
api_secret: str,
filters: Optional[List[str]] = None
) -> str:
message: Dict[str, Any] = \
{ "event": "auth", "filter": filters, "apiKey": api_key }
message["authNonce"] = round(datetime.now().timestamp() * 1_000_000)
message["authPayload"] = f"AUTH{message['authNonce']}"
auth_sig = hmac.new(
key=api_secret.encode("utf8"),
msg=message["authPayload"].encode("utf8"),
digestmod=hashlib.sha384
)
message["authSig"] = auth_sig.hexdigest()
return json.dumps(message)

View File

@@ -0,0 +1 @@
from .bfx_event_emitter import BfxEventEmitter

View File

@@ -0,0 +1,90 @@
from typing import \
TypeVar, Callable, List, \
Dict, Union, Optional, \
Any
from collections import defaultdict
from asyncio import AbstractEventLoop
from pyee.asyncio import AsyncIOEventEmitter
from bfxapi.websocket.exceptions import UnknownEventError
_Handler = TypeVar("_Handler", bound=Callable[..., None])
_ONCE_PER_CONNECTION = [
"open", "authenticated", "order_snapshot",
"position_snapshot", "funding_offer_snapshot", "funding_credit_snapshot",
"funding_loan_snapshot", "wallet_snapshot"
]
_ONCE_PER_SUBSCRIPTION = [
"subscribed", "t_trades_snapshot", "f_trades_snapshot",
"t_book_snapshot", "f_book_snapshot", "t_raw_book_snapshot",
"f_raw_book_snapshot", "candles_snapshot"
]
_COMMON = [
"disconnected", "t_ticker_update", "f_ticker_update",
"t_trade_execution", "t_trade_execution_update", "f_trade_execution",
"f_trade_execution_update", "t_book_update", "f_book_update",
"t_raw_book_update", "f_raw_book_update", "candles_update",
"derivatives_status_update", "liquidation_feed_update", "checksum",
"order_new", "order_update", "order_cancel",
"position_new", "position_update", "position_close",
"funding_offer_new", "funding_offer_update", "funding_offer_cancel",
"funding_credit_new", "funding_credit_update", "funding_credit_close",
"funding_loan_new", "funding_loan_update", "funding_loan_close",
"trade_execution", "trade_execution_update", "wallet_update",
"notification", "on-req-notification", "ou-req-notification",
"oc-req-notification", "fon-req-notification", "foc-req-notification"
]
class BfxEventEmitter(AsyncIOEventEmitter):
_EVENTS = _ONCE_PER_CONNECTION + \
_ONCE_PER_SUBSCRIPTION + \
_COMMON
def __init__(self, loop: Optional[AbstractEventLoop] = None) -> None:
super().__init__(loop)
self._connection: List[str] = [ ]
self._subscriptions: Dict[str, List[str]] = \
defaultdict(lambda: [ ])
def emit(
self,
event: str,
*args: Any,
**kwargs: Any
) -> bool:
if event in _ONCE_PER_CONNECTION:
if event in self._connection:
return self._has_listeners(event)
self._connection += [ event ]
if event in _ONCE_PER_SUBSCRIPTION:
sub_id = args[0]["sub_id"]
if event in self._subscriptions[sub_id]:
return self._has_listeners(event)
self._subscriptions[sub_id] += [ event ]
return super().emit(event, *args, **kwargs)
def on(
self, event: str, f: Optional[_Handler] = None
) -> Union[_Handler, Callable[[_Handler], _Handler]]:
if event not in BfxEventEmitter._EVENTS:
raise UnknownEventError(f"Can't register to unknown event: <{event}> " + \
"(to get a full list of available events see https://docs.bitfinex.com/).")
return super().on(event, f)
def _has_listeners(self, event: str) -> bool:
with self._lock:
listeners = self._events.get(event)
return bool(listeners)

View File

@@ -0,0 +1,3 @@
from .public_channels_handler import PublicChannelsHandler
from .auth_events_handler import AuthEventsHandler

View File

@@ -0,0 +1,66 @@
from typing import \
Dict, Tuple, Any
from pyee.base import EventEmitter
from bfxapi.types import serializers
from bfxapi.types.serializers import _Notification
from bfxapi.types.dataclasses import \
Order, FundingOffer
class AuthEventsHandler:
__ABBREVIATIONS = {
"os": "order_snapshot", "on": "order_new", "ou": "order_update",
"oc": "order_cancel", "ps": "position_snapshot", "pn": "position_new",
"pu": "position_update", "pc": "position_close", "te": "trade_execution",
"tu": "trade_execution_update", "fos": "funding_offer_snapshot", "fon": "funding_offer_new",
"fou": "funding_offer_update", "foc": "funding_offer_cancel", "fcs": "funding_credit_snapshot",
"fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fls": "funding_loan_snapshot", "fln": "funding_loan_new", "flu": "funding_loan_update",
"flc": "funding_loan_close", "ws": "wallet_snapshot", "wu": "wallet_update"
}
__SERIALIZERS: Dict[Tuple[str, ...], serializers._Serializer] = {
("os", "on", "ou", "oc"): serializers.Order,
("ps", "pn", "pu", "pc"): serializers.Position,
("te", "tu"): serializers.Trade,
("fos", "fon", "fou", "foc"): serializers.FundingOffer,
("fcs", "fcn", "fcu", "fcc"): serializers.FundingCredit,
("fls", "fln", "flu", "flc"): serializers.FundingLoan,
("ws", "wu"): serializers.Wallet
}
def __init__(self, event_emitter: EventEmitter) -> None:
self.__event_emitter = event_emitter
def handle(self, abbrevation: str, stream: Any) -> None:
if abbrevation == "n":
self.__notification(stream)
for abbrevations, serializer in AuthEventsHandler.__SERIALIZERS.items():
if abbrevation in abbrevations:
event = AuthEventsHandler.__ABBREVIATIONS[abbrevation]
if all(isinstance(sub_stream, list) for sub_stream in stream):
data = [ serializer.parse(*sub_stream) for sub_stream in stream ]
else:
data = serializer.parse(*stream)
self.__event_emitter.emit(event, data)
def __notification(self, stream: Any) -> None:
event: str = "notification"
serializer: _Notification = _Notification[None](serializer=None)
if stream[1] in ("on-req", "ou-req", "oc-req"):
event, serializer = f"{stream[1]}-notification", \
_Notification[Order](serializer=serializers.Order)
if stream[1] in ("fon-req", "foc-req"):
event, serializer = f"{stream[1]}-notification", \
_Notification[FundingOffer](serializer=serializers.FundingOffer)
self.__event_emitter.emit(event, serializer.parse(*stream))

View File

@@ -0,0 +1,135 @@
from typing import \
List, Any, cast
from pyee.base import EventEmitter
from bfxapi.types import serializers
from bfxapi.websocket.subscriptions import \
Subscription, Ticker, Trades, \
Book, Candles, Status
_CHECKSUM = "cs"
class PublicChannelsHandler:
def __init__(self, event_emitter: EventEmitter) -> None:
self.__event_emitter = event_emitter
def handle(self, subscription: Subscription, stream: List[Any]) -> None:
if subscription["channel"] == "ticker":
self.__ticker_channel_handler(cast(Ticker, subscription), stream)
elif subscription["channel"] == "trades":
self.__trades_channel_handler(cast(Trades, subscription), stream)
elif subscription["channel"] == "book":
subscription = cast(Book, subscription)
if stream[0] == _CHECKSUM:
self.__checksum_handler(subscription, stream[1])
else:
if subscription["prec"] != "R0":
self.__book_channel_handler(subscription, stream)
else:
self.__raw_book_channel_handler(subscription, stream)
elif subscription["channel"] == "candles":
self.__candles_channel_handler(cast(Candles, subscription), stream)
elif subscription["channel"] == "status":
self.__status_channel_handler(cast(Status, subscription), stream)
#pylint: disable-next=inconsistent-return-statements
def __ticker_channel_handler(self, subscription: Ticker, stream: List[Any]):
if subscription["symbol"].startswith("t"):
return self.__event_emitter.emit("t_ticker_update", subscription, \
serializers.TradingPairTicker.parse(*stream[0]))
if subscription["symbol"].startswith("f"):
return self.__event_emitter.emit("f_ticker_update", subscription, \
serializers.FundingCurrencyTicker.parse(*stream[0]))
#pylint: disable-next=inconsistent-return-statements
def __trades_channel_handler(self, subscription: Trades, stream: List[Any]):
if (event := stream[0]) and event in [ "te", "tu", "fte", "ftu" ]:
events = { "te": "t_trade_execution", "tu": "t_trade_execution_update", \
"fte": "f_trade_execution", "ftu": "f_trade_execution_update" }
if subscription["symbol"].startswith("t"):
return self.__event_emitter.emit(events[event], subscription, \
serializers.TradingPairTrade.parse(*stream[1]))
if subscription["symbol"].startswith("f"):
return self.__event_emitter.emit(events[event], subscription, \
serializers.FundingCurrencyTrade.parse(*stream[1]))
if subscription["symbol"].startswith("t"):
return self.__event_emitter.emit("t_trades_snapshot", subscription, \
[ serializers.TradingPairTrade.parse(*sub_stream) \
for sub_stream in stream[0] ])
if subscription["symbol"].startswith("f"):
return self.__event_emitter.emit("f_trades_snapshot", subscription, \
[ serializers.FundingCurrencyTrade.parse(*sub_stream) \
for sub_stream in stream[0] ])
#pylint: disable-next=inconsistent-return-statements
def __book_channel_handler(self, subscription: Book, stream: List[Any]):
if subscription["symbol"].startswith("t"):
if all(isinstance(sub_stream, list) for sub_stream in stream[0]):
return self.__event_emitter.emit("t_book_snapshot", subscription, \
[ serializers.TradingPairBook.parse(*sub_stream) \
for sub_stream in stream[0] ])
return self.__event_emitter.emit("t_book_update", subscription, \
serializers.TradingPairBook.parse(*stream[0]))
if subscription["symbol"].startswith("f"):
if all(isinstance(sub_stream, list) for sub_stream in stream[0]):
return self.__event_emitter.emit("f_book_snapshot", subscription, \
[ serializers.FundingCurrencyBook.parse(*sub_stream) \
for sub_stream in stream[0] ])
return self.__event_emitter.emit("f_book_update", subscription, \
serializers.FundingCurrencyBook.parse(*stream[0]))
#pylint: disable-next=inconsistent-return-statements
def __raw_book_channel_handler(self, subscription: Book, stream: List[Any]):
if subscription["symbol"].startswith("t"):
if all(isinstance(sub_stream, list) for sub_stream in stream[0]):
return self.__event_emitter.emit("t_raw_book_snapshot", subscription, \
[ serializers.TradingPairRawBook.parse(*sub_stream) \
for sub_stream in stream[0] ])
return self.__event_emitter.emit("t_raw_book_update", subscription, \
serializers.TradingPairRawBook.parse(*stream[0]))
if subscription["symbol"].startswith("f"):
if all(isinstance(sub_stream, list) for sub_stream in stream[0]):
return self.__event_emitter.emit("f_raw_book_snapshot", subscription, \
[ serializers.FundingCurrencyRawBook.parse(*sub_stream) \
for sub_stream in stream[0] ])
return self.__event_emitter.emit("f_raw_book_update", subscription, \
serializers.FundingCurrencyRawBook.parse(*stream[0]))
#pylint: disable-next=inconsistent-return-statements
def __candles_channel_handler(self, subscription: Candles, stream: List[Any]):
if all(isinstance(sub_stream, list) for sub_stream in stream[0]):
return self.__event_emitter.emit("candles_snapshot", subscription, \
[ serializers.Candle.parse(*sub_stream) \
for sub_stream in stream[0] ])
return self.__event_emitter.emit("candles_update", subscription, \
serializers.Candle.parse(*stream[0]))
#pylint: disable-next=inconsistent-return-statements
def __status_channel_handler(self, subscription: Status, stream: List[Any]):
if subscription["key"].startswith("deriv:"):
return self.__event_emitter.emit("derivatives_status_update", subscription, \
serializers.DerivativesStatus.parse(*stream[0]))
if subscription["key"].startswith("liq:"):
return self.__event_emitter.emit("liquidation_feed_update", subscription, \
serializers.Liquidation.parse(*stream[0][0]))
#pylint: disable-next=inconsistent-return-statements
def __checksum_handler(self, subscription: Book, value: int):
return self.__event_emitter.emit( \
"checksum", subscription, value & 0xFFFFFFFF)

View File

@@ -1,3 +0,0 @@
from .bfx_websocket_client import BfxWebSocketClient
from .bfx_websocket_bucket import BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebSocketInputs

View File

@@ -1,113 +0,0 @@
from typing import Literal, TypeVar, Callable, cast
import asyncio, json, uuid, websockets
from ..handlers import PublicChannelsHandler
from ..exceptions import ConnectionNotOpen, TooManySubscriptions
_HEARTBEAT = "hb"
F = TypeVar("F", bound=Callable[..., Literal[None]])
def _require_websocket_connection(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if self.websocket is None or not self.websocket.open:
raise ConnectionNotOpen("No open connection with the server.")
await function(self, *args, **kwargs)
return cast(F, wrapper)
class BfxWebSocketBucket:
VERSION = 2
MAXIMUM_SUBSCRIPTIONS_AMOUNT = 25
def __init__(self, host, event_emitter, events_per_subscription):
self.host, self.event_emitter, self.events_per_subscription = host, event_emitter, events_per_subscription
self.websocket, self.subscriptions, self.pendings = None, {}, []
self.on_open_event = asyncio.locks.Event()
self.handler = PublicChannelsHandler(event_emitter=self.event_emitter, \
events_per_subscription=self.events_per_subscription)
async def connect(self):
async def _connection():
async with websockets.connect(self.host) as websocket:
self.websocket = websocket
self.on_open_event.set()
await self.__recover_state()
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict):
if message["event"] == "subscribed" and (chan_id := message["chanId"]):
self.pendings = [ pending \
for pending in self.pendings if pending["subId"] != message["subId"] ]
self.subscriptions[chan_id] = message
sub_id = message["subId"]
if "subscribed" not in self.events_per_subscription.get(sub_id, []):
self.events_per_subscription.setdefault(sub_id, []).append("subscribed")
self.event_emitter.emit("subscribed", message)
elif message["event"] == "unsubscribed" and (chan_id := message["chanId"]):
if message["status"] == "OK":
del self.subscriptions[chan_id]
elif message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
if isinstance(message, list):
if (chan_id := message[0]) and message[1] != _HEARTBEAT:
self.handler.handle(self.subscriptions[chan_id], *message[1:])
try:
await _connection()
except websockets.exceptions.ConnectionClosedError as error:
if error.code in (1006, 1012):
self.on_open_event.clear()
async def __recover_state(self):
for pending in self.pendings:
await self.websocket.send(json.dumps(pending))
for _, subscription in self.subscriptions.items():
await self.subscribe(sub_id=subscription.pop("subId"), **subscription)
self.subscriptions.clear()
@_require_websocket_connection
async def subscribe(self, channel, sub_id=None, **kwargs):
if len(self.subscriptions) + len(self.pendings) == BfxWebSocketBucket.MAXIMUM_SUBSCRIPTIONS_AMOUNT:
raise TooManySubscriptions("The client has reached the maximum number of subscriptions.")
subscription = {
**kwargs,
"event": "subscribe",
"channel": channel,
"subId": sub_id or str(uuid.uuid4()),
}
self.pendings.append(subscription)
await self.websocket.send(json.dumps(subscription))
@_require_websocket_connection
async def unsubscribe(self, chan_id):
await self.websocket.send(json.dumps({
"event": "unsubscribe",
"chanId": chan_id
}))
@_require_websocket_connection
async def close(self, code=1000, reason=str()):
await self.websocket.close(code=code, reason=reason)
def get_chan_id(self, sub_id):
for subscription in self.subscriptions.values():
if subscription["subId"] == sub_id:
return subscription["chanId"]

View File

@@ -1,289 +0,0 @@
from typing import cast
from collections import namedtuple
from datetime import datetime
import traceback, json, asyncio, hmac, hashlib, time, socket, random, websockets
from pyee.asyncio import AsyncIOEventEmitter
from .bfx_websocket_bucket import _HEARTBEAT, F, _require_websocket_connection, BfxWebSocketBucket
from .bfx_websocket_inputs import BfxWebSocketInputs
from ..handlers import PublicChannelsHandler, AuthenticatedEventsHandler
from ..exceptions import WebSocketAuthenticationRequired, InvalidAuthenticationCredentials, EventNotSupported, \
ZeroConnectionsError, ReconnectionTimeoutError, OutdatedClientVersion
from ...utils.json_encoder import JSONEncoder
from ...utils.logger import ColorLogger, FileLogger
def _require_websocket_authentication(function: F) -> F:
async def wrapper(self, *args, **kwargs):
if hasattr(self, "authentication") and not self.authentication:
raise WebSocketAuthenticationRequired("To perform this action you need to " \
"authenticate using your API_KEY and API_SECRET.")
await _require_websocket_connection(function)(self, *args, **kwargs)
return cast(F, wrapper)
class _Delay:
BACKOFF_MIN, BACKOFF_MAX = 1.92, 60.0
BACKOFF_INITIAL = 5.0
def __init__(self, backoff_factor):
self.__backoff_factor = backoff_factor
self.__backoff_delay = _Delay.BACKOFF_MIN
self.__initial_delay = random.random() * _Delay.BACKOFF_INITIAL
def next(self):
backoff_delay = self.peek()
__backoff_delay = self.__backoff_delay * self.__backoff_factor
self.__backoff_delay = min(__backoff_delay, _Delay.BACKOFF_MAX)
return backoff_delay
def peek(self):
return (self.__backoff_delay == _Delay.BACKOFF_MIN) \
and self.__initial_delay or self.__backoff_delay
class BfxWebSocketClient:
VERSION = BfxWebSocketBucket.VERSION
MAXIMUM_CONNECTIONS_AMOUNT = 20
ONCE_EVENTS = [
"open", "authenticated", "disconnection",
*AuthenticatedEventsHandler.ONCE_EVENTS
]
EVENTS = [
"subscribed", "wss-error",
*ONCE_EVENTS,
*PublicChannelsHandler.EVENTS,
*AuthenticatedEventsHandler.ON_EVENTS
]
def __init__(self, host, credentials, *, wss_timeout = 60 * 15, log_filename = None, log_level = "INFO"):
self.websocket, self.authentication, self.buckets = None, False, []
self.host, self.credentials, self.wss_timeout = host, credentials, wss_timeout
self.events_per_subscription = {}
self.event_emitter = AsyncIOEventEmitter()
self.handler = AuthenticatedEventsHandler(event_emitter=self.event_emitter)
self.inputs = BfxWebSocketInputs(handle_websocket_input=self.__handle_websocket_input)
if log_filename is None:
self.logger = ColorLogger("BfxWebSocketClient", level=log_level)
else: self.logger = FileLogger("BfxWebSocketClient", level=log_level, filename=log_filename)
self.event_emitter.add_listener("error",
lambda exception: self.logger.error(f"{type(exception).__name__}: {str(exception)}" + "\n" +
str().join(traceback.format_exception(type(exception), exception, exception.__traceback__))[:-1])
)
def run(self, connections = 5):
return asyncio.run(self.start(connections))
async def start(self, connections = 5):
if connections == 0:
self.logger.info("With connections set to 0 it will not be possible to subscribe to any public channel. " \
"Attempting a subscription will cause a ZeroConnectionsError to be thrown.")
if connections > BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT:
self.logger.warning(f"It is not safe to use more than {BfxWebSocketClient.MAXIMUM_CONNECTIONS_AMOUNT} " \
f"buckets from the same connection ({connections} in use), the server could momentarily " \
"block the client with <429 Too Many Requests>.")
for _ in range(connections):
self.buckets += [BfxWebSocketBucket(self.host, self.event_emitter, self.events_per_subscription)]
await self.__connect()
#pylint: disable-next=too-many-statements,too-many-branches
async def __connect(self):
Reconnection = namedtuple("Reconnection", ["status", "attempts", "timestamp"])
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
timer, tasks, on_timeout_event = None, [], asyncio.locks.Event()
delay = None
def _on_wss_timeout():
on_timeout_event.set()
#pylint: disable-next=too-many-branches
async def _connection():
nonlocal reconnection, timer, tasks
async with websockets.connect(self.host, ping_interval=None) as websocket:
if reconnection.status:
self.logger.info(f"Reconnection attempt successful (no.{reconnection.attempts}): The " \
f"client has been offline for a total of {datetime.now() - reconnection.timestamp} " \
f"(connection lost on: {reconnection.timestamp:%d-%m-%Y at %H:%M:%S}).")
reconnection = Reconnection(status=False, attempts=0, timestamp=None)
if isinstance(timer, asyncio.events.TimerHandle):
timer.cancel()
self.websocket = websocket
coroutines = [ BfxWebSocketBucket.connect(bucket) for bucket in self.buckets ]
tasks = [ asyncio.create_task(coroutine) for coroutine in coroutines ]
if len(self.buckets) == 0 or \
(await asyncio.gather(*[bucket.on_open_event.wait() for bucket in self.buckets])):
self.event_emitter.emit("open")
if self.credentials:
await self.__authenticate(**self.credentials)
async for message in websocket:
message = json.loads(message)
if isinstance(message, dict):
if message["event"] == "info" and "version" in message:
if BfxWebSocketClient.VERSION != message["version"]:
raise OutdatedClientVersion("Mismatch between the client version and the server " \
"version. Update the library to the latest version to continue (client version: " \
f"{BfxWebSocketClient.VERSION}, server version: {message['version']}).")
elif message["event"] == "info" and message["code"] == 20051:
rcvd = websockets.frames.Close(code=1012,
reason="Stop/Restart WebSocket Server (please reconnect).")
raise websockets.exceptions.ConnectionClosedError(rcvd=rcvd, sent=None)
elif message["event"] == "auth":
if message["status"] != "OK":
raise InvalidAuthenticationCredentials(
"Cannot authenticate with given API-KEY and API-SECRET.")
self.event_emitter.emit("authenticated", message)
self.authentication = True
elif message["event"] == "error":
self.event_emitter.emit("wss-error", message["code"], message["msg"])
if isinstance(message, list):
if message[0] == 0 and message[1] != _HEARTBEAT:
self.handler.handle(message[1], message[2])
while True:
if reconnection.status:
await asyncio.sleep(delay.next())
if on_timeout_event.is_set():
raise ReconnectionTimeoutError("Connection has been offline for too long " \
f"without being able to reconnect (wss_timeout: {self.wss_timeout}s).")
try:
await _connection()
except (websockets.exceptions.ConnectionClosedError, socket.gaierror) as error:
if isinstance(error, websockets.exceptions.ConnectionClosedError):
if error.code in (1006, 1012):
if error.code == 1006:
self.logger.error("Connection lost: no close frame received " \
"or sent (1006). Trying to reconnect...")
if error.code == 1012:
self.logger.info("WSS server is about to restart, clients need " \
"to reconnect (server sent 20051). Reconnection attempt in progress...")
for task in tasks:
task.cancel()
reconnection = Reconnection(status=True, attempts=1, timestamp=datetime.now())
if self.wss_timeout is not None:
timer = asyncio.get_event_loop().call_later(self.wss_timeout, _on_wss_timeout)
delay = _Delay(backoff_factor=1.618)
self.authentication = False
elif isinstance(error, socket.gaierror) and reconnection.status:
self.logger.warning(f"Reconnection attempt was unsuccessful (no.{reconnection.attempts}). " \
f"Next reconnection attempt in {delay.peek():.2f} seconds. (at the moment " \
f"the client has been offline for {datetime.now() - reconnection.timestamp})")
reconnection = reconnection._replace(attempts=reconnection.attempts + 1)
else: raise error
if not reconnection.status:
self.event_emitter.emit("disconnection",
self.websocket.close_code, self.websocket.close_reason)
break
async def __authenticate(self, api_key, api_secret, filters=None):
data = { "event": "auth", "filter": filters, "apiKey": api_key }
data["authNonce"] = str(round(time.time() * 1_000_000))
data["authPayload"] = "AUTH" + data["authNonce"]
data["authSig"] = hmac.new(
api_secret.encode("utf8"),
data["authPayload"].encode("utf8"),
hashlib.sha384
).hexdigest()
await self.websocket.send(json.dumps(data))
async def subscribe(self, channel, **kwargs):
if len(self.buckets) == 0:
raise ZeroConnectionsError("Unable to subscribe: the number of connections must be greater than 0.")
counters = [ len(bucket.pendings) + len(bucket.subscriptions) for bucket in self.buckets ]
index = counters.index(min(counters))
await self.buckets[index].subscribe(channel, **kwargs)
async def unsubscribe(self, sub_id):
for bucket in self.buckets:
if (chan_id := bucket.get_chan_id(sub_id)):
await bucket.unsubscribe(chan_id=chan_id)
async def close(self, code=1000, reason=str()):
for bucket in self.buckets:
await bucket.close(code=code, reason=reason)
if self.websocket is not None and self.websocket.open:
await self.websocket.close(code=code, reason=reason)
@_require_websocket_authentication
async def notify(self, info, message_id=None, **kwargs):
await self.websocket.send(json.dumps([ 0, "n", message_id, { "type": "ucm-test", "info": info, **kwargs } ]))
@_require_websocket_authentication
async def __handle_websocket_input(self, event, data):
await self.websocket.send(json.dumps([ 0, event, None, data], cls=JSONEncoder))
def on(self, *events, callback = None):
for event in events:
if event not in BfxWebSocketClient.EVENTS:
raise EventNotSupported(f"Event <{event}> is not supported. To get a list " \
"of available events print BfxWebSocketClient.EVENTS")
def _register_event(event, function):
if event in BfxWebSocketClient.ONCE_EVENTS:
self.event_emitter.once(event, function)
else: self.event_emitter.on(event, function)
if callback is not None:
for event in events:
_register_event(event, callback)
if callback is None:
def handler(function):
for event in events:
_register_event(event, function)
return handler

View File

@@ -1,94 +0,0 @@
from decimal import Decimal
from datetime import datetime
from typing import Union, Optional, List, Tuple
from ..enums import OrderType, FundingOfferType
from ...types import JSON
class BfxWebSocketInputs:
def __init__(self, handle_websocket_input):
self.__handle_websocket_input = handle_websocket_input
async def submit_order(self,
type: OrderType,
symbol: str,
amount: Union[Decimal, float, str],
*,
price: Optional[Union[Decimal, float, str]] = None,
lev: Optional[int] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_oco_stop: Optional[Union[Decimal, float, str]] = None,
gid: Optional[int] = None,
cid: Optional[int] = None,
flags: Optional[int] = 0,
tif: Optional[Union[datetime, str]] = None,
meta: Optional[JSON] = None):
await self.__handle_websocket_input("on", {
"type": type, "symbol": symbol, "amount": amount,
"price": price, "lev": lev, "price_trailing": price_trailing,
"price_aux_limit": price_aux_limit, "price_oco_stop": price_oco_stop, "gid": gid,
"cid": cid, "flags": flags, "tif": tif,
"meta": meta
})
async def update_order(self,
id: int,
*,
amount: Optional[Union[Decimal, float, str]] = None,
price: Optional[Union[Decimal, float, str]] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None,
gid: Optional[int] = None,
flags: Optional[int] = 0,
lev: Optional[int] = None,
delta: Optional[Union[Decimal, float, str]] = None,
price_aux_limit: Optional[Union[Decimal, float, str]] = None,
price_trailing: Optional[Union[Decimal, float, str]] = None,
tif: Optional[Union[datetime, str]] = None):
await self.__handle_websocket_input("ou", {
"id": id, "amount": amount, "price": price,
"cid": cid, "cid_date": cid_date, "gid": gid,
"flags": flags, "lev": lev, "delta": delta,
"price_aux_limit": price_aux_limit, "price_trailing": price_trailing, "tif": tif
})
async def cancel_order(self,
*,
id: Optional[int] = None,
cid: Optional[int] = None,
cid_date: Optional[str] = None):
await self.__handle_websocket_input("oc", {
"id": id, "cid": cid, "cid_date": cid_date
})
async def cancel_order_multi(self,
*,
ids: Optional[List[int]] = None,
cids: Optional[List[Tuple[int, str]]] = None,
gids: Optional[List[int]] = None,
all: bool = False):
await self.__handle_websocket_input("oc_multi", {
"ids": ids, "cids": cids, "gids": gids,
"all": int(all)
})
#pylint: disable-next=too-many-arguments
async def submit_funding_offer(self,
type: FundingOfferType,
symbol: str,
amount: Union[Decimal, float, str],
rate: Union[Decimal, float, str],
period: int,
*,
flags: Optional[int] = 0):
await self.__handle_websocket_input("fon", {
"type": type, "symbol": symbol, "amount": amount,
"rate": rate, "period": period, "flags": flags
})
async def cancel_funding_offer(self, id: int):
await self.__handle_websocket_input("foc", { "id": id })
async def calc(self, *args: str):
await self.__handle_websocket_input("calc", list(map(lambda arg: [arg], args)))

View File

@@ -1,9 +0,0 @@
#pylint: disable-next=wildcard-import,unused-wildcard-import
from ..enums import *
class Channel(str, Enum):
TICKER = "ticker"
TRADES = "trades"
BOOK = "book"
CANDLES = "candles"
STATUS = "status"

View File

@@ -1,59 +1,25 @@
from ..exceptions import BfxBaseException
from bfxapi.exceptions import BfxBaseException
__all__ = [
"BfxWebSocketException",
class ConnectionNotOpen(BfxBaseException):
pass
"ConnectionNotOpen",
"TooManySubscriptions",
"ZeroConnectionsError",
"ReconnectionTimeoutError",
"WebSocketAuthenticationRequired",
"InvalidAuthenticationCredentials",
"EventNotSupported",
"OutdatedClientVersion"
]
class ActionRequiresAuthentication(BfxBaseException):
pass
class BfxWebSocketException(BfxBaseException):
"""
Base class for all custom exceptions in bfxapi/websocket/exceptions.py.
"""
class ReconnectionTimeoutError(BfxBaseException):
pass
class ConnectionNotOpen(BfxWebSocketException):
"""
This error indicates an attempt to communicate via websocket before starting the connection with the servers.
"""
class VersionMismatchError(BfxBaseException):
pass
class TooManySubscriptions(BfxWebSocketException):
"""
This error indicates a subscription attempt after reaching the limit of simultaneous connections.
"""
class SubIdError(BfxBaseException):
pass
class ZeroConnectionsError(BfxWebSocketException):
"""
This error indicates an attempt to subscribe to a public channel while the number of connections is 0.
"""
class UnknownChannelError(BfxBaseException):
pass
class ReconnectionTimeoutError(BfxWebSocketException):
"""
This error indicates that the connection has been offline for too long without being able to reconnect.
"""
class UnknownEventError(BfxBaseException):
pass
class WebSocketAuthenticationRequired(BfxWebSocketException):
"""
This error indicates an attempt to access a protected resource without logging in first.
"""
class InvalidAuthenticationCredentials(BfxWebSocketException):
"""
This error indicates that the user has provided incorrect credentials (API-KEY and API-SECRET) for authentication.
"""
class EventNotSupported(BfxWebSocketException):
"""
This error indicates a failed attempt to subscribe to an event not supported by the BfxWebSocketClient.
"""
class OutdatedClientVersion(BfxWebSocketException):
"""
This error indicates a mismatch between the client version and the server WSS version.
"""
class UnknownSubscriptionError(BfxBaseException):
pass

View File

@@ -1,2 +0,0 @@
from .public_channels_handler import PublicChannelsHandler
from .authenticated_events_handler import AuthenticatedEventsHandler

View File

@@ -1,70 +0,0 @@
from ...types import serializers
from ...types.serializers import _Notification
class AuthenticatedEventsHandler:
__once_abbreviations = {
"os": "order_snapshot", "ps": "position_snapshot", "fos": "funding_offer_snapshot",
"fcs": "funding_credit_snapshot", "fls": "funding_loan_snapshot", "ws": "wallet_snapshot"
}
__on_abbreviations = {
"on": "order_new", "ou": "order_update", "oc": "order_cancel",
"pn": "position_new", "pu": "position_update", "pc": "position_close",
"fon": "funding_offer_new", "fou": "funding_offer_update", "foc": "funding_offer_cancel",
"fcn": "funding_credit_new", "fcu": "funding_credit_update", "fcc": "funding_credit_close",
"fln": "funding_loan_new", "flu": "funding_loan_update", "flc": "funding_loan_close",
"te": "trade_execution", "tu": "trade_execution_update", "wu": "wallet_update"
}
__abbreviations = {
**__once_abbreviations,
**__on_abbreviations
}
__serializers = {
("os", "on", "ou", "oc",): serializers.Order,
("ps", "pn", "pu", "pc",): serializers.Position,
("te", "tu"): serializers.Trade,
("fos", "fon", "fou", "foc",): serializers.FundingOffer,
("fcs", "fcn", "fcu", "fcc",): serializers.FundingCredit,
("fls", "fln", "flu", "flc",): serializers.FundingLoan,
("ws", "wu",): serializers.Wallet
}
ONCE_EVENTS = [
*list(__once_abbreviations.values())
]
ON_EVENTS = [
*list(__on_abbreviations.values()),
"notification", "on-req-notification", "ou-req-notification",
"oc-req-notification", "fon-req-notification", "foc-req-notification"
]
def __init__(self, event_emitter):
self.event_emitter = event_emitter
def handle(self, abbrevation, stream):
if abbrevation == "n":
return self.__notification(stream)
for abbrevations, serializer in AuthenticatedEventsHandler.__serializers.items():
if abbrevation in abbrevations:
event = AuthenticatedEventsHandler.__abbreviations[abbrevation]
if all(isinstance(substream, list) for substream in stream):
return self.event_emitter.emit(event, [ serializer.parse(*substream) for substream in stream ])
return self.event_emitter.emit(event, serializer.parse(*stream))
def __notification(self, stream):
event, serializer = "notification", _Notification(serializer=None)
if stream[1] == "on-req" or stream[1] == "ou-req" or stream[1] == "oc-req":
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.Order)
if stream[1] == "fon-req" or stream[1] == "foc-req":
event, serializer = f"{stream[1]}-notification", _Notification(serializer=serializers.FundingOffer)
return self.event_emitter.emit(event, serializer.parse(*stream))

View File

@@ -1,139 +0,0 @@
from ...types import serializers
class PublicChannelsHandler:
ONCE_PER_SUBSCRIPTION_EVENTS = [
"t_trades_snapshot", "f_trades_snapshot", "t_book_snapshot",
"f_book_snapshot", "t_raw_book_snapshot", "f_raw_book_snapshot",
"candles_snapshot"
]
EVENTS = [
*ONCE_PER_SUBSCRIPTION_EVENTS,
"t_ticker_update", "f_ticker_update", "t_trade_execution",
"t_trade_execution_update", "f_trade_execution", "f_trade_execution_update",
"t_book_update", "f_book_update", "t_raw_book_update",
"f_raw_book_update", "candles_update", "derivatives_status_update"
]
def __init__(self, event_emitter, events_per_subscription):
self.__event_emitter, self.__events_per_subscription = \
event_emitter, events_per_subscription
self.__handlers = {
"ticker": self.__ticker_channel_handler,
"trades": self.__trades_channel_handler,
"book": self.__book_channel_handler,
"candles": self.__candles_channel_handler,
"status": self.__status_channel_handler
}
def handle(self, subscription, *stream):
#pylint: disable-next=unnecessary-lambda-assignment
_clear = lambda dictionary, *args: { key: value for key, value in dictionary.items() if key not in args }
#pylint: disable-next=consider-iterating-dictionary
if (channel := subscription["channel"]) and channel in self.__handlers.keys():
return self.__handlers[channel](_clear(subscription, "event", "channel", "chanId"), *stream)
def __emit(self, event, sub, data):
sub_id, should_emit_event = sub["subId"], True
if event in PublicChannelsHandler.ONCE_PER_SUBSCRIPTION_EVENTS:
if sub_id not in self.__events_per_subscription:
self.__events_per_subscription[sub_id] = [ event ]
elif event not in self.__events_per_subscription[sub_id]:
self.__events_per_subscription[sub_id] += [ event ]
else: should_emit_event = False
if should_emit_event:
return self.__event_emitter.emit(event, sub, data)
def __ticker_channel_handler(self, subscription, *stream):
if subscription["symbol"].startswith("t"):
return self.__emit(
"t_ticker_update",
subscription,
serializers.TradingPairTicker.parse(*stream[0])
)
if subscription["symbol"].startswith("f"):
return self.__emit(
"f_ticker_update",
subscription,
serializers.FundingCurrencyTicker.parse(*stream[0])
)
def __trades_channel_handler(self, subscription, *stream):
if (event := stream[0]) and event in [ "te", "tu", "fte", "ftu" ]:
if subscription["symbol"].startswith("t"):
return self.__emit(
{ "te": "t_trade_execution", "tu": "t_trade_execution_update" }[event],
subscription,
serializers.TradingPairTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("f"):
return self.__emit(
{ "fte": "f_trade_execution", "ftu": "f_trade_execution_update" }[event],
subscription,
serializers.FundingCurrencyTrade.parse(*stream[1])
)
if subscription["symbol"].startswith("t"):
return self.__emit(
"t_trades_snapshot",
subscription,
[ serializers.TradingPairTrade.parse(*substream) for substream in stream[0] ]
)
if subscription["symbol"].startswith("f"):
return self.__emit(
"f_trades_snapshot",
subscription,
[ serializers.FundingCurrencyTrade.parse(*substream) for substream in stream[0] ]
)
def __book_channel_handler(self, subscription, *stream):
event = subscription["symbol"][0]
if subscription["prec"] == "R0":
_trading_pair_serializer, _funding_currency_serializer, is_raw_book = \
serializers.TradingPairRawBook, serializers.FundingCurrencyRawBook, True
else: _trading_pair_serializer, _funding_currency_serializer, is_raw_book = \
serializers.TradingPairBook, serializers.FundingCurrencyBook, False
if all(isinstance(substream, list) for substream in stream[0]):
return self.__emit(
event + "_" + (is_raw_book and "raw_book" or "book") + "_snapshot",
subscription,
[ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event] \
.parse(*substream) for substream in stream[0] ]
)
return self.__emit(
event + "_" + (is_raw_book and "raw_book" or "book") + "_update",
subscription,
{ "t": _trading_pair_serializer, "f": _funding_currency_serializer }[event].parse(*stream[0])
)
def __candles_channel_handler(self, subscription, *stream):
if all(isinstance(substream, list) for substream in stream[0]):
return self.__emit(
"candles_snapshot",
subscription,
[ serializers.Candle.parse(*substream) for substream in stream[0] ]
)
return self.__emit(
"candles_update",
subscription,
serializers.Candle.parse(*stream[0])
)
def __status_channel_handler(self, subscription, *stream):
if subscription["key"].startswith("deriv:"):
return self.__emit(
"derivatives_status_update",
subscription,
serializers.DerivativesStatus.parse(*stream[0])
)

View File

@@ -1,43 +1,34 @@
from typing import TypedDict, Union, Literal, Optional
from typing import \
Union, Literal, TypedDict
__all__ = [
"Subscription",
Subscription = Union["Ticker", "Trades", "Book", "Candles", "Status"]
"Ticker",
"Trades",
"Book",
"Candles",
"Status"
]
_Header = TypedDict("_Header", { "event": Literal["subscribed"], "channel": str, "chanId": int })
Subscription = Union[_Header, "Ticker", "Trades", "Book", "Candles", "Status"]
Channel = Literal["ticker", "trades", "book", "candles", "status"]
class Ticker(TypedDict):
subId: str
channel: Literal["ticker"]
sub_id: str
symbol: str
pair: Optional[str]
currency: Optional[str]
class Trades(TypedDict):
subId: str
channel: Literal["trades"]
sub_id: str
symbol: str
pair: Optional[str]
currency: Optional[str]
class Book(TypedDict):
subId: str
channel: Literal["book"]
sub_id: str
symbol: str
prec: str
freq: str
len: str
pair: str
prec: Literal["R0", "P0", "P1", "P2", "P3", "P4"]
freq: Literal["F0", "F1"]
len: Literal["1", "25", "100", "250"]
class Candles(TypedDict):
subId: str
channel: Literal["candles"]
sub_id: str
key: str
class Status(TypedDict):
subId: str
channel: Literal["status"]
sub_id: str
key: str

Binary file not shown.

View File

@@ -1,19 +1,19 @@
# python -c "import examples.rest.authenticated.claim_position"
# python -c "import examples.rest.auth.claim_position"
import os
from bfxapi import Client, REST_HOST
from bfxapi import Client
from bfxapi.types import Notification, PositionClaim
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
# Claims all active positions
for position in bfx.rest.auth.get_positions():
notification: Notification[PositionClaim] = bfx.rest.auth.claim_position(position.position_id)
notification: Notification[PositionClaim] = bfx.rest.auth.claim_position(
position.position_id
)
claim: PositionClaim = notification.data
print(f"Position: {position} | PositionClaim: {claim}")

View File

@@ -1,16 +1,19 @@
# python -c "import examples.rest.authenticated.get_wallets"
# python -c "import examples.rest.auth.get_wallets"
import os
from typing import List
from bfxapi import Client, REST_HOST
from bfxapi.types import Wallet, Transfer, DepositAddress, \
LightningNetworkInvoice, Withdrawal, Notification
from bfxapi import Client
from bfxapi.types import (
DepositAddress,
LightningNetworkInvoice,
Notification,
Transfer,
Wallet,
Withdrawal,
)
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
@@ -20,26 +23,35 @@ wallets: List[Wallet] = bfx.rest.auth.get_wallets()
# Transfers funds (0.001 ETH) from exchange wallet to funding wallet
A: Notification[Transfer] = bfx.rest.auth.transfer_between_wallets(
from_wallet="exchange", to_wallet="funding", currency="ETH",
currency_to="ETH", amount=0.001)
from_wallet="exchange",
to_wallet="funding",
currency="ETH",
currency_to="ETH",
amount=0.001,
)
print("Transfer:", A.data)
# Retrieves the deposit address for bitcoin currency in exchange wallet.
B: Notification[DepositAddress] = bfx.rest.auth.get_deposit_address(
wallet="exchange", method="bitcoin", renew=False)
wallet="exchange", method="bitcoin", op_renew=False
)
print("Deposit address:", B.data)
# Generates a lightning network deposit invoice
C: Notification[LightningNetworkInvoice] = bfx.rest.auth.generate_deposit_invoice(
wallet="funding", currency="LNX", amount=0.001)
wallet="funding", currency="LNX", amount=0.001
)
print("Lightning network invoice:", C.data)
# Withdraws 1.0 UST from user's exchange wallet to address 0x742d35Cc6634C0532925a3b844Bc454e4438f44e
# Withdraws 1.0 UST from user's exchange wallet to address 0x742d35...
D: Notification[Withdrawal] = bfx.rest.auth.submit_wallet_withdrawal(
wallet="exchange", method="tetheruse", address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
amount=1.0)
wallet="exchange",
method="tetheruse",
address="0x742d35Cc6634C0532925a3b844Bc454e4438f44e",
amount=1.0,
)
print("Withdrawal:", D.data)

View File

@@ -1,36 +1,39 @@
# python -c "import examples.rest.authenticated.set_derivatives_position_collateral"
# python -c "import examples.rest.auth.set_derivative_position_collateral"
import os
from bfxapi import Client, REST_HOST
from bfxapi.types import DerivativePositionCollateral, DerivativePositionCollateralLimits
from bfxapi import Client
from bfxapi.types import (
DerivativePositionCollateral,
DerivativePositionCollateralLimits,
)
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
submit_order_notification = bfx.rest.auth.submit_order(
type="LIMIT",
symbol="tBTCF0:USTF0",
amount="0.015",
price="16700",
lev=10
type="LIMIT", symbol="tBTCF0:USTF0", amount="0.015", price="16700", lev=10
)
print("New Order:", submit_order_notification.data)
# Update the amount of collateral for tBTCF0:USTF0 derivative position
derivative_position_collateral: DerivativePositionCollateral = \
bfx.rest.auth.set_derivative_position_collateral(symbol="tBTCF0:USTF0", collateral=50.0)
derivative_position_collateral: DerivativePositionCollateral = (
bfx.rest.auth.set_derivative_position_collateral(
symbol="tBTCF0:USTF0", collateral=50.0
)
)
print("Status:", bool(derivative_position_collateral.status))
# Calculate the minimum and maximum collateral that can be assigned to tBTCF0:USTF0.
derivative_position_collateral_limits: DerivativePositionCollateralLimits = \
derivative_position_collateral_limits: DerivativePositionCollateralLimits = (
bfx.rest.auth.get_derivative_position_collateral_limits(symbol="tBTCF0:USTF0")
)
print(f"Minimum collateral: {derivative_position_collateral_limits.min_collateral} | " \
f"Maximum collateral: {derivative_position_collateral_limits.max_collateral}")
print(
f"Minimum collateral: {derivative_position_collateral_limits.min_collateral} | "
f"Maximum collateral: {derivative_position_collateral_limits.max_collateral}"
)

View File

@@ -1,25 +1,18 @@
# python -c "import examples.rest.authenticated.submit_funding_offer"
# python -c "import examples.rest.auth.submit_funding_offer"
import os
from bfxapi import Client, REST_HOST
from bfxapi.types import Notification, FundingOffer
from bfxapi.enums import FundingOfferType, Flag
from bfxapi import Client
from bfxapi.types import FundingOffer, Notification
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
# Submit a new funding offer
notification: Notification[FundingOffer] = bfx.rest.auth.submit_funding_offer(
type=FundingOfferType.LIMIT,
symbol="fUSD",
amount=123.45,
rate=0.001,
period=2,
flags=Flag.HIDDEN
type="LIMIT", symbol="fUSD", amount=123.45, rate=0.001, period=2
)
print("Funding Offer notification:", notification)

View File

@@ -1,24 +1,18 @@
# python -c "import examples.rest.authenticated.submit_order"
# python -c "import examples.rest.auth.submit_order"
import os
from bfxapi import Client, REST_HOST
from bfxapi import Client
from bfxapi.types import Notification, Order
from bfxapi.enums import OrderType, Flag
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
# Submit a new order
submit_order_notification: Notification[Order] = bfx.rest.auth.submit_order(
type=OrderType.EXCHANGE_LIMIT,
symbol="tBTCUST",
amount=0.015,
price=10000,
flags=Flag.HIDDEN + Flag.OCO + Flag.CLOSE
type="EXCHANGE LIMIT", symbol="tBTCUST", amount=0.015, price=10000
)
print("Submit order notification:", submit_order_notification)
@@ -27,16 +21,12 @@ order: Order = submit_order_notification.data
# Update its amount and its price
update_order_notification: Notification[Order] = bfx.rest.auth.update_order(
id=order.id,
amount=0.020,
price=10150
id=order.id, amount=0.020, price=10150
)
print("Update order notification:", update_order_notification)
# Cancel it by its ID
cancel_order_notification: Notification[Order] = bfx.rest.auth.cancel_order(
id=order.id
)
cancel_order_notification: Notification[Order] = bfx.rest.auth.cancel_order(id=order.id)
print("Cancel order notification:", cancel_order_notification)

View File

@@ -1,15 +1,12 @@
# python -c "import examples.rest.authenticated.toggle_keep_funding"
# python -c "import examples.rest.auth.toggle_keep_funding"
import os
from typing import List
from bfxapi import Client, REST_HOST
from bfxapi import Client
from bfxapi.types import FundingLoan, Notification
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
@@ -18,9 +15,7 @@ loans: List[FundingLoan] = bfx.rest.auth.get_funding_loans(symbol="fUSD")
# Set every loan's keep funding status to <off> (1: <on>, 2: <off>)
notification: Notification[None] = bfx.rest.auth.toggle_keep_funding(
type="loan",
ids=[ loan.id for loan in loans ],
changes={ loan.id: 2 for loan in loans }
type="loan", ids=[loan.id for loan in loans], changes={loan.id: 2 for loan in loans}
)
print("Toggle keep funding notification:", notification)

View File

@@ -2,27 +2,28 @@
import os
from bfxapi import Client, REST_HOST
from bfxapi.rest.enums import MerchantSettingsKey
from bfxapi import Client
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
if not bfx.rest.merchant.set_merchant_settings(MerchantSettingsKey.RECOMMEND_STORE, 1):
print(f"Cannot set <{MerchantSettingsKey.RECOMMEND_STORE}> to <1>.")
if not bfx.rest.merchant.set_merchant_settings("bfx_pay_recommend_store", 1):
print("Cannot set <bfx_pay_recommend_store> to <1>.")
print(f"The current <{MerchantSettingsKey.PREFERRED_FIAT}> value is:",
bfx.rest.merchant.get_merchant_settings(MerchantSettingsKey.PREFERRED_FIAT))
print(
"The current <bfx_pay_preferred_fiat> value is:",
bfx.rest.merchant.get_merchant_settings("bfx_pay_preferred_fiat"),
)
settings = bfx.rest.merchant.list_merchant_settings([
MerchantSettingsKey.DUST_BALANCE_UI,
MerchantSettingsKey.MERCHANT_CUSTOMER_SUPPORT_URL,
MerchantSettingsKey.MERCHANT_UNDERPAID_THRESHOLD
])
settings = bfx.rest.merchant.list_merchant_settings(
[
"bfx_pay_dust_balance_ui",
"bfx_pay_merchant_customer_support_url",
"bfx_pay_merchant_underpaid_threshold",
]
)
for key, value in settings.items():
print(f"<{key}>:", value)

View File

@@ -2,12 +2,10 @@
import os
from bfxapi import Client, REST_HOST
from bfxapi import Client
from bfxapi.types import InvoiceSubmission
bfx = Client(
rest_host=REST_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
)
@@ -20,7 +18,7 @@ customer_info = {
"residStreet": "5-6 Leicester Square",
"residBuildingNo": "23 A",
"fullName": "John Doe",
"email": "john@example.com"
"email": "john@example.com",
}
invoice: InvoiceSubmission = bfx.rest.merchant.submit_invoice(
@@ -29,17 +27,19 @@ invoice: InvoiceSubmission = bfx.rest.merchant.submit_invoice(
order_id="test",
customer_info=customer_info,
pay_currencies=["ETH"],
duration=86400 * 10
duration=86400,
)
print("Invoice submission:", invoice)
print(bfx.rest.merchant.complete_invoice(
id=invoice.id,
pay_currency="ETH",
deposit_id=1
))
print(
bfx.rest.merchant.complete_invoice(id=invoice.id, pay_currency="ETH", deposit_id=1)
)
print(bfx.rest.merchant.get_invoices(limit=25))
print(bfx.rest.merchant.get_invoices_paginated(page=1, page_size=60, sort="asc", sort_field="t"))
print(
bfx.rest.merchant.get_invoices_paginated(
page=1, page_size=60, sort="asc", sort_field="t"
)
)

View File

@@ -2,14 +2,19 @@
from typing import List
from bfxapi import Client, PUB_REST_HOST
from bfxapi import Client
from bfxapi.types import (
FundingCurrencyBook,
FundingCurrencyRawBook,
TradingPairBook,
TradingPairRawBook,
)
from bfxapi.types import TradingPairBook, TradingPairRawBook, \
FundingCurrencyBook, FundingCurrencyRawBook
bfx = Client()
bfx = Client(rest_host=PUB_REST_HOST)
t_book: List[TradingPairBook] = bfx.rest.public.get_t_book("tBTCUSD", precision="P0", len=25)
t_book: List[TradingPairBook] = bfx.rest.public.get_t_book(
"tBTCUSD", precision="P0", len=25
)
print("25 price points of tBTCUSD order book (with precision P0):", t_book)
@@ -17,7 +22,9 @@ t_raw_book: List[TradingPairRawBook] = bfx.rest.public.get_t_raw_book("tBTCUSD")
print("tBTCUSD raw order book:", t_raw_book)
f_book: List[FundingCurrencyBook] = bfx.rest.public.get_f_book("fUSD", precision="P0", len=25)
f_book: List[FundingCurrencyBook] = bfx.rest.public.get_f_book(
"fUSD", precision="P0", len=25
)
print("25 price points of fUSD order book (with precision P0):", f_book)

View File

@@ -1,18 +1,14 @@
# python -c "import examples.rest.public.conf"
from bfxapi import Client, PUB_REST_HOST
from bfxapi import Client
from bfxapi.rest.enums import Config
bfx = Client()
bfx = Client(rest_host=PUB_REST_HOST)
# Prints a map from symbols to their API symbols
print(bfx.rest.public.conf("pub:map:currency:sym"))
print("Available configs:", [ config.value for config in Config ])
# Prints all the available exchange trading pairs
print(bfx.rest.public.conf("pub:list:pair:exchange"))
# Prints a map from symbols to their API symbols (pub:map:currency:sym)
print (bfx.rest.public.conf(Config.MAP_CURRENCY_SYM))
# Prints all the available exchange trading pairs (pub:list:pair:exchange)
print(bfx.rest.public.conf(Config.LIST_PAIR_EXCHANGE))
# Prints all the available funding currencies (pub:list:currency)
print(bfx.rest.public.conf(Config.LIST_CURRENCY))
# Prints all the available funding currencies
print(bfx.rest.public.conf("pub:list:currency"))

View File

@@ -1,11 +1,14 @@
# python -c "import examples.rest.public.get_candles_hist"
from bfxapi import Client, PUB_REST_HOST
from bfxapi import Client
bfx = Client(rest_host=PUB_REST_HOST)
bfx = Client()
print(f"Candles: {bfx.rest.public.get_candles_hist(symbol='tBTCUSD')}")
# Be sure to specify a period or aggregated period when retrieving funding candles.
# If you wish to mimic the candles found in the UI, use the following setup to aggregate all funding candles: a30:p2:p30
print(f"Candles: {bfx.rest.public.get_candles_hist(tf='15m', symbol='fUSD:a30:p2:p30')}")
# If you wish to mimic the candles found in the UI, use the following setup
# to aggregate all funding candles: a30:p2:p30
print(
f"Candles: {bfx.rest.public.get_candles_hist(tf='15m', symbol='fUSD:a30:p2:p30')}"
)

View File

@@ -1,20 +1,20 @@
# python -c "import examples.rest.public.pulse_endpoints"
import datetime
from typing import List
from bfxapi import Client, PUB_REST_HOST
from bfxapi import Client
from bfxapi.types import PulseMessage, PulseProfile
bfx = Client(rest_host=PUB_REST_HOST)
bfx = Client()
# POSIX timestamp in milliseconds (check https://currentmillis.com/)
end = datetime.datetime(2020, 5, 2).timestamp() * 1000
# Retrieves 25 pulse messages up to 2020/05/02
messages: List[PulseMessage] = bfx.rest.public.get_pulse_message_history(end=end, limit=25)
messages: List[PulseMessage] = bfx.rest.public.get_pulse_message_history(
end=end, limit=25
)
for message in messages:
print(f"Message author: {message.profile.nickname} ({message.profile.puid})")
@@ -23,4 +23,7 @@ for message in messages:
profile: PulseProfile = bfx.rest.public.get_pulse_profile_details("News")
URL = profile.picture.replace("size", "small")
print(f"<{profile.nickname}>'s profile picture: https://s3-eu-west-1.amazonaws.com/bfx-pub/{URL}")
print(
f"<{profile.nickname}>'s profile picture:"
f" https://s3-eu-west-1.amazonaws.com/bfx-pub/{URL}"
)

View File

@@ -1,24 +1,22 @@
# python -c "import examples.rest.public.rest_calculation_endpoints"
from bfxapi import Client, PUB_REST_HOST
from bfxapi import Client
from bfxapi.types import FundingMarketAveragePrice, FxRate, TradingMarketAveragePrice
from bfxapi.types import TradingMarketAveragePrice, FundingMarketAveragePrice, FxRate
bfx = Client()
bfx = Client(rest_host=PUB_REST_HOST)
trading_market_average_price: TradingMarketAveragePrice = bfx.rest.public.get_trading_market_average_price(
symbol="tBTCUSD",
amount=-100,
price_limit=20000.5
trading_market_average_price: TradingMarketAveragePrice = (
bfx.rest.public.get_trading_market_average_price(
symbol="tBTCUSD", amount=-100, price_limit=20000.5
)
)
print("Average execution price for tBTCUSD:", trading_market_average_price.price_avg)
funding_market_average_price: FundingMarketAveragePrice = bfx.rest.public.get_funding_market_average_price(
symbol="fUSD",
amount=100,
period=2,
rate_limit=0.00015
funding_market_average_price: FundingMarketAveragePrice = (
bfx.rest.public.get_funding_market_average_price(
symbol="fUSD", amount=100, period=2, rate_limit=0.00015
)
)
print("Average execution rate for fUSD:", funding_market_average_price.rate_avg)

View File

@@ -2,18 +2,19 @@
from typing import List
from bfxapi import Client, PUB_REST_HOST
from bfxapi.types import TradingPairTrade, FundingCurrencyTrade
from bfxapi.rest.enums import Sort
from bfxapi import Client
from bfxapi.types import FundingCurrencyTrade, TradingPairTrade
bfx = Client(rest_host=PUB_REST_HOST)
bfx = Client()
t_trades: List[TradingPairTrade] = bfx.rest.public.get_t_trades("tBTCUSD", \
limit=15, sort=Sort.ASCENDING)
t_trades: List[TradingPairTrade] = bfx.rest.public.get_t_trades(
"tBTCUSD", limit=15, sort=+1
)
print("Latest 15 trades for tBTCUSD (in ascending order):", t_trades)
f_trades: List[FundingCurrencyTrade] = bfx.rest.public.get_f_trades("fUSD", \
limit=15, sort=Sort.DESCENDING)
f_trades: List[FundingCurrencyTrade] = bfx.rest.public.get_f_trades(
"fUSD", limit=15, sort=-1
)
print("Latest 15 trades for fUSD (in descending order):", f_trades)

View File

@@ -1,44 +1,40 @@
# python -c "import examples.websocket.authenticated.submit_order"
# python -c "import examples.websocket.auth.submit_order"
import os
from bfxapi import Client, WSS_HOST
from bfxapi.enums import Error, OrderType
from bfxapi import Client
from bfxapi.types import Notification, Order
bfx = Client(
wss_host=WSS_HOST,
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET")
api_secret=os.getenv("BFX_API_SECRET"),
)
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.on("authenticated")
async def on_authenticated(event):
print(f"Authentication: {event}")
await bfx.wss.inputs.submit_order(
type=OrderType.EXCHANGE_LIMIT,
symbol="tBTCUSD",
amount="0.1",
price="10000.0"
type="EXCHANGE LIMIT", symbol="tBTCUSD", amount=0.165212, price=30264.0
)
print("The order has been sent.")
@bfx.wss.on("on-req-notification")
async def on_notification(notification: Notification[Order]):
print(f"Notification: {notification}.")
@bfx.wss.on("order_new")
async def on_order_new(order_new: Order):
print(f"Order new: {order_new}")
@bfx.wss.on("subscribed")
def on_subscribed(subscription):
print(f"Subscription successful for <{subscription}>.")
bfx.wss.run()

View File

@@ -1,22 +1,17 @@
# python -c "import examples.websocket.authenticated.wallets"
# python -c "import examples.websocket.auth.wallets"
import os
from typing import List
from bfxapi import Client
from bfxapi.enums import Error
from bfxapi.types import Wallet
bfx = Client(
api_key=os.getenv("BFX_API_KEY"),
api_secret=os.getenv("BFX_API_SECRET"),
filters=["wallet"]
filters=["wallet"],
)
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.on("wallet_snapshot")
def on_wallet_snapshot(wallets: List[Wallet]):
@@ -25,8 +20,10 @@ def on_wallet_snapshot(wallets: List[Wallet]):
print(f"Available balance: {wallet.available_balance}")
print(f"Wallet trade details: {wallet.trade_details}")
@bfx.wss.on("wallet_update")
def on_wallet_update(wallet: Wallet):
print(f"Wallet update: {wallet}")
bfx.wss.run()

View File

@@ -1,23 +1,20 @@
# python -c "import examples.websocket.public.derivatives_status"
from bfxapi import Client, PUB_WSS_HOST
from bfxapi import Client
from bfxapi.types import DerivativesStatus
from bfxapi.websocket.subscriptions import Status
from bfxapi.websocket.enums import Error, Channel
bfx = Client()
bfx = Client(wss_host=PUB_WSS_HOST)
@bfx.wss.on("derivatives_status_update")
def on_derivatives_status_update(subscription: Status, data: DerivativesStatus):
print(f"{subscription}:", data)
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.on("open")
async def on_open():
await bfx.wss.subscribe(Channel.STATUS, key="deriv:tBTCF0:USTF0")
await bfx.wss.subscribe("status", key="deriv:tBTCF0:USTF0")
bfx.wss.run()

View File

@@ -1,23 +1,22 @@
# python -c "import examples.websocket.public.order_book"
import zlib
from collections import OrderedDict
from typing import Dict, List
from typing import List
from bfxapi import Client, PUB_WSS_HOST
from bfxapi import Client
from bfxapi.types import TradingPairBook
from bfxapi.websocket.subscriptions import Book
from bfxapi.websocket.enums import Channel, Error
class OrderBook:
def __init__(self, symbols: List[str]):
self.__order_book = {
symbol: {
"bids": OrderedDict(), "asks": OrderedDict()
} for symbol in symbols
symbol: {"bids": OrderedDict(), "asks": OrderedDict()} for symbol in symbols
}
self.cooldown: Dict[str, bool] = {symbol: False for symbol in symbols}
def update(self, symbol: str, data: TradingPairBook) -> None:
price, count, amount = data.price, data.count, data.amount
@@ -25,41 +24,93 @@ class OrderBook:
if count > 0:
self.__order_book[symbol][kind][price] = {
"price": price,
"price": price,
"count": count,
"amount": amount
"amount": amount,
}
if count == 0:
if price in self.__order_book[symbol][kind]:
del self.__order_book[symbol][kind][price]
SYMBOLS = [ "tBTCUSD", "tLTCUSD", "tLTCBTC", "tETHUSD", "tETHBTC" ]
def verify(self, symbol: str, checksum: int) -> bool:
values: List[int] = []
bids = sorted(
[
(data["price"], data["count"], data["amount"])
for _, data in self.__order_book[symbol]["bids"].items()
],
key=lambda data: -data[0],
)
asks = sorted(
[
(data["price"], data["count"], data["amount"])
for _, data in self.__order_book[symbol]["asks"].items()
],
key=lambda data: data[0],
)
if len(bids) < 25 or len(asks) < 25:
raise AssertionError("Not enough bids or asks (need at least 25).")
for _i in range(25):
bid, ask = bids[_i], asks[_i]
values.extend([bid[0], bid[2]])
values.extend([ask[0], ask[2]])
local = ":".join(str(value) for value in values)
crc32 = zlib.crc32(local.encode("UTF-8"))
return crc32 == checksum
SYMBOLS = ["tLTCBTC", "tETHUSD", "tETHBTC"]
order_book = OrderBook(symbols=SYMBOLS)
bfx = Client(wss_host=PUB_WSS_HOST)
bfx = Client()
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.on("open")
async def on_open():
for symbol in SYMBOLS:
await bfx.wss.subscribe(Channel.BOOK, symbol=symbol)
await bfx.wss.subscribe("book", symbol=symbol)
@bfx.wss.on("subscribed")
def on_subscribed(subscription):
print(f"Subscription successful for pair <{subscription['pair']}>")
print(f"Subscription successful for symbol <{subscription['symbol']}>")
@bfx.wss.on("t_book_snapshot")
def on_t_book_snapshot(subscription: Book, snapshot: List[TradingPairBook]):
for data in snapshot:
order_book.update(subscription["symbol"], data)
@bfx.wss.on("t_book_update")
def on_t_book_update(subscription: Book, data: TradingPairBook):
order_book.update(subscription["symbol"], data)
@bfx.wss.on("checksum")
async def on_checksum(subscription: Book, value: int):
symbol = subscription["symbol"]
if order_book.verify(symbol, value):
order_book.cooldown[symbol] = False
elif not order_book.cooldown[symbol]:
print(
"Mismatch between local and remote checksums: "
f"restarting book for symbol <{symbol}>..."
)
await bfx.wss.resubscribe(sub_id=subscription["sub_id"])
order_book.cooldown[symbol] = True
bfx.wss.run()

View File

@@ -1,23 +1,22 @@
# python -c "import examples.websocket.public.raw_order_book"
import zlib
from collections import OrderedDict
from typing import Dict, List
from typing import List
from bfxapi import Client, PUB_WSS_HOST
from bfxapi import Client
from bfxapi.types import TradingPairRawBook
from bfxapi.websocket.subscriptions import Book
from bfxapi.websocket.enums import Channel, Error
class RawOrderBook:
def __init__(self, symbols: List[str]):
self.__raw_order_book = {
symbol: {
"bids": OrderedDict(), "asks": OrderedDict()
} for symbol in symbols
symbol: {"bids": OrderedDict(), "asks": OrderedDict()} for symbol in symbols
}
self.cooldown: Dict[str, bool] = {symbol: False for symbol in symbols}
def update(self, symbol: str, data: TradingPairRawBook) -> None:
order_id, price, amount = data.order_id, data.price, data.amount
@@ -26,40 +25,92 @@ class RawOrderBook:
if price > 0:
self.__raw_order_book[symbol][kind][order_id] = {
"order_id": order_id,
"price": price,
"amount": amount
"price": price,
"amount": amount,
}
if price == 0:
if order_id in self.__raw_order_book[symbol][kind]:
del self.__raw_order_book[symbol][kind][order_id]
SYMBOLS = [ "tBTCUSD", "tLTCUSD", "tLTCBTC", "tETHUSD", "tETHBTC" ]
def verify(self, symbol: str, checksum: int) -> bool:
values: List[int] = []
bids = sorted(
[
(data["order_id"], data["price"], data["amount"])
for _, data in self.__raw_order_book[symbol]["bids"].items()
],
key=lambda data: (-data[1], data[0]),
)
asks = sorted(
[
(data["order_id"], data["price"], data["amount"])
for _, data in self.__raw_order_book[symbol]["asks"].items()
],
key=lambda data: (data[1], data[0]),
)
if len(bids) < 25 or len(asks) < 25:
raise AssertionError("Not enough bids or asks (need at least 25).")
for _i in range(25):
bid, ask = bids[_i], asks[_i]
values.extend([bid[0], bid[2]])
values.extend([ask[0], ask[2]])
local = ":".join(str(value) for value in values)
crc32 = zlib.crc32(local.encode("UTF-8"))
return crc32 == checksum
SYMBOLS = ["tLTCBTC", "tETHUSD", "tETHBTC"]
raw_order_book = RawOrderBook(symbols=SYMBOLS)
bfx = Client(wss_host=PUB_WSS_HOST)
bfx = Client()
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.on("open")
async def on_open():
for symbol in SYMBOLS:
await bfx.wss.subscribe(Channel.BOOK, symbol=symbol, prec="R0")
await bfx.wss.subscribe("book", symbol=symbol, prec="R0")
@bfx.wss.on("subscribed")
def on_subscribed(subscription):
print(f"Subscription successful for pair <{subscription['pair']}>")
print(f"Subscription successful for symbol <{subscription['symbol']}>")
@bfx.wss.on("t_raw_book_snapshot")
def on_t_raw_book_snapshot(subscription: Book, snapshot: List[TradingPairRawBook]):
for data in snapshot:
raw_order_book.update(subscription["symbol"], data)
@bfx.wss.on("t_raw_book_update")
def on_t_raw_book_update(subscription: Book, data: TradingPairRawBook):
raw_order_book.update(subscription["symbol"], data)
@bfx.wss.on("checksum")
async def on_checksum(subscription: Book, value: int):
symbol = subscription["symbol"]
if raw_order_book.verify(symbol, value):
raw_order_book.cooldown[symbol] = False
elif not raw_order_book.cooldown[symbol]:
print(
"Mismatch between local and remote checksums: "
f"restarting book for symbol <{symbol}>..."
)
await bfx.wss.resubscribe(sub_id=subscription["sub_id"])
raw_order_book.cooldown[symbol] = True
bfx.wss.run()

View File

@@ -1,21 +1,22 @@
# python -c "import examples.websocket.public.ticker"
from bfxapi import Client, PUB_WSS_HOST
from bfxapi import Client
from bfxapi.types import TradingPairTicker
from bfxapi.websocket.subscriptions import Ticker
from bfxapi.websocket.enums import Channel
bfx = Client(wss_host=PUB_WSS_HOST)
bfx = Client()
@bfx.wss.on("t_ticker_update")
def on_t_ticker_update(subscription: Ticker, data: TradingPairTicker):
print(f"Subscription with subId: {subscription['subId']}")
print(f"Subscription with sub_id: {subscription['sub_id']}")
print(f"Data: {data}")
@bfx.wss.on("open")
async def on_open():
await bfx.wss.subscribe(Channel.TICKER, symbol="tBTCUSD")
await bfx.wss.subscribe("ticker", symbol="tBTCUSD")
bfx.wss.run()

View File

@@ -1,29 +1,27 @@
# python -c "import examples.websocket.public.trades"
from bfxapi import Client, PUB_WSS_HOST
from bfxapi import Client
from bfxapi.types import Candle, TradingPairTrade
from bfxapi.websocket.subscriptions import Candles, Trades
from bfxapi.websocket.enums import Error, Channel
bfx = Client(wss_host=PUB_WSS_HOST)
bfx = Client()
@bfx.wss.on("candles_update")
def on_candles_update(_sub: Candles, candle: Candle):
print(f"New candle: {candle}")
@bfx.wss.on("t_trade_execution")
def on_t_trade_execution(_sub: Trades, trade: TradingPairTrade):
print(f"New trade: {trade}")
@bfx.wss.on("wss-error")
def on_wss_error(code: Error, msg: str):
print(code, msg)
@bfx.wss.on("open")
async def on_open():
await bfx.wss.subscribe(Channel.CANDLES, key="trade:1m:tBTCUSD")
await bfx.wss.subscribe("candles", key="trade:1m:tBTCUSD")
await bfx.wss.subscribe("trades", symbol="tBTCUSD")
await bfx.wss.subscribe(Channel.TRADES, symbol="tBTCUSD")
bfx.wss.run()

Binary file not shown.

View File

@@ -1,12 +1,14 @@
from distutils.core import setup
version = {}
with open("bfxapi/version.py", encoding="utf-8") as fp:
exec(fp.read(), version) #pylint: disable=exec-used
_version = { }
with open("bfxapi/_version.py", encoding="utf-8") as f:
#pylint: disable-next=exec-used
exec(f.read(), _version)
setup(
name="bitfinex-api-py",
version=version["__version__"],
version=_version["__version__"],
description="Official Bitfinex Python API",
long_description="A Python reference implementation of the Bitfinex API for both REST and websocket interaction",
long_description_content_type="text/markdown",
@@ -25,6 +27,7 @@ setup(
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
keywords="bitfinex,api,trading",
project_urls={
@@ -32,14 +35,22 @@ setup(
"Source": "https://github.com/bitfinexcom/bitfinex-api-py",
},
packages=[
"bfxapi", "bfxapi.utils", "bfxapi.types",
"bfxapi.websocket", "bfxapi.websocket.client", "bfxapi.websocket.handlers",
"bfxapi.rest", "bfxapi.rest.endpoints", "bfxapi.rest.middleware",
"bfxapi",
"bfxapi._utils",
"bfxapi.types",
"bfxapi.websocket",
"bfxapi.websocket._client",
"bfxapi.websocket._handlers",
"bfxapi.websocket._event_emitter",
"bfxapi.rest",
"bfxapi.rest.endpoints",
"bfxapi.rest.middleware",
],
install_requires=[
"pyee~=9.0.4",
"websockets~=10.4",
"requests~=2.28.1"
"websockets~=11.0.3",
"requests~=2.28.1",
"urllib3~=1.26.14",
],
python_requires=">=3.8"
)
)