diff --git a/bfxapi/websocket/handlers.py b/bfxapi/websocket/handlers.py index d8bca05..ed406a3 100644 --- a/bfxapi/websocket/handlers.py +++ b/bfxapi/websocket/handlers.py @@ -2,6 +2,15 @@ from enum import Enum from .errors import BfxWebsocketException +def _get_sub_dictionary(dictionary, keys): + return { key: dictionary[key] for key in dictionary if key in keys } + +def _label_stream_data(labels, *args, IGNORE = [ "_PLACEHOLDER" ]): + if len(labels) != len(args): + raise BfxWebsocketException(" and <*args> arguments should contain the same amount of elements.") + + return { label: args[index] for index, label in enumerate(labels) if label not in IGNORE } + class Channels(str, Enum): TICKER = "ticker" TRADES = "trades" @@ -14,14 +23,97 @@ class PublicChannelsHandler(object): self.event_emitter = event_emitter self.__handlers = { + Channels.TICKER: self.__ticker_channel_handler, + Channels.TRADES: self.__trades_channel_handler, Channels.BOOK: self.__book_channel_handler, + Channels.CANDLES: self.__candles_channel_handler } def handle(self, subscription, *stream): return self.__handlers[subscription["channel"]](subscription, *stream) + def __ticker_channel_handler(self, subscription, *stream): + _trading_pairs_labels = [ + "BID", + "BID_SIZE", + "ASK", + "ASK_SIZE", + "DAILY_CHANGE", + "DAILY_CHANGE_RELATIVE", + "LAST_PRICE", + "VOLUME", + "HIGH", + "LOW" + ] + + _funding_currencies_labels = [ + "FRR", + "BID", + "BID_PERIOD", + "BID_SIZE", + "ASK", + "ASK_PERIOD", + "ASK_SIZE", + "DAILY_CHANGE", + "DAILY_CHANGE_RELATIVE", + "LAST_PRICE", + "VOLUME", + "HIGH", + "LOW" + "_PLACEHOLDER", + "_PLACEHOLDER", + "FRR_AMOUNT_AVAILABLE" + ] + + if subscription["symbol"].startswith("t"): + return self.event_emitter.emit( + "tp_ticker_update", + _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), + _label_stream_data(_trading_pairs_labels, *stream[0]) + ) + + if subscription["symbol"].startswith("f"): + return self.event_emitter.emit( + "fc_ticker_update", + _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), + _label_stream_data(_funding_currencies_labels, *stream[0]) + ) + + def __trades_channel_handler(self, subscription, *stream): + _trading_pairs_labels, _funding_currencies_labels = [ "ID", "MTS", "AMOUNT", "PRICE" ], [ "ID", "MTS", "AMOUNT", "RATE", "PERIOD" ] + + if len(stream) == 1: + if subscription["symbol"].startswith("t"): + return self.event_emitter.emit( + "tp_trades_snapshot", + _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), + [ _label_stream_data(_trading_pairs_labels, *substream) for substream in stream[0] ] + ) + + if subscription["symbol"].startswith("f"): + return self.event_emitter.emit( + "fc_trades_snapshot", + _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), + [ _label_stream_data(_funding_currencies_labels, *substream) for substream in stream[0] ] + ) + + if type := stream[0] or type in [ "te", "tu", "fte", "ftu" ]: + if subscription["symbol"].startswith("t"): + return self.event_emitter.emit( + { "te": "tp_trade_executed", "tu": "tp_trade_execution_update" }[type], + _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), + _label_stream_data(_trading_pairs_labels, *stream[1]) + ) + + if subscription["symbol"].startswith("f"): + return self.event_emitter.emit( + { "fte": "fc_trade_executed", "ftu": "fc_trade_execution_update" }[type], + _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), + _label_stream_data(_funding_currencies_labels, *stream[1]) + ) + def __book_channel_handler(self, subscription, *stream): - subscription = _filter_dictionary_keys(subscription, [ "chanId", "symbol", "prec", "freq", "len", "subId", "pair" ]) + subscription = _get_sub_dictionary(subscription, [ "chanId", "symbol", "prec", "freq", "len", "subId", "pair" ]) if subscription["prec"] == "R0": _trading_pairs_labels, _funding_currencies_labels, IS_RAW_BOOK = [ "ORDER_ID", "PRICE", "AMOUNT" ], [ "OFFER_ID", "PERIOD", "RATE", "AMOUNT" ], True @@ -31,15 +123,33 @@ class PublicChannelsHandler(object): return self.event_emitter.emit( IS_RAW_BOOK and "raw_book_snapshot" or "book_snapshot", subscription, - [ _label_stream_data({ 3: _trading_pairs_labels, 4: _funding_currencies_labels }[len(substream)], *substream) for substream in stream[0] ] + [ _label_stream_data({ "t": _trading_pairs_labels, "f": _funding_currencies_labels }[subscription["symbol"][0]], *substream) for substream in stream[0] ] ) return self.event_emitter.emit( IS_RAW_BOOK and "raw_book_update" or "book_update", subscription, - _label_stream_data({ 3: _trading_pairs_labels, 4: _funding_currencies_labels }[len(stream[0])], *stream[0]) + _label_stream_data({ "t": _trading_pairs_labels, "f": _funding_currencies_labels }[subscription["symbol"][0]], *stream[0]) ) + def __candles_channel_handler(self, subscription, *stream): + _labels = [ "MTS", "OPEN", "CLOSE", "HIGH", "LOW", "VOLUME" ] + + subscription = _get_sub_dictionary(subscription, [ "chanId", "key" ]) + + if all(isinstance(substream, list) for substream in stream[0]): + return self.event_emitter.emit( + "candles_snapshot", + subscription, + [ _label_stream_data(_labels, *substream) for substream in stream[0] ] + ) + + return self.event_emitter.emit( + "candles_update", + subscription, + _label_stream_data(_labels, *stream[0]) + ) + class AuthenticatedChannelsHandler(object): def __init__(self, event_emitter, strict = False): self.event_emitter, self.strict = event_emitter, strict @@ -304,13 +414,4 @@ class AuthenticatedChannelsHandler(object): self.event_emitter.emit("balance_update", _label_stream_data([ "AUM", "AUM_NET" - ], *stream)) - -def _label_stream_data(labels, *args, IGNORE = [ "_PLACEHOLDER" ]): - if len(labels) != len(args): - raise BfxWebsocketException(" and <*args> arguments should contain the same amount of elements.") - - return { label: args[index] for index, label in enumerate(labels) if label not in IGNORE } - -def _filter_dictionary_keys(dictionary, keys): - return { key: dictionary[key] for key in dictionary if key in keys } \ No newline at end of file + ], *stream)) \ No newline at end of file diff --git a/bfxapi/websocket/typings.py b/bfxapi/websocket/typings.py index baecc8a..bf432fb 100644 --- a/bfxapi/websocket/typings.py +++ b/bfxapi/websocket/typings.py @@ -2,7 +2,31 @@ from typing import Type, List, Dict, TypedDict, Union, Optional JSON = Union[Dict[str, "JSON"], List["JSON"], bool, int, float, str, Type[None]] -#region Type hinting for Websocket Public Channels +#region Type hinting for subscription objects + +TradingPairsTickerSubscription = TypedDict("TradingPairsTickerSubscription", { + "chanId": int, + "symbol": str, + "pair": str +}) + +FundingCurrenciesTickerSubscription = TypedDict("FundingCurrenciesTickerSubscription", { + "chanId": int, + "symbol": str, + "currency": str +}) + +TradingPairsTradesSubscription = TypedDict("TradingPairsTradesSubscription", { + "chanId": int, + "symbol": str, + "pair": str +}) + +FundingCurrenciesTradesSubscription = TypedDict("FundingCurrenciesTradesSubscription", { + "chanId": int, + "symbol": str, + "currency": str +}) BookSubscription = TypedDict("BookSubscription", { "chanId": int, @@ -14,6 +38,52 @@ BookSubscription = TypedDict("BookSubscription", { "pair": str }) +CandlesSubscription = TypedDict("CandlesSubscription", { + "chanId": int, + "key": str +}) + +#endregion + +#region Type hinting for Websocket Public Channels + +TradingPairTicker = TypedDict("TradingPairTicker", { + "BID": float, + "BID_SIZE": float, + "ASK": float, + "ASK_SIZE": float, + "DAILY_CHANGE": float, + "DAILY_CHANGE_RELATIVE": float, + "LAST_PRICE": float, + "VOLUME": float, + "HIGH": float, + "LOW": float +}) + +FundingCurrencyTicker = TypedDict("FundingCurrencyTicker", { + "FRR": float, + "BID": float, + "BID_PERIOD": int, + "BID_SIZE": float, + "ASK": float, + "ASK_PERIOD": int, + "ASK_SIZE": float, + "DAILY_CHANGE": float, + "DAILY_CHANGE_RELATIVE": float, + "LAST_PRICE": float, + "VOLUME": float, + "HIGH": float, + "LOW": float, + "FRR_AMOUNT_AVAILABLE": float +}) + +(TradingPairTrade, FundingCurrencyTrade) = ( + TypedDict("TradingPairTrade", { "ID": int, "MTS": int, "AMOUNT": float, "PRICE": float }), + TypedDict("FundingCurrencyTrade", { "ID": int, "MTS": int, "AMOUNT": float, "RATE": float, "PERIOD": int }) +) + +(TradingPairTrades, FundingCurrencyTrades) = (List[TradingPairTrade], List[FundingCurrencyTrade]) + (TradingPairBook, FundingCurrencyBook) = ( TypedDict("TradingPairBook", { "PRICE": float, "COUNT": int, "AMOUNT": float }), TypedDict("FundingCurrencyBook", { "RATE": float, "PERIOD": int, "COUNT": int, "AMOUNT": float }) @@ -36,6 +106,17 @@ RawBook = Union[TradingPairRawBook, FundingCurrencyRawBook] RawBooks = Union[TradingPairRawBooks, FundingCurrencyRawBooks] +Candle = TypedDict("Candle", { + "MTS": int, + "OPEN": float, + "CLOSE": float, + "HIGH": float, + "LOW": float, + "VOLUME": float +}) + +Candles = List[Candle] + #endregion #region Type hinting for Websocket Authenticated Channels