Files
bitfinex-api-py/bfxapi/websockets/BfxWebsocket.py
2018-12-07 10:24:22 +00:00

402 lines
14 KiB
Python

import asyncio
import json
import time
import random
from .GenericWebsocket import GenericWebsocket, AuthError
from .SubscriptionManager import SubscriptionManager
from .WalletManager import WalletManager
from .OrderManager import OrderManager
from ..utils.auth import generate_auth_payload
from ..models import Order, Trade, OrderBook
class Flags:
DEC_S = 9
TIME_S = 32
TIMESTAMP = 32768
SEQ_ALL = 65536
CHECKSUM = 131072
strings = {
9: 'DEC_S',
32: 'TIME_S',
32768: 'TIMESTAMP',
65536: 'SEQ_ALL',
131072: 'CHECKSUM'
}
def _parse_candle(cData, symbol, tf):
return {
'mts': cData[0],
'open': cData[1],
'close': cData[2],
'high': cData[3],
'low': cData[4],
'volume': cData[5],
'symbol': symbol,
'tf': tf
}
def _parse_trade_snapshot_item(tData, symbol):
return {
'mts': tData[3],
'price': tData[4],
'amount': tData[5],
'symbol': symbol
}
def _parse_trade(tData, symbol):
return {
'mts': tData[1],
'price': tData[3],
'amount': tData[2],
'symbol': symbol
}
class BfxWebsocket(GenericWebsocket):
"""
More complex websocket that heavily relies on the btfxwss module. This websocket requires
authentication and is capable of handling orders.
https://github.com/Crypto-toolbox/btfxwss
"""
ERRORS = {
10000: 'Unknown event',
10001: 'Generic error',
10008: 'Concurrency error',
10020: 'Request parameters error',
10050: 'Configuration setup failed',
10100: 'Failed authentication',
10111: 'Error in authentication request payload',
10112: 'Error in authentication request signature',
10113: 'Error in authentication request encryption',
10114: 'Error in authentication request nonce',
10200: 'Error in un-authentication request',
10300: 'Subscription Failed (generic)',
10301: 'Already Subscribed',
10302: 'Unknown channel',
10400: 'Subscription Failed (generic)',
10401: 'Not subscribed',
11000: 'Not ready, try again later',
20000: 'User is invalid!',
20051: 'Websocket server stopping',
20060: 'Websocket server resyncing',
20061: 'Websocket server resync complete'
}
def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', manageOrderBooks=False,
dead_man_switch=False, logLevel='INFO', *args, **kwargs):
self.API_KEY=API_KEY
self.API_SECRET=API_SECRET
self.manageOrderBooks = manageOrderBooks
self.dead_man_switch = dead_man_switch
self.pendingOrders = {}
self.orderBooks = {}
super(BfxWebsocket, self).__init__(host, logLevel=logLevel, *args, **kwargs)
self.subscriptionManager = SubscriptionManager(self, logLevel=logLevel)
self.orderManager = OrderManager(self, logLevel=logLevel)
self.wallets = WalletManager()
self._WS_DATA_HANDLERS = {
'tu': self._trade_update_handler,
'wu': self._wallet_update_handler,
'hb': self._heart_beat_handler,
'te': self._trade_executed_handler,
'oc': self._order_closed_handler,
'ou': self._order_update_handler,
'on': self._order_new_handler,
'os': self._order_snapshot_handler,
'ws': self._wallet_snapshot_handler,
'ps': self._position_snapshot_handler,
'fos': self._funding_offer_snapshot_handler,
'fcs': self._funding_credit_snapshot_handler,
'fls': self._funding_load_snapshot_handler,
'bu': self._balance_update_handler,
'n': self._notification_handler,
'miu': self._margin_info_update_handler,
'fiu': self._funding_info_update_handler
}
self._WS_SYSTEM_HANDLERS = {
'info': self._system_info_handler,
'subscribed': self._system_subscribed_handler,
'unsubscribed': self._system_unsubscribe_handler,
'error': self._system_error_handler,
'auth': self._system_auth_handler,
'conf': self._system_conf_handler
}
async def _ws_system_handler(self, msg):
eType = msg.get('event')
if eType in self._WS_SYSTEM_HANDLERS:
await self._WS_SYSTEM_HANDLERS[eType](msg)
else:
self.logger.warn("Unknown websocket event: '{}' {}".format(eType, msg))
async def _ws_data_handler(self, data):
dataEvent = data[1]
chanId = data[0]
if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS:
return await self._WS_DATA_HANDLERS[dataEvent](data)
elif self.subscriptionManager.is_subscribed(chanId):
subscription = self.subscriptionManager.get(chanId)
# candles do not have an event
if subscription.channel_name == 'candles':
await self._candle_handler(data)
if subscription.channel_name == 'book':
await self._order_book_handler(data)
else:
self.logger.warn("Unknown data event: '{}' {}".format(dataEvent, data))
async def _system_info_handler(self, data):
self.logger.info(data)
if data.get('serverId', None):
## connection has been established
await self.on_open()
async def _system_conf_handler(self, data):
flag = data.get('flags')
status = data.get('status')
if flag not in Flags.strings:
self.logger.warn("Unknown config value set {}".format(flag))
return
flagString = Flags.strings[flag]
if status == "OK":
self.logger.info("Enabled config flag {}".format(flagString))
else:
self.logger.error("Unable to enable config flag {}".format(flagString))
async def _system_subscribed_handler(self, data):
await self.subscriptionManager.confirm_subscription(data)
async def _system_unsubscribe_handler(self, data):
await self.subscriptionManager.confirm_unsubscribe(data)
async def _system_error_handler(self, data):
self._emit('error', data)
async def _system_auth_handler(self, data):
if data.get('status') == 'FAILED':
raise AuthError(self.ERRORS[data.get('code')])
else:
self._emit('authenticated', data)
self.logger.info("Authentication successful.")
async def _trade_update_handler(self, data):
tData = data[2]
# [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]]
if self.subscriptionManager.is_subscribed(data[0]):
symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, symbol)
self._emit('new_trade', tradeObj)
async def _trade_executed_handler(self, data):
tData = data[2]
# [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]]
if self.subscriptionManager.is_subscribed(data[0]):
symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, symbol)
self._emit('new_trade', tradeObj)
async def _wallet_update_handler(self, data):
# [0,"wu",["exchange","USD",89134.66933283,0]]
uw = self.wallets._update_from_event(data)
self._emit('wallet_update', uw)
self.logger.info("Wallet update: {}".format(uw))
async def _heart_beat_handler(self, data):
self.logger.debug("Heartbeat - {}".format(self.host))
async def _margin_info_update_handler(self, data):
self._emit('margin_info_update', data)
self.logger.info("Margin info update: {}".format(data))
async def _funding_info_update_handler(self, data):
self._emit('funding_info_update', data)
self.logger.info("Funding info update: {}".format(data))
async def _notification_handler(self, data):
# [0, 'n', [1542289340429, 'on-req', None, None,
# [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET',
# None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None,
# None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']]
nInfo = data[2]
self._emit('notification', nInfo)
notificationType = nInfo[6]
notificationText = nInfo[7]
if notificationType == 'ERROR':
self._emit('error', notificationText)
self.logger.error("Notification ERROR: {}".format(notificationText))
else:
self.logger.info("Notification SUCCESS: {}".format(notificationText))
async def _balance_update_handler(self, data):
self.logger.info('Balance update: {}'.format(data[2]))
self._emit('balance_update', data[2])
async def _order_closed_handler(self, data):
await self.orderManager.confirm_order_closed(data)
async def _order_update_handler(self, data):
await self.orderManager.confirm_order_update(data)
async def _order_new_handler(self, data):
await self.orderManager.confirm_order_new(data)
async def _order_snapshot_handler(self, data):
await self.orderManager.build_from_order_snapshot(data)
async def _wallet_snapshot_handler(self, data):
wallets = self.wallets._update_from_snapshot(data)
self._emit('wallet_snapshot', wallets)
async def _position_snapshot_handler(self, data):
self._emit('position_snapshot', data)
self.logger.info("Position snapshot: {}".format(data))
async def _funding_offer_snapshot_handler(self, data):
self._emit('funding_offer_snapshot', data)
self.logger.info("Funding offer snapshot: {}".format(data))
async def _funding_load_snapshot_handler(self, data):
self._emit('funding_loan_snapshot', data[2])
self.logger.info("Funding loan snapshot: {}".format(data))
async def _funding_credit_snapshot_handler(self, data):
self._emit('funding_credit_snapshot', data[2])
self.logger.info("Funding credit snapshot: {}".format(data))
async def _trade_handler(self, data):
symbol = self.subscriptionManager.get(data[0]).symbol
if type(data[1]) is list:
data = data[1]
# Process the batch of seed trades on
# connection
data.reverse()
for t in data:
trade = {
'mts': t[1],
'price': t[2],
'amount': t[3],
'symbol': symbol
}
self._emit('seed_trade', trade)
else:
tradeObj = _parse_trade_snapshot_item(data, symbol)
self._emit('new_trade', tradeObj)
async def _candle_handler(self, data):
subscription = self.subscriptionManager.get(data[0])
if type(data[1][0]) is list:
# Process the batch of seed candles on
# websocket subscription
candlesSnapshot = data[1]
candlesSnapshot.reverse()
for c in candlesSnapshot:
candle = _parse_candle(c, subscription.symbol, subscription.timeframe)
self._emit('seed_candle', candle)
else:
candle = _parse_candle(data[1], subscription.symbol, subscription.timeframe)
self._emit('new_candle', candle)
async def _order_book_handler(self, data):
obInfo = data[1]
chanId = data[0]
subscription = self.subscriptionManager.get(data[0])
symbol = subscription.symbol
if data[1] == "cs":
dChecksum = data[2] & 0xffffffff # force to signed int
checksum = self.orderBooks[symbol].checksum()
# force checksums to signed integers
isValid = (dChecksum) == (checksum)
if isValid:
self.logger.debug("Checksum orderbook validation for '{}' successful."
.format(symbol))
else:
self.logger.warn("Checksum orderbook invalid for '{}'. Resetting subscription."
.format(symbol))
# re-build orderbook with snapshot
await self.subscriptionManager.resubscribe(chanId)
return
isSnapshot = type(obInfo[0]) is list
if isSnapshot:
self.orderBooks[symbol] = OrderBook()
self.orderBooks[symbol].updateFromSnapshot(obInfo)
self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo })
else:
self.orderBooks[symbol].updateWith(obInfo)
self._emit('order_book_update', { 'symbol': symbol, 'data': obInfo })
async def on_message(self, message):
self.logger.debug(message)
msg = json.loads(message)
self._emit('all', msg)
if type(msg) is dict:
# System messages are received as json
await self._ws_system_handler(msg)
elif type(msg) is list:
# All data messages are received as a list
await self._ws_data_handler(msg)
else:
self.logger.warn('Unknown websocket response: {}'.format(msg))
async def _ws_authenticate_socket(self):
jdata = generate_auth_payload(self.API_KEY, self.API_SECRET)
if self.dead_man_switch:
jdata['dms'] = 4
await self.ws.send(json.dumps(jdata))
async def on_open(self):
self.logger.info("Websocket opened.")
self._emit('connected')
# Orders are simulated in backtest mode
if self.API_KEY and self.API_SECRET:
await self._ws_authenticate_socket()
# enable order book checksums
if self.manageOrderBooks:
await self.enable_flag(Flags.CHECKSUM)
async def _send_auth_command(self, channel_name, data):
payload = [0, channel_name, None, data]
await self.ws.send(json.dumps(payload))
async def enable_flag(self, flag):
payload = {
"event": 'conf',
"flags": flag
}
await self.ws.send(json.dumps(payload))
async def subscribe(self, *args, **kwargs):
return await self.subscriptionManager.subscribe(*args, **kwargs)
async def unsubscribe(self, *args, **kwargs):
return await self.subscriptionManager.unsubscribe(*args, **kwargs)
async def resubscribe(self, *args, **kwargs):
return await self.subscriptionManager.resubscribe(*args, **kwargs)
async def unsubscribe_all(self, *args, **kwargs):
return await self.subscriptionManager.unsubscribe_all(*args, **kwargs)
async def resubscribe_all(self, *args, **kwargs):
return await self.subscriptionManager.resubscribe_all(*args, **kwargs)
async def submit_order(self, *args, **kwargs):
return await self.orderManager.submit_order(*args, **kwargs)
async def update_order(self, *args, **kwargs):
return await self.orderManager.update_order(*args, **kwargs)
async def cancel_order(self, *args, **kwargs):
return await self.orderManager.cancel_order(*args, **kwargs)
async def cancel_all_orders(self, *args, **kwargs):
return await self.orderManager.cancel_all_orders(*args, **kwargs)
async def cancel_order_multi(self, *args, **kwargs):
return await self.cancel_order_multi(*args, **kwargs)