refactoring

This commit is contained in:
itsdeka
2021-11-29 09:49:33 +01:00
parent e05e1522c4
commit d1308dad7b
2 changed files with 30 additions and 30 deletions

View File

@@ -266,13 +266,13 @@ class BfxWebsocket(GenericWebsocket):
socketId, socketId,
ERRORS[data.get('code', 10000)], ERRORS[data.get('code', 10000)],
data.get("msg", "")) data.get("msg", ""))
await self._emit(Exception(err_string)) self._emit(Exception(err_string))
async def _system_auth_handler(self, socketId, data): async def _system_auth_handler(self, socketId, data):
if data.get('status') == 'FAILED': if data.get('status') == 'FAILED':
raise AuthError(ERRORS[data.get('code')]) raise AuthError(ERRORS[data.get('code')])
else: else:
await self._emit('authenticated', data) self._emit('authenticated', data)
self.logger.info("Authentication successful.") self.logger.info("Authentication successful.")
async def _trade_update_handler(self, data): async def _trade_update_handler(self, data):
@@ -281,7 +281,7 @@ class BfxWebsocket(GenericWebsocket):
if self.subscriptionManager.is_subscribed(data[0]): if self.subscriptionManager.is_subscribed(data[0]):
symbol = self.subscriptionManager.get(data[0]).symbol symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, symbol) tradeObj = _parse_trade(tData, symbol)
await self._emit('trade_update', tradeObj) self._emit('trade_update', tradeObj)
async def _trade_executed_handler(self, data): async def _trade_executed_handler(self, data):
tData = data[2] tData = data[2]
@@ -289,28 +289,28 @@ class BfxWebsocket(GenericWebsocket):
if self.subscriptionManager.is_subscribed(data[0]): if self.subscriptionManager.is_subscribed(data[0]):
symbol = self.subscriptionManager.get(data[0]).symbol symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, symbol) tradeObj = _parse_trade(tData, symbol)
await self._emit('new_trade', tradeObj) self._emit('new_trade', tradeObj)
async def _wallet_update_handler(self, data): async def _wallet_update_handler(self, data):
# [0,"wu",["exchange","USD",89134.66933283,0]] # [0,"wu",["exchange","USD",89134.66933283,0]]
uw = self.wallets._update_from_event(data) uw = self.wallets._update_from_event(data)
await self._emit('wallet_update', uw) self._emit('wallet_update', uw)
self.logger.info("Wallet update: {}".format(uw)) self.logger.info("Wallet update: {}".format(uw))
async def _heart_beat_handler(self, data): async def _heart_beat_handler(self, data):
self.logger.debug("Heartbeat - {}".format(self.host)) self.logger.debug("Heartbeat - {}".format(self.host))
async def _margin_info_update_handler(self, data): async def _margin_info_update_handler(self, data):
await self._emit('margin_info_update', data) self._emit('margin_info_update', data)
self.logger.info("Margin info update: {}".format(data)) self.logger.info("Margin info update: {}".format(data))
async def _funding_info_update_handler(self, data): async def _funding_info_update_handler(self, data):
await self._emit('funding_info_update', data) self._emit('funding_info_update', data)
self.logger.info("Funding info update: {}".format(data)) self.logger.info("Funding info update: {}".format(data))
async def _notification_handler(self, data): async def _notification_handler(self, data):
nInfo = data[2] nInfo = data[2]
await self._emit('notification', nInfo) self._emit('notification', nInfo)
notificationType = nInfo[6] notificationType = nInfo[6]
notificationText = nInfo[7] notificationText = nInfo[7]
if notificationType == 'ERROR': if notificationType == 'ERROR':
@@ -324,7 +324,7 @@ class BfxWebsocket(GenericWebsocket):
async def _balance_update_handler(self, data): async def _balance_update_handler(self, data):
self.logger.info('Balance update: {}'.format(data[2])) self.logger.info('Balance update: {}'.format(data[2]))
await self._emit('balance_update', data[2]) self._emit('balance_update', data[2])
async def _order_closed_handler(self, data): async def _order_closed_handler(self, data):
await self.orderManager.confirm_order_closed(data) await self.orderManager.confirm_order_closed(data)
@@ -343,34 +343,34 @@ class BfxWebsocket(GenericWebsocket):
async def _wallet_snapshot_handler(self, data): async def _wallet_snapshot_handler(self, data):
wallets = self.wallets._update_from_snapshot(data) wallets = self.wallets._update_from_snapshot(data)
await self._emit('wallet_snapshot', wallets) self._emit('wallet_snapshot', wallets)
async def _position_snapshot_handler(self, data): async def _position_snapshot_handler(self, data):
await self._emit('position_snapshot', data) self._emit('position_snapshot', data)
self.logger.info("Position snapshot: {}".format(data)) self.logger.info("Position snapshot: {}".format(data))
async def _position_update_handler(self, data): async def _position_update_handler(self, data):
await self._emit('position_update', data) self._emit('position_update', data)
self.logger.info("Position update: {}".format(data)) self.logger.info("Position update: {}".format(data))
async def _position_close_handler(self, data): async def _position_close_handler(self, data):
await self._emit('position_close', data) self._emit('position_close', data)
self.logger.info("Position close: {}".format(data)) self.logger.info("Position close: {}".format(data))
async def _position_new_handler(self, data): async def _position_new_handler(self, data):
await self._emit('position_new', data) self._emit('position_new', data)
self.logger.info("Position new: {}".format(data)) self.logger.info("Position new: {}".format(data))
async def _funding_offer_snapshot_handler(self, data): async def _funding_offer_snapshot_handler(self, data):
await self._emit('funding_offer_snapshot', data) self._emit('funding_offer_snapshot', data)
self.logger.info("Funding offer snapshot: {}".format(data)) self.logger.info("Funding offer snapshot: {}".format(data))
async def _funding_load_snapshot_handler(self, data): async def _funding_load_snapshot_handler(self, data):
await self._emit('funding_loan_snapshot', data[2]) self._emit('funding_loan_snapshot', data[2])
self.logger.info("Funding loan snapshot: {}".format(data)) self.logger.info("Funding loan snapshot: {}".format(data))
async def _funding_credit_snapshot_handler(self, data): async def _funding_credit_snapshot_handler(self, data):
await self._emit('funding_credit_snapshot', data[2]) self._emit('funding_credit_snapshot', data[2])
self.logger.info("Funding credit snapshot: {}".format(data)) self.logger.info("Funding credit snapshot: {}".format(data))
async def _status_handler(self, data): async def _status_handler(self, data):
@@ -381,7 +381,7 @@ class BfxWebsocket(GenericWebsocket):
if status_type == "deriv": if status_type == "deriv":
status = _parse_deriv_status_update(rstatus, symbol) status = _parse_deriv_status_update(rstatus, symbol)
if status: if status:
await self._emit('status_update', status) self._emit('status_update', status)
else: else:
self.logger.warn('Unknown status data type: {}'.format(data)) self.logger.warn('Unknown status data type: {}'.format(data))
@@ -392,13 +392,13 @@ class BfxWebsocket(GenericWebsocket):
t = None t = None
if symbol[0] == 't': if symbol[0] == 't':
t = Ticker.from_raw_ticker(raw_ticker, symbol) t = Ticker.from_raw_ticker(raw_ticker, symbol)
await self._emit('new_trading_ticker', t) self._emit('new_trading_ticker', t)
elif symbol[0] == 'f': elif symbol[0] == 'f':
t = FundingTicker.from_raw_ticker(raw_ticker, symbol) t = FundingTicker.from_raw_ticker(raw_ticker, symbol)
await self._emit('new_funding_ticker', t) self._emit('new_funding_ticker', t)
else: else:
self.logger.warn('Unknown ticker type: {}'.format(raw_ticker)) self.logger.warn('Unknown ticker type: {}'.format(raw_ticker))
await self._emit('new_ticker', t) self._emit('new_ticker', t)
async def _trade_handler(self, data): async def _trade_handler(self, data):
symbol = self.subscriptionManager.get(data[0]).symbol symbol = self.subscriptionManager.get(data[0]).symbol
@@ -414,7 +414,7 @@ class BfxWebsocket(GenericWebsocket):
'price': t[3], 'price': t[3],
'symbol': symbol 'symbol': symbol
} }
await self._emit('seed_trade', trade) self._emit('seed_trade', trade)
async def _candle_handler(self, data): async def _candle_handler(self, data):
subscription = self.subscriptionManager.get(data[0]) subscription = self.subscriptionManager.get(data[0])
@@ -429,11 +429,11 @@ class BfxWebsocket(GenericWebsocket):
for c in candlesSnapshot: for c in candlesSnapshot:
candle = _parse_candle( candle = _parse_candle(
c, subscription.symbol, subscription.timeframe) c, subscription.symbol, subscription.timeframe)
await self._emit('seed_candle', candle) self._emit('seed_candle', candle)
else: else:
candle = _parse_candle( candle = _parse_candle(
data[1], subscription.symbol, subscription.timeframe) data[1], subscription.symbol, subscription.timeframe)
await self._emit('new_candle', candle) self._emit('new_candle', candle)
async def _order_book_handler(self, data, orig_raw_message): async def _order_book_handler(self, data, orig_raw_message):
obInfo = data[1] obInfo = data[1]
@@ -461,17 +461,17 @@ class BfxWebsocket(GenericWebsocket):
if isSnapshot: if isSnapshot:
self.orderBooks[symbol] = OrderBook() self.orderBooks[symbol] = OrderBook()
self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message) self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message)
await self._emit('order_book_snapshot', { self._emit('order_book_snapshot', {
'symbol': symbol, 'data': obInfo}) 'symbol': symbol, 'data': obInfo})
else: else:
self.orderBooks[symbol].update_with(obInfo, orig_raw_message) self.orderBooks[symbol].update_with(obInfo, orig_raw_message)
await self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) self._emit('order_book_update', {'symbol': symbol, 'data': obInfo})
async def on_message(self, socketId, message): async def on_message(self, socketId, message):
self.logger.debug(message) self.logger.debug(message)
# convert float values to decimal # convert float values to decimal
msg = json.loads(message, parse_float=self.parse_float) msg = json.loads(message, parse_float=self.parse_float)
await self._emit('all', msg) self._emit('all', msg)
if type(msg) is dict: if type(msg) is dict:
# System messages are received as json # System messages are received as json
await self._ws_system_handler(socketId, msg) await self._ws_system_handler(socketId, msg)
@@ -495,7 +495,7 @@ class BfxWebsocket(GenericWebsocket):
self.logger.info("Websocket opened.") self.logger.info("Websocket opened.")
if len(self.sockets) == 1: if len(self.sockets) == 1:
## only call on first connection ## only call on first connection
await self._emit('connected') self._emit('connected')
# Orders are simulated in backtest mode # Orders are simulated in backtest mode
if self.API_KEY and self.API_SECRET and self.get_authenticated_socket() == None: if self.API_KEY and self.API_SECRET and self.get_authenticated_socket() == None:
await self._ws_authenticate_socket(socket_id) await self._ws_authenticate_socket(socket_id)

View File

@@ -189,9 +189,9 @@ class GenericWebsocket:
return self.events.once(event) return self.events.once(event)
self.events.once(event, func) self.events.once(event, func)
async def _emit(self, event, *args, **kwargs): def _emit(self, event, *args, **kwargs):
if type(event) == Exception: if type(event) == Exception:
await self.on_error(event) self.logger.error(event)
self.events.emit(event, *args, **kwargs) self.events.emit(event, *args, **kwargs)
async def on_error(self, error): async def on_error(self, error):