diff --git a/bfxapi/websocket/BfxWebsocketClient.py b/bfxapi/websocket/BfxWebsocketClient.py index 9ba6b45..628e316 100644 --- a/bfxapi/websocket/BfxWebsocketClient.py +++ b/bfxapi/websocket/BfxWebsocketClient.py @@ -2,7 +2,7 @@ import json, asyncio, hmac, hashlib, time, websockets from pyee.asyncio import AsyncIOEventEmitter -from .handlers import Channels, PublicChannelsHandler, AuthenticatedEventsHandler +from .handlers import Channels, PublicChannelsHandler, AuthenticatedChannelsHandler from .errors import BfxWebsocketException, ConnectionNotOpen, InvalidAuthenticationCredentials @@ -16,7 +16,7 @@ class BfxWebsocketClient(object): self.handlers = { "public": PublicChannelsHandler(event_emitter=self.event_emitter), - "authenticated": AuthenticatedEventsHandler(event_emitter=self.event_emitter) + "authenticated": AuthenticatedChannelsHandler(event_emitter=self.event_emitter) } async def connect(self): diff --git a/bfxapi/websocket/handlers.py b/bfxapi/websocket/handlers.py index a601aa6..d9433aa 100644 --- a/bfxapi/websocket/handlers.py +++ b/bfxapi/websocket/handlers.py @@ -47,456 +47,271 @@ class PublicChannelsHandler(object): def __status_channel_handler(self, subscription, *parameters): self.event_emitter.emit("status", subscription, parameters[0]) -class AuthenticatedEventsHandler(object): +class AuthenticatedChannelsHandler(object): def __init__(self, event_emitter, strict = False): self.event_emitter, self.strict = event_emitter, strict self.__handlers = { - "bu": self.__bu_event_handler, - "ws": self.__ws_event_handler, - "wu": self.__wu_event_handler, - "os": self.__os_event_handler, - "on": self.__on_event_handler, - "ou": self.__ou_event_handler, - "oc": self.__oc_event_handler, - "ps": self.__ps_event_handler, - "pn": self.__pn_event_handler, - "pu": self.__pu_event_handler, - "pc": self.__pc_event_handler, - "fos": self.__fos_event_handler, - "fon": self.__fon_event_handler, - "fou": self.__fou_event_handler, - "foc": self.__foc_event_handler, + ("os", "on", "ou", "oc",): self.__orders_channel_handler, + ("ps", "pn", "pu", "pc",): self.__positions_channel_handler, + ("te", "tu",): self.__trades_channel_handler, + ("fos", "fon", "fou", "foc",): self.__funding_offers_channel_handler, + ("fcs", "fcn", "fcu", "fcc",): self.__funding_credits_channel_handler, + ("fls", "fln", "flu", "flc",): self.__funding_loans_channel_handler, + ("ws", "wu",): self.__wallets_channel_handler, + ("bu",): self.__balance_info_channel_handler } def handle(self, type, stream): - if type in self.__handlers: - self.__handlers[type](*stream) - elif self.strict == True: + for abbreviations in self.__handlers.keys(): + if type in abbreviations: + return self.__handlers[abbreviations](type, stream) + + if self.strict == True: raise BfxWebsocketException(f"Event of type <{type}> not found in self.__handlers.") - def __bu_event_handler(self, *stream): - self.event_emitter.emit("balance_update", _label_stream_data( - [ - "AUM", - "AUM_NET" - ], - *stream - )) + def __orders_channel_handler(self, type, stream): + _labels = [ + "ID", + "GID", + "CID", + "SYMBOL", + "MTS_CREATE", + "MTS_UPDATE", + "AMOUNT", + "AMOUNT_ORIG", + "ORDER_TYPE", + "TYPE_PREV", + "MTS_TIF", + "_PLACEHOLDER", + "FLAGS", + "ORDER_STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "PRICE", + "PRICE_AVG", + "PRICE_TRAILING", + "PRICE_AUX_LIMIT", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "NOTIFY", + "HIDDEN", + "PLACED_ID", + "_PLACEHOLDER", + "_PLACEHOLDER", + "ROUTING", + "_PLACEHOLDER", + "_PLACEHOLDER", + "META" + ] - def __ws_event_handler(self, *stream): - self.event_emitter.emit("wallet_snapshot", [ - _label_stream_data( - [ - "WALLET_TYPE", - "CURRENCY", - "BALANCE", - "UNSETTLED_INTEREST", - "BALANCE_AVAILABLE", - "DESCRIPTION", - "META" - ], - *substream - ) for substream in stream - ]) - - def __wu_event_handler(self, *stream): - self.event_emitter.emit("wallet_update", _label_stream_data( - [ - "WALLET_TYPE", - "CURRENCY", - "BALANCE", - "UNSETTLED_INTEREST", - "BALANCE_AVAILABLE", - "DESCRIPTION", - "META" - ], - *stream - )) + if type == "os": + self.event_emitter.emit("order_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ]) - def __os_event_handler(self, *stream): - self.event_emitter.emit("order_snapshot", [ - _label_stream_data( - [ - "ID", - "GID", - "CID", - "SYMBOL", - "MTS_CREATE", - "MTS_UPDATE", - "AMOUNT", - "AMOUNT_ORIG", - "ORDER_TYPE", - "TYPE_PREV", - "MTS_TIF", - "_PLACEHOLDER", - "FLAGS", - "STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "PRICE", - "PRICE_AVG", - "PRICE_TRAILING", - "PRICE_AUX_LIMIT", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "NOTIFY", - "HIDDEN", - "PLACED_ID", - "_PLACEHOLDER", - "_PLACEHOLDER", - "ROUTING", - "_PLACEHOLDER", - "_PLACEHOLDER", - "META" - ], - *substream - ) for substream in stream - ]) + if type == "on" or type == "ou" or type == "oc": + self.event_emitter.emit({ + "on": "new_order", + "ou": "order_update", + "oc": "order_cancel" + }[type], _label_stream_data(_labels, *stream)) - def __on_event_handler(self, *stream): - self.event_emitter.emit("new_order", _label_stream_data( - [ + def __positions_channel_handler(self, type, stream): + _labels = [ + "SYMBOL", + "STATUS", + "AMOUNT", + "BASE_PRICE", + "MARGIN_FUNDING", + "MARGIN_FUNDING_TYPE", + "PL", + "PL_PERC", + "PRICE_LIQ", + "LEVERAGE", + "FLAG", + "POSITION_ID", + "MTS_CREATE", + "MTS_UPDATE", + "_PLACEHOLDER", + "TYPE", + "_PLACEHOLDER", + "COLLATERAL", + "COLLATERAL_MIN", + "META" + ] + + if type == "ps": + self.event_emitter.emit("position_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ]) + + if type == "pn" or type == "pu" or type == "pc": + self.event_emitter.emit({ + "pn": "new_position", + "pu": "position_update", + "pc": "position_close" + }[type], _label_stream_data(_labels, *stream)) + + def __trades_channel_handler(self, type, stream): + if type == "te": + self.event_emitter.emit("trade_executed", _label_stream_data([ "ID", - "GID", - "CID", "SYMBOL", - "MTS_CREATE", - "MTS_UPDATE", - "AMOUNT", - "AMOUNT_ORIG", + "MTS_CREATE", + "ORDER_ID", + "EXEC_AMOUNT", + "EXEC_PRICE", "ORDER_TYPE", - "TYPE_PREV", - "MTS_TIF", + "ORDER_PRICE", + "MAKER", "_PLACEHOLDER", - "FLAGS", - "ORDER_STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "PRICE", - "PRICE_AVG", - "PRICE_TRAILING", - "PRICE_AUX_LIMIT", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "NOTIFY", - "HIDDEN", - "PLACED_ID", - "_PLACEHOLDER", - "_PLACEHOLDER", - "ROUTING", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER" - ], - *stream - )) + "_PLACEHOLDER", + "CID" + ], *stream)) - def __ou_event_handler(self, *stream): - self.event_emitter.emit("order_update", _label_stream_data( - [ + if type == "tu": + self.event_emitter.emit("trade_execution_update", _label_stream_data([ "ID", - "GID", - "CID", "SYMBOL", - "MTS_CREATE", - "MTS_UPDATE", - "AMOUNT", - "AMOUNT_ORIG", + "MTS_CREATE", + "ORDER_ID", + "EXEC_AMOUNT", + "EXEC_PRICE", "ORDER_TYPE", - "TYPE_PREV", - "MTS_TIF", - "_PLACEHOLDER", - "FLAGS", - "ORDER_STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "PRICE", - "PRICE_AVG", - "PRICE_TRAILING", - "PRICE_AUX_LIMIT", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "NOTIFY", - "HIDDEN", - "PLACED_ID", - "_PLACEHOLDER", - "_PLACEHOLDER", - "ROUTING", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER" - ], - *stream - )) + "ORDER_PRICE", + "MAKER", + "FEE", + "FEE_CURRENCY", + "CID" + ], *stream)) - def __oc_event_handler(self, *stream): - self.event_emitter.emit("order_cancel", _label_stream_data( - [ - "ID", - "GID", - "CID", - "SYMBOL", - "MTS_CREATE", - "MTS_UPDATE", - "AMOUNT", - "AMOUNT_ORIG", - "ORDER_TYPE", - "TYPE_PREV", - "MTS_TIF", - "_PLACEHOLDER", - "FLAGS", - "ORDER_STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "PRICE", - "PRICE_AVG", - "PRICE_TRAILING", - "PRICE_AUX_LIMIT", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "NOTIFY", - "HIDDEN", - "PLACED_ID", - "_PLACEHOLDER", - "_PLACEHOLDER", - "ROUTING", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER" - ], - *stream - )) + def __funding_offers_channel_handler(self, type, stream): + _labels = [ + "ID", + "SYMBOL", + "MTS_CREATED", + "MTS_UPDATED", + "AMOUNT", + "AMOUNT_ORIG", + "OFFER_TYPE", + "_PLACEHOLDER", + "_PLACEHOLDER", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "_PLACEHOLDER", + ] - def __ps_event_handler(self, *stream): - self.event_emitter.emit("position_snapshot", [ - _label_stream_data( - [ - "SYMBOL", - "STATUS", - "AMOUNT", - "BASE_PRICE", - "MARGIN_FUNDING", - "MARGIN_FUNDING_TYPE", - "PL", - "PL_PERC", - "PRICE_LIQ", - "LEVERAGE", - "FLAG", - "POSITION_ID", - "MTS_CREATE", - "MTS_UPDATE", - "_PLACEHOLDER", - "TYPE", - "_PLACEHOLDER", - "COLLATERAL", - "COLLATERAL_MIN", - "META" - ], - *substream - ) - for substream in stream - ]) + if type == "fos": + self.event_emitter.emit("funding_offer_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ]) - def __pn_event_handler(self, *stream): - self.event_emitter.emit("new_position", _label_stream_data( - [ - "SYMBOL", - "STATUS", - "AMOUNT", - "BASE_PRICE", - "MARGIN_FUNDING", - "MARGIN_FUNDING_TYPE", - "PL", - "PL_PERC", - "PRICE_LIQ", - "LEVERAGE", - "FLAG", - "POSITION_ID", - "MTS_CREATE", - "MTS_UPDATE", - "_PLACEHOLDER", - "TYPE", - "_PLACEHOLDER", - "COLLATERAL", - "COLLATERAL_MIN", - "META" - ], - *stream - )) + if type == "fon" or type == "fou" or type == "foc": + self.event_emitter.emit({ + "fon": "funding_offer_new", + "fou": "funding_offer_update", + "foc": "funding_offer_cancel" + }[type], _label_stream_data(_labels, *stream)) - def __pu_event_handler(self, *stream): - self.event_emitter.emit("position_update", _label_stream_data( - [ - "SYMBOL", - "STATUS", - "AMOUNT", - "BASE_PRICE", - "MARGIN_FUNDING", - "MARGIN_FUNDING_TYPE", - "PL", - "PL_PERC", - "PRICE_LIQ", - "LEVERAGE", - "FLAG", - "POSITION_ID", - "MTS_CREATE", - "MTS_UPDATE", - "_PLACEHOLDER", - "TYPE", - "_PLACEHOLDER", - "COLLATERAL", - "COLLATERAL_MIN", - "META" - ], - *stream - )) + def __funding_credits_channel_handler(self, type, stream): + _labels = [ + "ID", + "SYMBOL", + "SIDE", + "MTS_CREATE", + "MTS_UPDATE", + "AMOUNT", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "MTS_OPENING", + "MTS_LAST_PAYOUT", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "RATE_REAL", + "NO_CLOSE", + "POSITION_PAIR" + ] - def __pc_event_handler(self, *stream): - self.event_emitter.emit("position_cancel", _label_stream_data( - [ - "SYMBOL", - "STATUS", - "AMOUNT", - "BASE_PRICE", - "MARGIN_FUNDING", - "MARGIN_FUNDING_TYPE", - "PL", - "PL_PERC", - "PRICE_LIQ", - "LEVERAGE", - "FLAG", - "POSITION_ID", - "MTS_CREATE", - "MTS_UPDATE", - "_PLACEHOLDER", - "TYPE", - "_PLACEHOLDER", - "COLLATERAL", - "COLLATERAL_MIN", - "META" - ], - *stream - )) + if type == "fcs": + self.event_emitter.emit("funding_credit_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ]) - def __fos_event_handler(self, *stream): - self.event_emitter.emit("funding_offer_snapshot", [ - _label_stream_data( - [ - "ID", - "SYMBOL", - "MTS_CREATED", - "MTS_UPDATED", - "AMOUNT", - "AMOUNT_ORIG", - "OFFER_TYPE", - "_PLACEHOLDER", - "_PLACEHOLDER", - "FLAGS", - "STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "RATE", - "PERIOD", - "NOTIFY", - "HIDDEN", - "_PLACEHOLDER", - "RENEW", - "_PLACEHOLDER", - ], - *substream - ) - for substream in stream - ]) + if type == "fcn" or type == "fcu" or type == "fcc": + self.event_emitter.emit({ + "fcn": "funding_credit_new", + "fcu": "funding_credit_update", + "fcc": "funding_credit_close" + }[type], _label_stream_data(_labels, *stream)) - def __fon_event_handler(self, *stream): - self.event_emitter.emit("funding_offer_new", _label_stream_data( - [ - "ID", - "SYMBOL", - "MTS_CREATED", - "MTS_UPDATED", - "AMOUNT", - "AMOUNT_ORIG", - "TYPE", - "_PLACEHOLDER", - "_PLACEHOLDER", - "FLAGS", - "STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "RATE", - "PERIOD", - "NOTIFY", - "HIDDEN", - "_PLACEHOLDER", - "RENEW", - "RATE_REAL" - ], - *stream - )) + def __funding_loans_channel_handler(self, type, stream): + _labels = [ + "ID", + "SYMBOL", + "SIDE", + "MTS_CREATE", + "MTS_UPDATE", + "AMOUNT", + "FLAGS", + "STATUS", + "_PLACEHOLDER", + "_PLACEHOLDER", + "_PLACEHOLDER", + "RATE", + "PERIOD", + "MTS_OPENING", + "MTS_LAST_PAYOUT", + "NOTIFY", + "HIDDEN", + "_PLACEHOLDER", + "RENEW", + "RATE_REAL", + "NO_CLOSE" + ] - def __fou_event_handler(self, *stream): - self.event_emitter.emit("funding_offer_update", _label_stream_data( - [ - "ID", - "SYMBOL", - "MTS_CREATED", - "MTS_UPDATED", - "AMOUNT", - "AMOUNT_ORIG", - "TYPE", - "_PLACEHOLDER", - "_PLACEHOLDER", - "FLAGS", - "STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "RATE", - "PERIOD", - "NOTIFY", - "HIDDEN", - "_PLACEHOLDER", - "RENEW", - "RATE_REAL" - ], - *stream - )) + if type == "fls": + self.event_emitter.emit("funding_loan_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ]) - def __foc_event_handler(self, *stream): - self.event_emitter.emit("funding_offer_cancel", _label_stream_data( - [ - "ID", - "SYMBOL", - "MTS_CREATED", - "MTS_UPDATED", - "AMOUNT", - "AMOUNT_ORIG", - "TYPE", - "_PLACEHOLDER", - "_PLACEHOLDER", - "FLAGS", - "STATUS", - "_PLACEHOLDER", - "_PLACEHOLDER", - "_PLACEHOLDER", - "RATE", - "PERIOD", - "NOTIFY", - "HIDDEN", - "_PLACEHOLDER", - "RENEW", - "RATE_REAL" - ], - *stream - )) + if type == "fln" or type == "flu" or type == "flc": + self.event_emitter.emit({ + "fln": "funding_loan_new", + "flu": "funding_loan_update", + "flc": "funding_loan_close" + }[type], _label_stream_data(_labels, *stream)) + + def __wallets_channel_handler(self, type, stream): + _labels = [ + "WALLET_TYPE", + "CURRENCY", + "BALANCE", + "UNSETTLED_INTEREST", + "BALANCE_AVAILABLE", + "DESCRIPTION", + "META" + ] + + if type == "ws": + self.event_emitter.emit("wallet_snapshot", [ _label_stream_data(_labels, *substream) for substream in stream ]) + + if type == "wu": + self.event_emitter.emit("wallet_update", _label_stream_data(_labels, *stream)) + + def __balance_info_channel_handler(self, type, stream): + if type == "bu": + self.event_emitter.emit("balance_update", _label_stream_data([ + "AUM", + "AUM_NET" + ], *stream)) def _label_stream_data(labels, *args, IGNORE = [ "_PLACEHOLDER" ]): if len(labels) != len(args):