From d1308dad7b0d44c9daae6e3bc8d8690008bbf0c2 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 09:49:33 +0100 Subject: [PATCH] refactoring --- bfxapi/websockets/bfx_websocket.py | 56 +++++++++++++------------- bfxapi/websockets/generic_websocket.py | 4 +- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/bfxapi/websockets/bfx_websocket.py b/bfxapi/websockets/bfx_websocket.py index 07771bd..49febc5 100644 --- a/bfxapi/websockets/bfx_websocket.py +++ b/bfxapi/websockets/bfx_websocket.py @@ -266,13 +266,13 @@ class BfxWebsocket(GenericWebsocket): socketId, ERRORS[data.get('code', 10000)], data.get("msg", "")) - await self._emit(Exception(err_string)) + self._emit(Exception(err_string)) async def _system_auth_handler(self, socketId, data): if data.get('status') == 'FAILED': raise AuthError(ERRORS[data.get('code')]) else: - await self._emit('authenticated', data) + self._emit('authenticated', data) self.logger.info("Authentication successful.") async def _trade_update_handler(self, data): @@ -281,7 +281,7 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - await self._emit('trade_update', tradeObj) + self._emit('trade_update', tradeObj) async def _trade_executed_handler(self, data): tData = data[2] @@ -289,28 +289,28 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - await self._emit('new_trade', tradeObj) + self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]] uw = self.wallets._update_from_event(data) - await self._emit('wallet_update', uw) + self._emit('wallet_update', uw) self.logger.info("Wallet update: {}".format(uw)) async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host)) async def _margin_info_update_handler(self, data): - await self._emit('margin_info_update', data) + self._emit('margin_info_update', data) self.logger.info("Margin info update: {}".format(data)) async def _funding_info_update_handler(self, data): - await self._emit('funding_info_update', data) + self._emit('funding_info_update', data) self.logger.info("Funding info update: {}".format(data)) async def _notification_handler(self, data): nInfo = data[2] - await self._emit('notification', nInfo) + self._emit('notification', nInfo) notificationType = nInfo[6] notificationText = nInfo[7] if notificationType == 'ERROR': @@ -324,7 +324,7 @@ class BfxWebsocket(GenericWebsocket): async def _balance_update_handler(self, data): self.logger.info('Balance update: {}'.format(data[2])) - await self._emit('balance_update', data[2]) + self._emit('balance_update', data[2]) async def _order_closed_handler(self, data): await self.orderManager.confirm_order_closed(data) @@ -343,34 +343,34 @@ class BfxWebsocket(GenericWebsocket): async def _wallet_snapshot_handler(self, data): wallets = self.wallets._update_from_snapshot(data) - await self._emit('wallet_snapshot', wallets) + self._emit('wallet_snapshot', wallets) async def _position_snapshot_handler(self, data): - await self._emit('position_snapshot', data) + self._emit('position_snapshot', data) self.logger.info("Position snapshot: {}".format(data)) async def _position_update_handler(self, data): - await self._emit('position_update', data) + self._emit('position_update', data) self.logger.info("Position update: {}".format(data)) async def _position_close_handler(self, data): - await self._emit('position_close', data) + self._emit('position_close', data) self.logger.info("Position close: {}".format(data)) async def _position_new_handler(self, data): - await self._emit('position_new', data) + self._emit('position_new', data) self.logger.info("Position new: {}".format(data)) async def _funding_offer_snapshot_handler(self, data): - await self._emit('funding_offer_snapshot', data) + self._emit('funding_offer_snapshot', data) self.logger.info("Funding offer snapshot: {}".format(data)) async def _funding_load_snapshot_handler(self, data): - await self._emit('funding_loan_snapshot', data[2]) + self._emit('funding_loan_snapshot', data[2]) self.logger.info("Funding loan snapshot: {}".format(data)) async def _funding_credit_snapshot_handler(self, data): - await self._emit('funding_credit_snapshot', data[2]) + self._emit('funding_credit_snapshot', data[2]) self.logger.info("Funding credit snapshot: {}".format(data)) async def _status_handler(self, data): @@ -381,7 +381,7 @@ class BfxWebsocket(GenericWebsocket): if status_type == "deriv": status = _parse_deriv_status_update(rstatus, symbol) if status: - await self._emit('status_update', status) + self._emit('status_update', status) else: self.logger.warn('Unknown status data type: {}'.format(data)) @@ -392,13 +392,13 @@ class BfxWebsocket(GenericWebsocket): t = None if symbol[0] == 't': t = Ticker.from_raw_ticker(raw_ticker, symbol) - await self._emit('new_trading_ticker', t) + self._emit('new_trading_ticker', t) elif symbol[0] == 'f': t = FundingTicker.from_raw_ticker(raw_ticker, symbol) - await self._emit('new_funding_ticker', t) + self._emit('new_funding_ticker', t) else: self.logger.warn('Unknown ticker type: {}'.format(raw_ticker)) - await self._emit('new_ticker', t) + self._emit('new_ticker', t) async def _trade_handler(self, data): symbol = self.subscriptionManager.get(data[0]).symbol @@ -414,7 +414,7 @@ class BfxWebsocket(GenericWebsocket): 'price': t[3], 'symbol': symbol } - await self._emit('seed_trade', trade) + self._emit('seed_trade', trade) async def _candle_handler(self, data): subscription = self.subscriptionManager.get(data[0]) @@ -429,11 +429,11 @@ class BfxWebsocket(GenericWebsocket): for c in candlesSnapshot: candle = _parse_candle( c, subscription.symbol, subscription.timeframe) - await self._emit('seed_candle', candle) + self._emit('seed_candle', candle) else: candle = _parse_candle( data[1], subscription.symbol, subscription.timeframe) - await self._emit('new_candle', candle) + self._emit('new_candle', candle) async def _order_book_handler(self, data, orig_raw_message): obInfo = data[1] @@ -461,17 +461,17 @@ class BfxWebsocket(GenericWebsocket): if isSnapshot: self.orderBooks[symbol] = OrderBook() self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message) - await self._emit('order_book_snapshot', { + self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo}) else: self.orderBooks[symbol].update_with(obInfo, orig_raw_message) - await self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) + self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) async def on_message(self, socketId, message): self.logger.debug(message) # convert float values to decimal msg = json.loads(message, parse_float=self.parse_float) - await self._emit('all', msg) + self._emit('all', msg) if type(msg) is dict: # System messages are received as json await self._ws_system_handler(socketId, msg) @@ -495,7 +495,7 @@ class BfxWebsocket(GenericWebsocket): self.logger.info("Websocket opened.") if len(self.sockets) == 1: ## only call on first connection - await self._emit('connected') + self._emit('connected') # Orders are simulated in backtest mode if self.API_KEY and self.API_SECRET and self.get_authenticated_socket() == None: await self._ws_authenticate_socket(socket_id) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index 91a9f0a..f155278 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -189,9 +189,9 @@ class GenericWebsocket: return self.events.once(event) self.events.once(event, func) - async def _emit(self, event, *args, **kwargs): + def _emit(self, event, *args, **kwargs): if type(event) == Exception: - await self.on_error(event) + self.logger.error(event) self.events.emit(event, *args, **kwargs) async def on_error(self, error):