import asyncio import json import time import hashlib import hmac import random from .GenericWebsocket import GenericWebsocket, AuthError from ..models import Order, Trade def _parse_candle(cData, symbol, tf): return { 'mts': cData[0], 'open': cData[1], 'close': cData[2], 'high': cData[3], 'low': cData[4], 'volume': cData[5], 'symbol': symbol, 'tf': tf } def _parse_trade_snapshot_item(tData, symbol): return { 'mts': tData[3], 'price': tData[4], 'amount': tData[5], 'symbol': symbol } def _parse_trade(tData, symbol): return { 'mts': tData[1], 'price': tData[3], 'amount': tData[2], 'symbol': symbol } class LiveBfxWebsocket(GenericWebsocket): ''' More complex websocket that heavily relies on the btfxwss module. This websocket requires authentication and is capable of handling orders. https://github.com/Crypto-toolbox/btfxwss Translation names: translation table for channel names: Data Channels os - Orders hos - Historical Orders ps - Positions hts - Trades (snapshot) te - Trade Executed tu - Trade Execution update ws - Wallets bu - Balance Info miu - Margin Info fiu - Funding Info fos - Offers hfos - Historical Offers fcs - Credits hfcs - Historical Credits fls - Loans hfls - Historical Loans htfs - Funding Trades n - Notifications (WIP) Events: - all: listen for all messages coming through - connected: called when a connection is made - authenticated: called when the websocket passes authentication - notification (array): incoming account notification - error (string): error from the websocket - order_closed (string): when an order confirmation is recieved - wallet_snapshot (string): Initial wallet balances (Fired once) - order_snapshot (string): Initial open orders (Fired once) - positions_snapshot (string): Initial open positions (Fired once) - wallet_update (string): changes to the balance of wallets - seed_candle (candleArray): initial past candle to prime strategy - seed_trade (tradeArray): initial past trade to prime strategy - funding_offer_snapshot: - funding_loan_snapshot: - funding_credit_snapshot: - balance_update when the state of a balance is changed - new_trade: a new trade on the market has been executed - new_candle: a new candle has been produced ''' ERRORS = { 10000: 'Unknown event', 10001: 'Generic error', 10008: 'Concurrency error', 10020: 'Request parameters error', 10050: 'Configuration setup failed', 10100: 'Failed authentication', 10111: 'Error in authentication request payload', 10112: 'Error in authentication request signature', 10113: 'Error in authentication request encryption', 10114: 'Error in authentication request nonce', 10200: 'Error in un-authentication request', 10300: 'Subscription Failed (generic)', 10301: 'Already Subscribed', 10302: 'Unknown channel', 10400: 'Subscription Failed (generic)', 10401: 'Not subscribed', 11000: 'Not ready, try again later', 20000: 'User is invalid!', 20051: 'Websocket server stopping', 20060: 'Websocket server resyncing', 20061: 'Websocket server resync complete' } def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://api.bitfinex.com/ws/2', onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs): self.channels = {} self.API_KEY=API_KEY self.API_SECRET=API_SECRET self.backtest=backtest self.pendingOrders = {} super(LiveBfxWebsocket, self).__init__(host, *args, **kwargs) self._WS_DATA_HANDLERS = { 'tu': self._trade_update_handler, 'wu': self._wallet_update_handler, 'hb': self._heart_beat_handler, 'te': self._trade_event_handler, 'oc': self._order_closed_handler, 'os': self._order_snapshot_handler, 'ws': self._wallet_snapshot_handler, 'ps': self._position_snapshot_handler, 'fos': self._funding_offer_snapshot_handler, 'fcs': self._funding_credit_snapshot_handler, 'fls': self._funding_load_snapshot_handler, 'bu': self._balance_update_handler, 'n': self._notification_handler } self._WS_SYSTEM_HANDLERS = { 'info': self._system_info_handler, 'subscribed': self._system_subscribed_handler, 'error': self._system_error_handler, 'auth': self._system_auth_handler } async def _ws_system_handler(self, msg): eType = msg.get('event') if eType in self._WS_SYSTEM_HANDLERS: await self._WS_SYSTEM_HANDLERS[eType](msg) else: self.logger.warn('Unknown websocket event: {}'.format(eType)) async def _ws_data_handler(self, data): dataEvent = data[1] chanId = data[0] if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS: return await self._WS_DATA_HANDLERS[dataEvent](data) elif chanId in self.channels: # candles do not have an event if self.channels[chanId].get('channel') == 'candles': await self._candle_handler(data) else: self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data)) async def _system_info_handler(self, data): self.logger.info(data) if data.get('serverId', None): ## connection has been established await self.on_open() async def _system_subscribed_handler(self, data): chanEvent = data.get('channel') self.logger.info("Subscribed to channel '{}'".format(chanEvent)) ## add channel to known list chanId = data.get('chanId') ## if is a candles subscribption, then get the symbol ## from the key if data.get('key'): kd = data.get('key').split(':') data['tf'] = kd[1] data['symbol'] = kd[2] self.channels[chanId] = data async def _system_error_handler(self, data): self._emit('error', data) async def _system_auth_handler(self, data): if data.get('status') == 'FAILED': raise AuthError(self.ERRORS[data.get('code')]) else: self._emit('authenticated', data) self.logger.info("Authentication successful.") async def _trade_update_handler(self, data): tData = data[2] # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] channelData = self.channels[data[0]] tradeObj = _parse_trade(tData, channelData.get('symbol')) self._emit('new_trade', tradeObj) async def _trade_event_handler(self, data): tData = data[2] # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] channelData = self.channels[data[0]] tradeObj = _parse_trade(tData, channelData.get('symbol')) self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]] wu = data[2] self._emit('wallet_update', data) self.logger.info("Wallet update: {}({}) = {}".format(wu[1], wu[0], wu[2])) async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host)) async def _notification_handler(self, data): # [0, 'n', [1542289340429, 'on-req', None, None, # [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET', # None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None, # None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']] nInfo = data[2] self._emit('notification', nInfo) notificationType = nInfo[6] notificationText = nInfo[7] if notificationType == 'ERROR': self._emit('error', notificationText) self.logger.error("Notification ERROR: {}".format(notificationText)) else: self.logger.info("Notification SUCCESS: {}".format(notificationText)) async def _balance_update_handler(self, data): self.logger.info('Balance update: {}'.format(data[2])) self._emit('balance_update', data[2]) async def _order_closed_handler(self, data): # [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1, # "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED # @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null, # "API>BFX",null,null,null]] tInfo = data[2] order = Order(tInfo) trade = Trade(order) self.logger.info("Order closed: {} {}".format(order.symbol, order.status)) self._emit('order_closed', order, trade) if order.cId in self.pendingOrders: if self.pendingOrders[order.cId][0]: await self.pendingOrders[order.cId][0](order, trade) del self.pendingOrders[order.cId] async def _order_snapshot_handler(self, data): self._emit('order_snapshot', data) self.logger.info("Position snapshot: {}".format(data)) async def _wallet_snapshot_handler(self, data): self._emit('wallet_snapshot', data[2]) self.logger.info("Wallet snapshot: {}".format(data)) async def _position_snapshot_handler(self, data): self._emit('position_snapshot', data) self.logger.info("Position snapshot: {}".format(data)) async def _funding_offer_snapshot_handler(self, data): self._emit('funding_offer_snapshot', data) self.logger.info("Funding offer snapshot: {}".format(data)) async def _funding_load_snapshot_handler(self, data): self._emit('funding_loan_snapshot', data[2]) self.logger.info("Funding loan snapshot: {}".format(data)) async def _funding_credit_snapshot_handler(self, data): self._emit('funding_credit_snapshot', data[2]) self.logger.info("Funding credit snapshot: {}".format(data)) async def _trade_handler(self, data): channelData = self.channels[data[0]] if type(data[1]) is list: data = data[1] # Process the batch of seed trades on # connection data.reverse() for t in data: trade = { 'mts': t[1], 'price': t[2], 'amount': t[3], 'symbol': channelData['symbol'] } self._emit('seed_trade', trade) else: tradeObj = _parse_trade_snapshot_item(data, channelData['symbol']) self._emit('new_trade', tradeObj) async def _candle_handler(self, data): chanId = data[0] channelData = self.channels[chanId] if type(data[1][0]) is list: # Process the batch of seed candles on # websocket subscription candlesSnapshot = data[1] candlesSnapshot.reverse() for c in candlesSnapshot: candle = _parse_candle(c, channelData['symbol'], channelData['tf']) self._emit('seed_candle', candle) else: candle = _parse_candle(data[1], channelData['symbol'], channelData['tf']) self._emit('new_candle', candle) async def on_message(self, message): self.logger.debug(message) msg = json.loads(message) self._emit('all', msg) if type(msg) is dict: # System messages are received as json await self._ws_system_handler(msg) elif type(msg) is list: # All data messages are received as a list await self._ws_data_handler(msg) else: self.logger.warn('Unknown websocket response: {}'.format(msg)) async def _ws_authenticate_socket(self): nonce = int(round(time.time() * 1000000)) authMsg = 'AUTH{}'.format(nonce) secret = self.API_SECRET.encode() sig = hmac.new(secret, authMsg.encode(), hashlib.sha384).hexdigest() hmac.new(secret, self.API_SECRET.encode('utf'), hashlib.sha384).hexdigest() jdata = { 'apiKey': self.API_KEY, 'authSig': sig, 'authNonce': nonce, 'authPayload': authMsg, 'event': 'auth' } await self.ws.send(json.dumps(jdata)) async def on_open(self): self.logger.info("Websocket opened.") self._emit('connected') # Orders are simulated in backtest mode if not self.backtest and self.API_KEY and self.API_SECRET: await self._ws_authenticate_socket() # subscribe to data feed # TODO: allow for multiple subscriptions # await self._subscribe('trades', symbol=self.symbol) # key = 'trade:1m:{}'.format(self.symbol) # await self._subscribe('candles', key=key, symbol=self.symbol) async def send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data] await self.ws.send(json.dumps(payload)) def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol} if timeframe: q['key'] = 'trade:{}:{}'.format(timeframe, symbol) q.update(**kwargs) self.logger.info("Subscribing to channel {}".format(channel_name)) # tmp = self.ws.send(json.dumps(q)) asyncio.ensure_future(self.ws.send(json.dumps(q))) async def submit_order(self, symbol, price, amount, market_type, hidden=False, onComplete=None, onError=None, *args, **kwargs): order_id = int(round(time.time() * 1000)) # send order over websocket payload = { "cid": order_id, "type": str(market_type), "symbol": symbol, "amount": str(amount), "price": str(price) } self.pendingOrders[order_id] = payload await self.send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( order_id, symbol, amount, price)) self.pendingOrders[order_id] = (onComplete, onError)