Files
bitfinex-api-py/bfxapi/websockets/bfx_websocket.py

694 lines
27 KiB
Python

"""
Module used to house the bitfine websocket client
"""
import asyncio
import json
import time
import random
from .generic_websocket import GenericWebsocket, AuthError
from .subscription_manager import SubscriptionManager
from .wallet_manager import WalletManager
from .order_manager import OrderManager
from ..utils.auth import generate_auth_payload
from ..models import Order, Trade, OrderBook
class Flags:
"""
Enum used to index the available flags used in the authentication
websocket packet
"""
DEC_S = 9
TIME_S = 32
TIMESTAMP = 32768
SEQ_ALL = 65536
CHECKSUM = 131072
strings = {
9: 'DEC_S',
32: 'TIME_S',
32768: 'TIMESTAMP',
65536: 'SEQ_ALL',
131072: 'CHECKSUM'
}
def _parse_candle(cData, symbol, tf):
return {
'mts': cData[0],
'open': cData[1],
'close': cData[2],
'high': cData[3],
'low': cData[4],
'volume': cData[5],
'symbol': symbol,
'tf': tf
}
def _parse_trade_snapshot_item(tData, symbol):
return {
'mts': tData[3],
'price': tData[4],
'amount': tData[5],
'symbol': symbol
}
def _parse_trade(tData, symbol):
return {
'mts': tData[1],
'price': tData[3],
'amount': tData[2],
'symbol': symbol
}
def _parse_deriv_status_update(sData, symbol):
return {
'symbol': symbol,
'status_type': 'deriv',
'mts': sData[0],
# placeholder
'deriv_price': sData[2],
'spot_price': sData[3],
# placeholder
'insurance_fund_balance': sData[5],
# placeholder
# placeholder
'funding_accrued': sData[8],
'funding_step': sData[9],
# placeholder
}
ERRORS = {
10000: 'Unknown event',
10001: 'Generic error',
10008: 'Concurrency error',
10020: 'Request parameters error',
10050: 'Configuration setup failed',
10100: 'Failed authentication',
10111: 'Error in authentication request payload',
10112: 'Error in authentication request signature',
10113: 'Error in authentication request encryption',
10114: 'Error in authentication request nonce',
10200: 'Error in un-authentication request',
10300: 'Subscription Failed (generic)',
10301: 'Already Subscribed',
10302: 'Unknown channel',
10400: 'Subscription Failed (generic)',
10401: 'Not subscribed',
11000: 'Not ready, try again later',
20000: 'User is invalid!',
20051: 'Websocket server stopping',
20060: 'Websocket server resyncing',
20061: 'Websocket server resync complete'
}
class BfxWebsocket(GenericWebsocket):
"""
More complex websocket that heavily relies on the btfxwss module.
This websocket requires authentication and is capable of handling orders.
https://github.com/Crypto-toolbox/btfxwss
### Emitter events:
- `all` (array|Object): listen for all messages coming through
- `connected:` () called when a connection is made
- `disconnected`: () called when a connection is ended (A reconnect attempt may follow)
- `stopped`: () called when max amount of connection retries is met and the socket is closed
- `authenticated` (): called when the websocket passes authentication
- `notification` (Notification): incoming account notification
- `error` (array): error from the websocket
- `order_closed` (Order, Trade): when an order has been closed
- `order_new` (Order, Trade): when an order has been created but not closed. Note: will not be called if order is executed and filled instantly
- `order_confirmed` (Order, Trade): When an order has been submitted and received
- `wallet_snapshot` (array[Wallet]): Initial wallet balances (Fired once)
- `order_snapshot` (array[Order]): Initial open orders (Fired once)
- `positions_snapshot` (array): Initial open positions (Fired once)
- `wallet_update` (Wallet): changes to the balance of wallets
- `status_update` (Object): new platform status info
- `seed_candle` (Object): initial past candle to prime strategy
- `seed_trade` (Object): initial past trade to prime strategy
- `funding_offer_snapshot` (array): opening funding offer balances
- `funding_loan_snapshot` (array): opening funding loan balances
- `funding_credit_snapshot` (array): opening funding credit balances
- `balance_update` (array): when the state of a balance is changed
- `new_trade` (array): a new trade on the market has been executed
- `trade_update` (array): a trade on the market has been updated
- `new_candle` (array): a new candle has been produced
- `margin_info_updates` (array): new margin information has been broadcasted
- `funding_info_updates` (array): new funding information has been broadcasted
- `order_book_snapshot` (array): initial snapshot of the order book on connection
- `order_book_update` (array): a new order has been placed into the ordebrook
- `subscribed` (Subscription): a new channel has been subscribed to
- `unsubscribed` (Subscription): a channel has been un-subscribed
"""
def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api-pub.bitfinex.com/ws/2',
manageOrderBooks=False, dead_man_switch=False, ws_capacity=25, logLevel='INFO', parse_float=float,
*args, **kwargs):
self.API_KEY = API_KEY
self.API_SECRET = API_SECRET
self.manageOrderBooks = manageOrderBooks
self.dead_man_switch = dead_man_switch
self.pendingOrders = {}
self.orderBooks = {}
self.ws_capacity = ws_capacity
# How should we store float values? could also be bfxapi.decimal
# which is slower but has higher precision.
self.parse_float = parse_float
super(BfxWebsocket, self).__init__(host, logLevel=logLevel, *args, **kwargs)
self.subscriptionManager = SubscriptionManager(self, logLevel=logLevel)
self.orderManager = OrderManager(self, logLevel=logLevel)
self.wallets = WalletManager()
self._WS_DATA_HANDLERS = {
'tu': self._trade_update_handler,
'wu': self._wallet_update_handler,
'hb': self._heart_beat_handler,
'te': self._trade_executed_handler,
'oc': self._order_closed_handler,
'ou': self._order_update_handler,
'on': self._order_new_handler,
'os': self._order_snapshot_handler,
'ws': self._wallet_snapshot_handler,
'ps': self._position_snapshot_handler,
'fos': self._funding_offer_snapshot_handler,
'fcs': self._funding_credit_snapshot_handler,
'fls': self._funding_load_snapshot_handler,
'bu': self._balance_update_handler,
'n': self._notification_handler,
'miu': self._margin_info_update_handler,
'fiu': self._funding_info_update_handler
}
self._WS_SYSTEM_HANDLERS = {
'info': self._system_info_handler,
'subscribed': self._system_subscribed_handler,
'unsubscribed': self._system_unsubscribe_handler,
'error': self._system_error_handler,
'auth': self._system_auth_handler,
'conf': self._system_conf_handler
}
async def _ws_system_handler(self, socketId, msg):
eType = msg.get('event')
if eType in self._WS_SYSTEM_HANDLERS:
await self._WS_SYSTEM_HANDLERS[eType](socketId, msg)
else:
self.logger.warn(
"Unknown websocket event (socketId={}): '{}' {}".format(socketId, eType, msg))
async def _ws_data_handler(self, socketId, data, raw_message_str):
dataEvent = data[1]
chan_id = data[0]
if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS:
return await self._WS_DATA_HANDLERS[dataEvent](data)
elif self.subscriptionManager.is_subscribed(chan_id):
subscription = self.subscriptionManager.get(chan_id)
# candles do not have an event
if subscription.channel_name == 'candles':
await self._candle_handler(data)
if subscription.channel_name == 'book':
await self._order_book_handler(data, raw_message_str)
if subscription.channel_name == 'trades':
await self._trade_handler(data)
if subscription.channel_name == 'status':
await self._status_handler(data)
else:
self.logger.warn(
"Unknown data event: '{}' {}".format(dataEvent, data))
async def _system_info_handler(self, socketId, data):
self.logger.info(data)
if data.get('serverId', None):
# connection has been established
await self.on_open(socketId)
async def _system_conf_handler(self, socketId, data):
flag = data.get('flags')
status = data.get('status')
if flag not in Flags.strings:
self.logger.warn("Unknown config value set {}".format(flag))
return
flagString = Flags.strings[flag]
if status == "OK":
self.logger.info("Enabled config flag {}".format(flagString))
else:
self.logger.error(
"Unable to enable config flag {}".format(flagString))
async def _system_subscribed_handler(self, socket_id, data):
await self.subscriptionManager.confirm_subscription(socket_id, data)
async def _system_unsubscribe_handler(self, socket_id, data):
await self.subscriptionManager.confirm_unsubscribe(socket_id, data)
async def _system_error_handler(self, socketId, data):
err_string = ERRORS[data.get('code', 10000)]
err_string = "(socketId={}) {} - {}".format(
socketId,
ERRORS[data.get('code', 10000)],
data.get("msg", ""))
self._emit('error', err_string)
async def _system_auth_handler(self, socketId, data):
if data.get('status') == 'FAILED':
raise AuthError(ERRORS[data.get('code')])
else:
self._emit('authenticated', data)
self.logger.info("Authentication successful.")
async def _trade_update_handler(self, data):
tData = data[2]
# [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]]
if self.subscriptionManager.is_subscribed(data[0]):
symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, symbol)
self._emit('trade_update', tradeObj)
async def _trade_executed_handler(self, data):
tData = data[2]
# [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]]
if self.subscriptionManager.is_subscribed(data[0]):
symbol = self.subscriptionManager.get(data[0]).symbol
tradeObj = _parse_trade(tData, symbol)
self._emit('new_trade', tradeObj)
async def _wallet_update_handler(self, data):
# [0,"wu",["exchange","USD",89134.66933283,0]]
uw = self.wallets._update_from_event(data)
self._emit('wallet_update', uw)
self.logger.info("Wallet update: {}".format(uw))
async def _heart_beat_handler(self, data):
self.logger.debug("Heartbeat - {}".format(self.host))
async def _margin_info_update_handler(self, data):
self._emit('margin_info_update', data)
self.logger.info("Margin info update: {}".format(data))
async def _funding_info_update_handler(self, data):
self._emit('funding_info_update', data)
self.logger.info("Funding info update: {}".format(data))
async def _notification_handler(self, data):
nInfo = data[2]
self._emit('notification', nInfo)
notificationType = nInfo[6]
notificationText = nInfo[7]
if notificationType == 'ERROR':
# self._emit('error', notificationText)
self.logger.error(
"Notification ERROR: {}".format(notificationText))
else:
self.logger.info(
"Notification SUCCESS: {}".format(notificationText))
async def _balance_update_handler(self, data):
self.logger.info('Balance update: {}'.format(data[2]))
self._emit('balance_update', data[2])
async def _order_closed_handler(self, data):
await self.orderManager.confirm_order_closed(data)
async def _order_update_handler(self, data):
await self.orderManager.confirm_order_update(data)
async def _order_new_handler(self, data):
await self.orderManager.confirm_order_new(data)
async def _order_snapshot_handler(self, data):
await self.orderManager.build_from_order_snapshot(data)
async def _wallet_snapshot_handler(self, data):
wallets = self.wallets._update_from_snapshot(data)
self._emit('wallet_snapshot', wallets)
async def _position_snapshot_handler(self, data):
self._emit('position_snapshot', data)
self.logger.info("Position snapshot: {}".format(data))
async def _funding_offer_snapshot_handler(self, data):
self._emit('funding_offer_snapshot', data)
self.logger.info("Funding offer snapshot: {}".format(data))
async def _funding_load_snapshot_handler(self, data):
self._emit('funding_loan_snapshot', data[2])
self.logger.info("Funding loan snapshot: {}".format(data))
async def _funding_credit_snapshot_handler(self, data):
self._emit('funding_credit_snapshot', data[2])
self.logger.info("Funding credit snapshot: {}".format(data))
async def _status_handler(self, data):
sub = self.subscriptionManager.get(data[0])
symbol = sub.symbol
status_type = sub.key.split(":")[0]
rstatus = data[1]
if status_type == "deriv":
status = _parse_deriv_status_update(rstatus, symbol)
if status:
self._emit('status_update', status)
else:
self.logger.warn('Unknown status data type: {}'.format(data))
async def _trade_handler(self, data):
symbol = self.subscriptionManager.get(data[0]).symbol
if type(data[1]) is list:
data = data[1]
# Process the batch of seed trades on
# connection
data.reverse()
for t in data:
trade = {
'mts': t[1],
'amount': t[2],
'price': t[3],
'symbol': symbol
}
self._emit('seed_trade', trade)
async def _candle_handler(self, data):
subscription = self.subscriptionManager.get(data[0])
# if candle data is empty
if data[1] == []:
return
if type(data[1][0]) is list:
# Process the batch of seed candles on
# websocket subscription
candlesSnapshot = data[1]
candlesSnapshot.reverse()
for c in candlesSnapshot:
candle = _parse_candle(
c, subscription.symbol, subscription.timeframe)
self._emit('seed_candle', candle)
else:
candle = _parse_candle(
data[1], subscription.symbol, subscription.timeframe)
self._emit('new_candle', candle)
async def _order_book_handler(self, data, orig_raw_message):
obInfo = data[1]
chan_id = data[0]
subscription = self.subscriptionManager.get(data[0])
symbol = subscription.symbol
if data[1] == "cs":
dChecksum = data[2] & 0xffffffff # force to signed int
checksum = self.orderBooks[symbol].checksum()
# force checksums to signed integers
isValid = (dChecksum) == (checksum)
if isValid:
msg = "Checksum orderbook validation for '{}' successful."
self.logger.debug(msg.format(symbol))
else:
msg = "Checksum orderbook invalid for '{}'. Resetting subscription."
self.logger.warn(msg.format(symbol))
# re-build orderbook with snapshot
await self.subscriptionManager.resubscribe(chan_id)
return
if obInfo == []:
self.orderBooks[symbol] = OrderBook()
return
isSnapshot = type(obInfo[0]) is list
if isSnapshot:
self.orderBooks[symbol] = OrderBook()
self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message)
self._emit('order_book_snapshot', {
'symbol': symbol, 'data': obInfo})
else:
self.orderBooks[symbol].update_with(obInfo, orig_raw_message)
self._emit('order_book_update', {'symbol': symbol, 'data': obInfo})
async def on_message(self, socketId, message):
self.logger.debug(message)
# convert float values to decimal
msg = json.loads(message, parse_float=self.parse_float)
self._emit('all', msg)
if type(msg) is dict:
# System messages are received as json
await self._ws_system_handler(socketId, msg)
elif type(msg) is list:
# All data messages are received as a list
await self._ws_data_handler(socketId, msg, message)
else:
self.logger.warn('Unknown (socketId={}) websocket response: {}'.format(socketId, msg))
async def _ws_authenticate_socket(self, socketId):
socket = self.sockets[socketId]
socket.set_authenticated()
jdata = generate_auth_payload(self.API_KEY, self.API_SECRET)
if self.dead_man_switch:
jdata['dms'] = 4
await socket.ws.send(json.dumps(jdata))
async def on_open(self, socket_id):
self.logger.info("Websocket opened.")
if len(self.sockets) == 1:
## only call on first connection
self._emit('connected')
# Orders are simulated in backtest mode
if self.API_KEY and self.API_SECRET and self.get_authenticated_socket() == None:
await self._ws_authenticate_socket(socket_id)
# enable order book checksums
if self.manageOrderBooks:
await self.enable_flag(Flags.CHECKSUM)
# set any existing subscriptions to not subscribed
self.subscriptionManager.set_unsubscribed_by_socket(socket_id)
# re-subscribe to existing channels
await self.subscriptionManager.resubscribe_by_socket(socket_id)
async def _send_auth_command(self, channel_name, data):
payload = [0, channel_name, None, data]
socket = self.get_authenticated_socket()
if socket == None:
raise ValueError("authenticated socket connection not found")
if not socket.isConnected:
raise ValueError("authenticated socket not connected")
await socket.ws.send(json.dumps(payload))
def get_orderbook(self, symbol):
return self.orderBooks.get(symbol, None)
def get_socket_capacity(self, socket_id):
return self.ws_capacity - self.subscriptionManager.get_sub_count_by_socket(socket_id)
def get_most_available_socket(self):
bestId = None
bestCount = 0
for socketId in self.sockets:
cap = self.get_socket_capacity(socketId)
if bestId == None or cap > bestCount:
bestId = socketId
bestCount = cap
return self.sockets[socketId]
def get_total_available_capcity(self):
total = 0
for socketId in self.sockets:
total += self.get_socket_capacity(socketId)
return total
async def enable_flag(self, flag):
"""
Enable flag on websocket connection
# Attributes
flag (int): int flag value
"""
payload = {
"event": 'conf',
"flags": flag
}
# enable on all sockets
for socket in self.sockets.values():
if socket.isConnected:
await socket.ws.send(json.dumps(payload))
async def subscribe_order_book(self, symbol):
"""
Subscribe to an orderbook data feed
# Attributes
@param symbol: the trading symbol i.e 'tBTCUSD'
"""
return await self.subscribe('book', symbol)
async def subscribe_candles(self, symbol, timeframe):
"""
Subscribe to a candle data feed
# Attributes
@param symbol: the trading symbol i.e 'tBTCUSD'
@param timeframe: resolution of the candle i.e 15m, 1h
"""
return await self.subscribe('candles', symbol, timeframe=timeframe)
async def subscribe_trades(self, symbol):
"""
Subscribe to a trades data feed
# Attributes
@param symbol: the trading symbol i.e 'tBTCUSD'
"""
return await self.subscribe('trades', symbol)
async def subscribe_ticker(self, symbol):
"""
Subscribe to a ticker data feed
# Attributes
@param symbol: the trading symbol i.e 'tBTCUSD'
"""
return await self.subscribe('ticker', symbol)
async def subscribe_derivative_status(self, symbol):
"""
Subscribe to a status data feed
# Attributes
@param symbol: the trading symbol i.e 'tBTCUSD'
"""
key = 'deriv:{}'.format(symbol)
return await self.subscribe('status', symbol, key=key)
async def subscribe(self, *args, **kwargs):
"""
Subscribe to a new channel
# Attributes
@param channel_name: the name of the channel i.e 'books', 'candles'
@param symbol: the trading symbol i.e 'tBTCUSD'
@param timeframe: sepecifies the data timeframe between each candle (only required
for the candles channel)
"""
return await self.subscriptionManager.subscribe(*args, **kwargs)
async def unsubscribe(self, *args, **kwargs):
"""
Unsubscribe from the channel with the given chanId
# Attributes
@param onComplete: function called when the bitfinex websocket resoponds with
a signal that confirms the subscription has been unsubscribed to
"""
return await self.subscriptionManager.unsubscribe(*args, **kwargs)
async def resubscribe(self, *args, **kwargs):
"""
Unsubscribes and then subscribes to the channel with the given Id
This function is mostly used to force the channel to produce a fresh snapshot.
"""
return await self.subscriptionManager.resubscribe(*args, **kwargs)
async def unsubscribe_all(self, *args, **kwargs):
"""
Unsubscribe from all channels.
"""
return await self.subscriptionManager.unsubscribe_all(*args, **kwargs)
async def resubscribe_all(self, *args, **kwargs):
"""
Unsubscribe and then subscribe to all channels
"""
return await self.subscriptionManager.resubscribe_all(*args, **kwargs)
async def submit_order(self, *args, **kwargs):
"""
Submit a new order
# Attributes
@param gid: assign the order to a group identifier
@param symbol: the name of the symbol i.e 'tBTCUSD
@param price: the price you want to buy/sell at (must be positive)
@param amount: order size: how much you want to buy/sell,
a negative amount indicates a sell order and positive a buy order
@param market_type Order.Type: please see Order.Type enum
amount decimal string Positive for buy, Negative for sell
@param hidden: if True, order should be hidden from orderbooks
@param price_trailing: decimal trailing price
@param price_aux_limit: decimal auxiliary Limit price (only for STOP LIMIT)
@param oco_stop_price: set the oco stop price (requires oco = True)
@param close: if True, close position if position present
@param reduce_only: if True, ensures that the executed order does not flip the opened position
@param post_only: if True, ensures the limit order will be added to the order book and not
match with a pre-existing order
@param oco: cancels other order option allows you to place a pair of orders stipulating
that if one order is executed fully or partially, then the other is automatically canceled
@param time_in_force: datetime for automatic order cancellation ie. 2020-01-01 10:45:23
@param leverage: the amount of leverage to apply to the order as an integer
@param onConfirm: function called when the bitfinex websocket receives signal that the order
was confirmed
@param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled
"""
return await self.orderManager.submit_order(*args, **kwargs)
async def update_order(self, *args, **kwargs):
"""
Update an existing order
# Attributes
@param orderId: the id of the order that you want to update
@param price: the price you want to buy/sell at (must be positive)
@param amount: order size: how much you want to buy/sell,
a negative amount indicates a sell order and positive a buy order
@param delta: change of amount
@param price_trailing: decimal trailing price
@param price_aux_limit: decimal auxiliary Limit price (only for STOP LIMIT)
@param hidden: if True, order should be hidden from orderbooks
@param close: if True, close position if position present
@param reduce_only: if True, ensures that the executed order does not flip the opened position
@param post_only: if True, ensures the limit order will be added to the order book and not
match with a pre-existing order
@param time_in_force: datetime for automatic order cancellation ie. 2020-01-01 10:45:23
@param leverage: the amount of leverage to apply to the order as an integer
@param onConfirm: function called when the bitfinex websocket receives signal that the order
was confirmed
@param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled
"""
return await self.orderManager.update_order(*args, **kwargs)
async def cancel_order(self, *args, **kwargs):
"""
Cancel an existing open order
# Attributes
@param orderId: the id of the order that you want to update
@param onConfirm: function called when the bitfinex websocket receives signal that the
order
was confirmed
@param onClose: function called when the bitfinex websocket receives signal that the order
was closed due to being filled or cancelled
"""
return await self.orderManager.cancel_order(*args, **kwargs)
async def cancel_order_group(self, *args, **kwargs):
"""
Cancel a set of orders using a single group id.
"""
return await self.orderManager.cancel_order_group(*args, **kwargs)
async def cancel_all_orders(self, *args, **kwargs):
"""
Cancel all existing open orders
This function closes all open orders.
"""
return await self.orderManager.cancel_all_orders(*args, **kwargs)
async def cancel_order_multi(self, *args, **kwargs):
"""
Cancel existing open orders as a batch
# Attributes
@param ids: an array of order ids
@param gids: an array of group ids
"""
return await self.orderManager.cancel_order_multi(*args, **kwargs)