Add bfxapi/websocket/serializers.py. Implement _Serializer internal class. Separate labeling from PublicChannelsHandler in handlers.py.

This commit is contained in:
Davide Casale
2022-11-16 18:33:33 +01:00
parent d3715d3f9c
commit 6448fd59b9
2 changed files with 127 additions and 63 deletions

View File

@@ -1,5 +1,7 @@
from enum import Enum from enum import Enum
from . import serializers
from .errors import BfxWebsocketException from .errors import BfxWebsocketException
def _get_sub_dictionary(dictionary, keys): def _get_sub_dictionary(dictionary, keys):
@@ -26,128 +28,92 @@ class PublicChannelsHandler(object):
Channels.TICKER: self.__ticker_channel_handler, Channels.TICKER: self.__ticker_channel_handler,
Channels.TRADES: self.__trades_channel_handler, Channels.TRADES: self.__trades_channel_handler,
Channels.BOOK: self.__book_channel_handler, Channels.BOOK: self.__book_channel_handler,
Channels.CANDLES: self.__candles_channel_handler Channels.CANDLES: self.__candles_channel_handler,
} }
def handle(self, subscription, *stream): def handle(self, subscription, *stream):
return self.__handlers[subscription["channel"]](subscription, *stream) if channel := subscription["channel"] or channel in self.__handlers.keys():
return self.__handlers[channel](subscription, *stream)
def __ticker_channel_handler(self, subscription, *stream): def __ticker_channel_handler(self, subscription, *stream):
_trading_pairs_labels = [
"BID",
"BID_SIZE",
"ASK",
"ASK_SIZE",
"DAILY_CHANGE",
"DAILY_CHANGE_RELATIVE",
"LAST_PRICE",
"VOLUME",
"HIGH",
"LOW"
]
_funding_currencies_labels = [
"FRR",
"BID",
"BID_PERIOD",
"BID_SIZE",
"ASK",
"ASK_PERIOD",
"ASK_SIZE",
"DAILY_CHANGE",
"DAILY_CHANGE_RELATIVE",
"LAST_PRICE",
"VOLUME",
"HIGH",
"LOW"
"_PLACEHOLDER",
"_PLACEHOLDER",
"FRR_AMOUNT_AVAILABLE"
]
if subscription["symbol"].startswith("t"): if subscription["symbol"].startswith("t"):
return self.event_emitter.emit( return self.event_emitter.emit(
"tp_ticker_update", "tp_ticker_update",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
_label_stream_data(_trading_pairs_labels, *stream[0]) serializers.TradingPairTicker(*stream[0])
) )
if subscription["symbol"].startswith("f"): if subscription["symbol"].startswith("f"):
return self.event_emitter.emit( return self.event_emitter.emit(
"fc_ticker_update", "fc_ticker_update",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
_label_stream_data(_funding_currencies_labels, *stream[0]) serializers.FundingCurrencyTicker(*stream[0])
) )
def __trades_channel_handler(self, subscription, *stream): def __trades_channel_handler(self, subscription, *stream):
_trading_pairs_labels, _funding_currencies_labels = [ "ID", "MTS", "AMOUNT", "PRICE" ], [ "ID", "MTS", "AMOUNT", "RATE", "PERIOD" ]
if len(stream) == 1:
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
"tp_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
[ _label_stream_data(_trading_pairs_labels, *substream) for substream in stream[0] ]
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
"fc_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
[ _label_stream_data(_funding_currencies_labels, *substream) for substream in stream[0] ]
)
if type := stream[0] or type in [ "te", "tu", "fte", "ftu" ]: if type := stream[0] or type in [ "te", "tu", "fte", "ftu" ]:
if subscription["symbol"].startswith("t"): if subscription["symbol"].startswith("t"):
return self.event_emitter.emit( return self.event_emitter.emit(
{ "te": "tp_trade_executed", "tu": "tp_trade_execution_update" }[type], { "te": "tp_trade_executed", "tu": "tp_trade_execution_update" }[type],
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]), _get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
_label_stream_data(_trading_pairs_labels, *stream[1]) serializers.TradingPairTrade(*stream[1])
) )
if subscription["symbol"].startswith("f"): if subscription["symbol"].startswith("f"):
return self.event_emitter.emit( return self.event_emitter.emit(
{ "fte": "fc_trade_executed", "ftu": "fc_trade_execution_update" }[type], { "fte": "fc_trade_executed", "ftu": "fc_trade_execution_update" }[type],
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]), _get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
_label_stream_data(_funding_currencies_labels, *stream[1]) serializers.FundingCurrencyTrade(*stream[1])
) )
if subscription["symbol"].startswith("t"):
return self.event_emitter.emit(
"tp_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "pair" ]),
[ serializers.TradingPairTrade(*substream) for substream in stream[0] ]
)
if subscription["symbol"].startswith("f"):
return self.event_emitter.emit(
"fc_trades_snapshot",
_get_sub_dictionary(subscription, [ "chanId", "symbol", "currency" ]),
[ serializers.FundingCurrencyTrade(*substream) for substream in stream[0] ]
)
def __book_channel_handler(self, subscription, *stream): def __book_channel_handler(self, subscription, *stream):
subscription = _get_sub_dictionary(subscription, [ "chanId", "symbol", "prec", "freq", "len", "subId", "pair" ]) subscription = _get_sub_dictionary(subscription, [ "chanId", "symbol", "prec", "freq", "len", "subId", "pair" ])
if subscription["prec"] == "R0": if subscription["prec"] == "R0":
_trading_pairs_labels, _funding_currencies_labels, IS_RAW_BOOK = [ "ORDER_ID", "PRICE", "AMOUNT" ], [ "OFFER_ID", "PERIOD", "RATE", "AMOUNT" ], True _trading_pair_serializer, _funding_currency_serializer, IS_RAW_BOOK = serializers.TradingPairRawBook, serializers.FundingCurrencyRawBook, True
else: _trading_pairs_labels, _funding_currencies_labels, IS_RAW_BOOK = [ "PRICE", "COUNT", "AMOUNT" ], [ "RATE", "PERIOD", "COUNT", "AMOUNT" ], False else: _trading_pair_serializer, _funding_currency_serializer, IS_RAW_BOOK = serializers.TradingPairBook, serializers.FundingCurrencyBook, False
if all(isinstance(substream, list) for substream in stream[0]): if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit( return self.event_emitter.emit(
IS_RAW_BOOK and "raw_book_snapshot" or "book_snapshot", IS_RAW_BOOK and "raw_book_snapshot" or "book_snapshot",
subscription, subscription,
[ _label_stream_data({ "t": _trading_pairs_labels, "f": _funding_currencies_labels }[subscription["symbol"][0]], *substream) for substream in stream[0] ] [ { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[subscription["symbol"][0]](*substream) for substream in stream[0] ]
) )
return self.event_emitter.emit( return self.event_emitter.emit(
IS_RAW_BOOK and "raw_book_update" or "book_update", IS_RAW_BOOK and "raw_book_update" or "book_update",
subscription, subscription,
_label_stream_data({ "t": _trading_pairs_labels, "f": _funding_currencies_labels }[subscription["symbol"][0]], *stream[0]) { "t": _trading_pair_serializer, "f": _funding_currency_serializer }[subscription["symbol"][0]](*stream[0])
) )
def __candles_channel_handler(self, subscription, *stream): def __candles_channel_handler(self, subscription, *stream):
_labels = [ "MTS", "OPEN", "CLOSE", "HIGH", "LOW", "VOLUME" ]
subscription = _get_sub_dictionary(subscription, [ "chanId", "key" ]) subscription = _get_sub_dictionary(subscription, [ "chanId", "key" ])
if all(isinstance(substream, list) for substream in stream[0]): if all(isinstance(substream, list) for substream in stream[0]):
return self.event_emitter.emit( return self.event_emitter.emit(
"candles_snapshot", "candles_snapshot",
subscription, subscription,
[ _label_stream_data(_labels, *substream) for substream in stream[0] ] [ serializers.Candle(*substream) for substream in stream[0] ]
) )
return self.event_emitter.emit( return self.event_emitter.emit(
"candles_update", "candles_update",
subscription, subscription,
_label_stream_data(_labels, *stream[0]) serializers.Candle(*stream[0])
) )
class AuthenticatedChannelsHandler(object): class AuthenticatedChannelsHandler(object):

View File

@@ -0,0 +1,98 @@
from .errors import BfxWebsocketException
class _Serializer(object):
def __init__(self, name, labels):
self.name, self.__labels = name, labels
def __serialize(self, *args, IGNORE = [ "_PLACEHOLDER" ]):
if len(self.__labels) != len(args):
raise BfxWebsocketException("<self.__labels> and <*args> arguments should contain the same amount of elements.")
for index, label in enumerate(self.__labels):
if label not in IGNORE:
yield label, args[index]
def __call__(self, *values):
return dict(self.__serialize(*values))
TradingPairTicker = _Serializer("TradingPairTicker", labels=[
"BID",
"BID_SIZE",
"ASK",
"ASK_SIZE",
"DAILY_CHANGE",
"DAILY_CHANGE_RELATIVE",
"LAST_PRICE",
"VOLUME",
"HIGH",
"LOW"
])
FundingCurrencyTicker = _Serializer("FundingCurrencyTicker", labels=[
"FRR",
"BID",
"BID_PERIOD",
"BID_SIZE",
"ASK",
"ASK_PERIOD",
"ASK_SIZE",
"DAILY_CHANGE",
"DAILY_CHANGE_RELATIVE",
"LAST_PRICE",
"VOLUME",
"HIGH",
"LOW"
"_PLACEHOLDER",
"_PLACEHOLDER",
"FRR_AMOUNT_AVAILABLE"
])
TradingPairTrade = _Serializer("TradingPairTrade", labels=[
"ID",
"MTS",
"AMOUNT",
"PRICE"
])
FundingCurrencyTrade = _Serializer("FundingCurrencyTrade", labels=[
"ID",
"MTS",
"AMOUNT",
"RATE",
"PERIOD"
])
TradingPairBook = _Serializer("TradingPairBook", labels=[
"PRICE",
"COUNT",
"AMOUNT"
])
FundingCurrencyBook = _Serializer("FundingCurrencyBook", labels=[
"RATE",
"PERIOD",
"COUNT",
"AMOUNT"
])
TradingPairRawBook = _Serializer("TradingPairRawBook", labels=[
"ORDER_ID",
"PRICE",
"AMOUNT"
])
FundingCurrencyRawBook = _Serializer("FundingCurrencyRawBook", labels=[
"OFFER_ID",
"PERIOD",
"RATE",
"AMOUNT"
])
Candle = _Serializer("Candle", labels=[
"MTS",
"OPEN",
"CLOSE",
"HIGH",
"LOW",
"VOLUME"
])