Files
bitfinex-api-py/bfxapi/rest/BfxRestInterface.py
2022-12-01 17:53:57 +01:00

62 lines
2.5 KiB
Python

import requests
from http import HTTPStatus
from typing import List, Union, Optional
from . import serializers
from .typings import PlatformStatus, TradingPairTicker, FundingCurrencyTicker, TickerHistories, TradingPairTrades, FundingCurrencyTrades
from .exceptions import RequestParametersError
class BfxRestInterface(object):
def __init__(self, host):
self.host = host
def __GET(self, endpoint, params = None):
data = requests.get(f"{self.host}/{endpoint}", params=params).json()
if data[0] == "error":
if data[1] == 10020:
raise RequestParametersError(f"The request was rejected with the following parameter error: <{data[2]}>")
return data
def platform_status(self) -> PlatformStatus:
return serializers.PlatformStatus.parse(*self.__GET("platform/status"))
def tickers(self, symbols: List[str]) -> List[Union[TradingPairTicker, FundingCurrencyTicker]]:
return [
{
"t": serializers.TradingPairTicker.parse,
"f": serializers.FundingCurrencyTicker.parse
}[subdata[0][0]](*subdata)
for subdata in self.__GET("tickers", params={ "symbols": ",".join(symbols) })
]
def ticker(self, symbol: str) -> Union[TradingPairTicker, FundingCurrencyTicker]:
return {
"t": serializers.TradingPairTicker.parse,
"f": serializers.FundingCurrencyTicker.parse
}[symbol[0]](*self.__GET(f"ticker/{symbol}"), skip=["SYMBOL"])
def tickers_hist(self, symbols: List[str], start: Optional[int] = None, end: Optional[int] = None, limit: Optional[int] = None) -> TickerHistories:
params = {
"symbols": ",".join(symbols),
"start": start, "end": end,
"limit": limit
}
return [ serializers.TickerHistory.parse(*subdata) for subdata in self.__GET("tickers/hist", params=params) ]
def trades(self, symbol: str, limit: Optional[int] = None, start: Optional[str] = None, end: Optional[str] = None, sort: Optional[int] = None) -> Union[TradingPairTrades, FundingCurrencyTrades]:
params = { "symbol": symbol, "limit": limit, "start": start, "end": end, "sort": sort }
return [
{
"t": serializers.TradingPairTrade.parse,
"f": serializers.FundingCurrencyTrade.parse
}[symbol[0]](*subdata)
for subdata in self.__GET(f"trades/{symbol}/hist", params=params)
]