Create Client module

This commit is contained in:
Jacob Plaster
2018-11-19 16:51:00 +00:00
parent 02e232362b
commit aa50a399dd
5 changed files with 471 additions and 470 deletions

7
bfxapi/Client.py Normal file
View File

@@ -0,0 +1,7 @@
from .websockets.BfxWebsocket import BfxWebsocket
class Client:
def __init__(self, API_KEY=None, API_SECRET=None,
host='wss://test.bitfinex.com/ws/2', *args, **kwargs):
self.ws = BfxWebsocket(API_KEY=API_KEY, API_SECRET=API_SECRET, host=host, *args, **kwargs)
self.rest = None # Eventually will be the rest interface

View File

@@ -1,4 +1,4 @@
name = 'bfxapi' name = 'bfxapi'
from bfxapi.websockets.LiveWebsocket import LiveBfxWebsocket from bfxapi.Client import Client
from bfxapi.websockets.GenericWebsocket import GenericWebsocket from bfxapi.websockets.GenericWebsocket import GenericWebsocket

View File

@@ -47,5 +47,5 @@ class Order:
def __str__(self): def __str__(self):
''' Allow us to print the Order object in a pretty format ''' ''' Allow us to print the Order object in a pretty format '''
return "Order <'{0}' mtsCreate={1} {}>".format(self.symbol, self.mtsCreate, return "Order <'{0}' mtsCreate={1} {2}>".format(self.symbol, self.mtsCreate,
self.status) self.status)

View File

@@ -1,462 +1,461 @@
import asyncio import asyncio
import json import json
import time import time
import hashlib import hashlib
import hmac import hmac
import random import random
from .GenericWebsocket import GenericWebsocket, AuthError from .GenericWebsocket import GenericWebsocket, AuthError
from ..models import Order, Trade from ..models import Order, Trade
def _parse_candle(cData, symbol, tf): def _parse_candle(cData, symbol, tf):
return { return {
'mts': cData[0], 'mts': cData[0],
'open': cData[1], 'open': cData[1],
'close': cData[2], 'close': cData[2],
'high': cData[3], 'high': cData[3],
'low': cData[4], 'low': cData[4],
'volume': cData[5], 'volume': cData[5],
'symbol': symbol, 'symbol': symbol,
'tf': tf 'tf': tf
} }
def _parse_trade_snapshot_item(tData, symbol): def _parse_trade_snapshot_item(tData, symbol):
return { return {
'mts': tData[3], 'mts': tData[3],
'price': tData[4], 'price': tData[4],
'amount': tData[5], 'amount': tData[5],
'symbol': symbol 'symbol': symbol
} }
def _parse_trade(tData, symbol): def _parse_trade(tData, symbol):
return { return {
'mts': tData[1], 'mts': tData[1],
'price': tData[3], 'price': tData[3],
'amount': tData[2], 'amount': tData[2],
'symbol': symbol 'symbol': symbol
} }
class LiveBfxWebsocket(GenericWebsocket): class BfxWebsocket(GenericWebsocket):
''' '''
More complex websocket that heavily relies on the btfxwss module. This websocket requires More complex websocket that heavily relies on the btfxwss module. This websocket requires
authentication and is capable of handling orders. authentication and is capable of handling orders.
https://github.com/Crypto-toolbox/btfxwss https://github.com/Crypto-toolbox/btfxwss
Translation names: Translation names:
translation table for channel names: translation table for channel names:
Data Channels Data Channels
os - Orders os - Orders
hos - Historical Orders hos - Historical Orders
ps - Positions ps - Positions
hts - Trades (snapshot) hts - Trades (snapshot)
te - Trade Executed te - Trade Executed
tu - Trade Execution update tu - Trade Execution update
ws - Wallets ws - Wallets
bu - Balance Info bu - Balance Info
miu - Margin Info miu - Margin Info
fiu - Funding Info fiu - Funding Info
fos - Offers fos - Offers
hfos - Historical Offers hfos - Historical Offers
fcs - Credits fcs - Credits
hfcs - Historical Credits hfcs - Historical Credits
fls - Loans fls - Loans
hfls - Historical Loans hfls - Historical Loans
htfs - Funding Trades htfs - Funding Trades
n - Notifications (WIP) n - Notifications (WIP)
Events: Events:
- all: listen for all messages coming through - all: listen for all messages coming through
- connected: called when a connection is made - connected: called when a connection is made
- authenticated: called when the websocket passes authentication - authenticated: called when the websocket passes authentication
- notification (array): incoming account notification - notification (array): incoming account notification
- error (string): error from the websocket - error (string): error from the websocket
- order_closed (Order, Trade): when an order has been closed - order_closed (Order, Trade): when an order has been closed
- order_new (Order, Trade): when an order has been created but not closed. Note: will - order_new (Order, Trade): when an order has been created but not closed. Note: will
not be called if order is executed and filled instantly not be called if order is executed and filled instantly
- order_confirmed (Order, Trade): when an order has been submitted and received - order_confirmed (Order, Trade): when an order has been submitted and received
- wallet_snapshot (string): Initial wallet balances (Fired once) - wallet_snapshot (string): Initial wallet balances (Fired once)
- order_snapshot (string): Initial open orders (Fired once) - order_snapshot (string): Initial open orders (Fired once)
- positions_snapshot (string): Initial open positions (Fired once) - positions_snapshot (string): Initial open positions (Fired once)
- wallet_update (string): changes to the balance of wallets - wallet_update (string): changes to the balance of wallets
- seed_candle (candleArray): initial past candle to prime strategy - seed_candle (candleArray): initial past candle to prime strategy
- seed_trade (tradeArray): initial past trade to prime strategy - seed_trade (tradeArray): initial past trade to prime strategy
- funding_offer_snapshot: - funding_offer_snapshot:
- funding_loan_snapshot: - funding_loan_snapshot:
- funding_credit_snapshot: - funding_credit_snapshot:
- balance_update when the state of a balance is changed - balance_update when the state of a balance is changed
- new_trade: a new trade on the market has been executed - new_trade: a new trade on the market has been executed
- new_candle: a new candle has been produced - new_candle: a new candle has been produced
- margin_info_update: new margin information has been broadcasted - margin_info_update: new margin information has been broadcasted
- funding_info_update: new funding information has been broadcasted - funding_info_update: new funding information has been broadcasted
''' '''
ERRORS = { ERRORS = {
10000: 'Unknown event', 10000: 'Unknown event',
10001: 'Generic error', 10001: 'Generic error',
10008: 'Concurrency error', 10008: 'Concurrency error',
10020: 'Request parameters error', 10020: 'Request parameters error',
10050: 'Configuration setup failed', 10050: 'Configuration setup failed',
10100: 'Failed authentication', 10100: 'Failed authentication',
10111: 'Error in authentication request payload', 10111: 'Error in authentication request payload',
10112: 'Error in authentication request signature', 10112: 'Error in authentication request signature',
10113: 'Error in authentication request encryption', 10113: 'Error in authentication request encryption',
10114: 'Error in authentication request nonce', 10114: 'Error in authentication request nonce',
10200: 'Error in un-authentication request', 10200: 'Error in un-authentication request',
10300: 'Subscription Failed (generic)', 10300: 'Subscription Failed (generic)',
10301: 'Already Subscribed', 10301: 'Already Subscribed',
10302: 'Unknown channel', 10302: 'Unknown channel',
10400: 'Subscription Failed (generic)', 10400: 'Subscription Failed (generic)',
10401: 'Not subscribed', 10401: 'Not subscribed',
11000: 'Not ready, try again later', 11000: 'Not ready, try again later',
20000: 'User is invalid!', 20000: 'User is invalid!',
20051: 'Websocket server stopping', 20051: 'Websocket server stopping',
20060: 'Websocket server resyncing', 20060: 'Websocket server resyncing',
20061: 'Websocket server resync complete' 20061: 'Websocket server resync complete'
} }
def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://test.bitfinex.com/ws/2', def __init__(self, API_KEY=None, API_SECRET=None, host='wss://test.bitfinex.com/ws/2',
onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs): onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs):
self.channels = {} self.channels = {}
self.API_KEY=API_KEY self.API_KEY=API_KEY
self.API_SECRET=API_SECRET self.API_SECRET=API_SECRET
self.backtest=backtest self.pendingOrders = {}
self.pendingOrders = {}
super(BfxWebsocket, self).__init__(host, *args, **kwargs)
super(LiveBfxWebsocket, self).__init__(host, *args, **kwargs)
self._WS_DATA_HANDLERS = {
self._WS_DATA_HANDLERS = { 'tu': self._trade_update_handler,
'tu': self._trade_update_handler, 'wu': self._wallet_update_handler,
'wu': self._wallet_update_handler, 'hb': self._heart_beat_handler,
'hb': self._heart_beat_handler, 'te': self._trade_executed_handler,
'te': self._trade_executed_handler, 'oc': self._order_closed_handler,
'oc': self._order_closed_handler, 'ou': self._order_update_handler,
'ou': self._order_update_handler, 'on': self._order_new_handler,
'on': self._order_new_handler, 'os': self._order_snapshot_handler,
'os': self._order_snapshot_handler, 'ws': self._wallet_snapshot_handler,
'ws': self._wallet_snapshot_handler, 'ps': self._position_snapshot_handler,
'ps': self._position_snapshot_handler, 'fos': self._funding_offer_snapshot_handler,
'fos': self._funding_offer_snapshot_handler, 'fcs': self._funding_credit_snapshot_handler,
'fcs': self._funding_credit_snapshot_handler, 'fls': self._funding_load_snapshot_handler,
'fls': self._funding_load_snapshot_handler, 'bu': self._balance_update_handler,
'bu': self._balance_update_handler, 'n': self._notification_handler,
'n': self._notification_handler, 'miu': self._margin_info_update_handler,
'miu': self._margin_info_update_handler, 'fiu': self._funding_info_update_handler
'fiu': self._funding_info_update_handler }
}
self._WS_SYSTEM_HANDLERS = {
self._WS_SYSTEM_HANDLERS = { 'info': self._system_info_handler,
'info': self._system_info_handler, 'subscribed': self._system_subscribed_handler,
'subscribed': self._system_subscribed_handler, 'error': self._system_error_handler,
'error': self._system_error_handler, 'auth': self._system_auth_handler
'auth': self._system_auth_handler }
}
async def _ws_system_handler(self, msg):
async def _ws_system_handler(self, msg): eType = msg.get('event')
eType = msg.get('event') if eType in self._WS_SYSTEM_HANDLERS:
if eType in self._WS_SYSTEM_HANDLERS: await self._WS_SYSTEM_HANDLERS[eType](msg)
await self._WS_SYSTEM_HANDLERS[eType](msg) else:
else: self.logger.warn('Unknown websocket event: {}'.format(eType))
self.logger.warn('Unknown websocket event: {}'.format(eType))
async def _ws_data_handler(self, data):
async def _ws_data_handler(self, data): dataEvent = data[1]
dataEvent = data[1] chanId = data[0]
chanId = data[0]
if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS:
if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS: return await self._WS_DATA_HANDLERS[dataEvent](data)
return await self._WS_DATA_HANDLERS[dataEvent](data) elif chanId in self.channels:
elif chanId in self.channels: # candles do not have an event
# candles do not have an event if self.channels[chanId].get('channel') == 'candles':
if self.channels[chanId].get('channel') == 'candles': await self._candle_handler(data)
await self._candle_handler(data) else:
else: self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data))
self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data))
async def _system_info_handler(self, data):
async def _system_info_handler(self, data): self.logger.info(data)
self.logger.info(data) if data.get('serverId', None):
if data.get('serverId', None): ## connection has been established
## connection has been established await self.on_open()
await self.on_open()
async def _system_subscribed_handler(self, data):
async def _system_subscribed_handler(self, data): chanEvent = data.get('channel')
chanEvent = data.get('channel') self.logger.info("Subscribed to channel '{}'".format(chanEvent))
self.logger.info("Subscribed to channel '{}'".format(chanEvent)) ## add channel to known list
## add channel to known list chanId = data.get('chanId')
chanId = data.get('chanId') ## if is a candles subscribption, then get the symbol
## if is a candles subscribption, then get the symbol ## from the key
## from the key if data.get('key'):
if data.get('key'): kd = data.get('key').split(':')
kd = data.get('key').split(':') data['tf'] = kd[1]
data['tf'] = kd[1] data['symbol'] = kd[2]
data['symbol'] = kd[2]
self.channels[chanId] = data
self.channels[chanId] = data
async def _system_error_handler(self, data):
async def _system_error_handler(self, data): self._emit('error', data)
self._emit('error', data)
async def _system_auth_handler(self, data):
async def _system_auth_handler(self, data): if data.get('status') == 'FAILED':
if data.get('status') == 'FAILED': raise AuthError(self.ERRORS[data.get('code')])
raise AuthError(self.ERRORS[data.get('code')]) else:
else: self._emit('authenticated', data)
self._emit('authenticated', data) self.logger.info("Authentication successful.")
self.logger.info("Authentication successful.")
async def _trade_update_handler(self, data):
async def _trade_update_handler(self, data): tData = data[2]
tData = data[2] # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]]
# [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] if data[0] in self.channels:
if data[0] in self.channels: channelData = self.channels[data[0]]
channelData = self.channels[data[0]] tradeObj = _parse_trade(tData, channelData.get('symbol'))
tradeObj = _parse_trade(tData, channelData.get('symbol')) self._emit('new_trade', tradeObj)
self._emit('new_trade', tradeObj)
async def _trade_executed_handler(self, data):
async def _trade_executed_handler(self, data): tData = data[2]
tData = data[2] # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]]
# [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] if data[0] in self.channels:
if data[0] in self.channels: channelData = self.channels[data[0]]
channelData = self.channels[data[0]] tradeObj = _parse_trade(tData, channelData.get('symbol'))
tradeObj = _parse_trade(tData, channelData.get('symbol')) self._emit('new_trade', tradeObj)
self._emit('new_trade', tradeObj)
async def _wallet_update_handler(self, data):
async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]]
# [0,"wu",["exchange","USD",89134.66933283,0]] wu = data[2]
wu = data[2] self._emit('wallet_update', data)
self._emit('wallet_update', data) self.logger.info("Wallet update: {}({}) = {}".format(wu[1], wu[0], wu[2]))
self.logger.info("Wallet update: {}({}) = {}".format(wu[1], wu[0], wu[2]))
async def _heart_beat_handler(self, data):
async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host))
self.logger.debug("Heartbeat - {}".format(self.host))
async def _margin_info_update_handler(self, data):
async def _margin_info_update_handler(self, data): self._emit('margin_info_update', data)
self._emit('margin_info_update', data) self.logger.info("Margin info update: {}".format(data))
self.logger.info("Margin info update: {}".format(data))
async def _funding_info_update_handler(self, data):
async def _funding_info_update_handler(self, data): self._emit('funding_info_update', data)
self._emit('funding_info_update', data) self.logger.info("Funding info update: {}".format(data))
self.logger.info("Funding info update: {}".format(data))
async def _notification_handler(self, data):
async def _notification_handler(self, data): # [0, 'n', [1542289340429, 'on-req', None, None,
# [0, 'n', [1542289340429, 'on-req', None, None, # [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET',
# [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET', # None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None,
# None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None, # None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']]
# None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']] nInfo = data[2]
nInfo = data[2] self._emit('notification', nInfo)
self._emit('notification', nInfo) notificationType = nInfo[6]
notificationType = nInfo[6] notificationText = nInfo[7]
notificationText = nInfo[7] if notificationType == 'ERROR':
if notificationType == 'ERROR': self._emit('error', notificationText)
self._emit('error', notificationText) self.logger.error("Notification ERROR: {}".format(notificationText))
self.logger.error("Notification ERROR: {}".format(notificationText)) else:
else: self.logger.info("Notification SUCCESS: {}".format(notificationText))
self.logger.info("Notification SUCCESS: {}".format(notificationText))
async def _balance_update_handler(self, data):
async def _balance_update_handler(self, data): self.logger.info('Balance update: {}'.format(data[2]))
self.logger.info('Balance update: {}'.format(data[2])) self._emit('balance_update', data[2])
self._emit('balance_update', data[2])
async def _order_closed_handler(self, data):
async def _order_closed_handler(self, data): # order created and executed
# order created and executed # [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1,
# [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1, # "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED
# "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED # @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null,
# @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null, # "API>BFX",null,null,null]]
# "API>BFX",null,null,null]] tInfo = data[2]
tInfo = data[2] order = Order(tInfo)
order = Order(tInfo) trade = Trade(order)
trade = Trade(order) self.logger.info("Order closed: {} {}".format(order.symbol, order.status))
self.logger.info("Order closed: {} {}".format(order.symbol, order.status)) self._emit('order_closed', order, trade)
self._emit('order_closed', order, trade) if order.cId in self.pendingOrders:
if order.cId in self.pendingOrders: if self.pendingOrders[order.cId][0]:
if self.pendingOrders[order.cId][0]: await self.pendingOrders[order.cId][0](order, trade)
await self.pendingOrders[order.cId][0](order, trade) del self.pendingOrders[order.cId]
del self.pendingOrders[order.cId] self._emit('order_confirmed', order, trade)
self._emit('order_confirmed', order, trade)
async def _order_update_handler(self, data):
async def _order_update_handler(self, data): # order created but partially filled
# order created but partially filled # [0, 'ou', [1151351581, None, 1542629457873, 'tBTCUSD', 1542629458071,
# [0, 'ou', [1151351581, None, 1542629457873, 'tBTCUSD', 1542629458071, # 1542629458189, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE',
# 1542629458189, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX',
# None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', # None, None, None]]
# None, None, None]] tInfo = data[2]
tInfo = data[2] order = Order(tInfo)
order = Order(tInfo) trade = Trade(order)
trade = Trade(order) self.logger.info("Order update: {} {}".format(order, trade))
self.logger.info("Order update: {} {}".format(order, trade)) self._emit('order_update', order, trade)
self._emit('order_update', order, trade) if order.cId in self.pendingOrders:
if order.cId in self.pendingOrders: if self.pendingOrders[order.cId][0]:
if self.pendingOrders[order.cId][0]: await self.pendingOrders[order.cId][0](order, trade)
await self.pendingOrders[order.cId][0](order, trade) del self.pendingOrders[order.cId]
del self.pendingOrders[order.cId] self._emit('order_confirmed', order, trade)
self._emit('order_confirmed', order, trade)
async def _order_new_handler(self, data):
async def _order_new_handler(self, data): # order created but not executed / created but partially filled
# order created but not executed / created but partially filled # [0, 'on', [1151351563, None, 1542624024383, 'tBTCUSD', 1542624024596,
# [0, 'on', [1151351563, None, 1542624024383, 'tBTCUSD', 1542624024596, # 1542624024617, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE',
# 1542624024617, 0.01, 0.01, 'EXCHANGE LIMIT', None, None, None, 0, 'ACTIVE', # None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX',
# None, None, 100, 0, 0, 0, None, None, None, 0, 0, None, None, None, 'API>BFX', # None, None, None]]
# None, None, None]] tInfo = data[2]
tInfo = data[2] order = Order(tInfo)
order = Order(tInfo) trade = Trade(order)
trade = Trade(order) self.logger.info("Order new: {} {}".format(order, trade))
self.logger.info("Order new: {} {}".format(order, trade)) self._emit('order_new', order, trade)
self._emit('order_new', order, trade) if order.cId in self.pendingOrders:
if order.cId in self.pendingOrders: if self.pendingOrders[order.cId][0]:
if self.pendingOrders[order.cId][0]: await self.pendingOrders[order.cId][0](order, trade)
await self.pendingOrders[order.cId][0](order, trade) self._emit('order_confirmed', order, trade)
self._emit('order_confirmed', order, trade) del self.pendingOrders[order.cId]
del self.pendingOrders[order.cId]
async def _order_snapshot_handler(self, data):
async def _order_snapshot_handler(self, data): self._emit('order_snapshot', data)
self._emit('order_snapshot', data) self.logger.info("Position snapshot: {}".format(data))
self.logger.info("Position snapshot: {}".format(data))
async def _wallet_snapshot_handler(self, data):
async def _wallet_snapshot_handler(self, data): self._emit('wallet_snapshot', data[2])
self._emit('wallet_snapshot', data[2]) self.logger.info("Wallet snapshot: {}".format(data))
self.logger.info("Wallet snapshot: {}".format(data))
async def _position_snapshot_handler(self, data):
async def _position_snapshot_handler(self, data): self._emit('position_snapshot', data)
self._emit('position_snapshot', data) self.logger.info("Position snapshot: {}".format(data))
self.logger.info("Position snapshot: {}".format(data))
async def _funding_offer_snapshot_handler(self, data):
async def _funding_offer_snapshot_handler(self, data): self._emit('funding_offer_snapshot', data)
self._emit('funding_offer_snapshot', data) self.logger.info("Funding offer snapshot: {}".format(data))
self.logger.info("Funding offer snapshot: {}".format(data))
async def _funding_load_snapshot_handler(self, data):
async def _funding_load_snapshot_handler(self, data): self._emit('funding_loan_snapshot', data[2])
self._emit('funding_loan_snapshot', data[2]) self.logger.info("Funding loan snapshot: {}".format(data))
self.logger.info("Funding loan snapshot: {}".format(data))
async def _funding_credit_snapshot_handler(self, data):
async def _funding_credit_snapshot_handler(self, data): self._emit('funding_credit_snapshot', data[2])
self._emit('funding_credit_snapshot', data[2]) self.logger.info("Funding credit snapshot: {}".format(data))
self.logger.info("Funding credit snapshot: {}".format(data))
async def _trade_handler(self, data):
async def _trade_handler(self, data): channelData = self.channels[data[0]]
channelData = self.channels[data[0]] if type(data[1]) is list:
if type(data[1]) is list: data = data[1]
data = data[1] # Process the batch of seed trades on
# Process the batch of seed trades on # connection
# connection data.reverse()
data.reverse() for t in data:
for t in data: trade = {
trade = { 'mts': t[1],
'mts': t[1], 'price': t[2],
'price': t[2], 'amount': t[3],
'amount': t[3], 'symbol': channelData['symbol']
'symbol': channelData['symbol'] }
} self._emit('seed_trade', trade)
self._emit('seed_trade', trade) else:
else: tradeObj = _parse_trade_snapshot_item(data, channelData['symbol'])
tradeObj = _parse_trade_snapshot_item(data, channelData['symbol']) self._emit('new_trade', tradeObj)
self._emit('new_trade', tradeObj)
async def _candle_handler(self, data):
async def _candle_handler(self, data): chanId = data[0]
chanId = data[0] channelData = self.channels[chanId]
channelData = self.channels[chanId] if type(data[1][0]) is list:
if type(data[1][0]) is list: # Process the batch of seed candles on
# Process the batch of seed candles on # websocket subscription
# websocket subscription candlesSnapshot = data[1]
candlesSnapshot = data[1] candlesSnapshot.reverse()
candlesSnapshot.reverse() for c in candlesSnapshot:
for c in candlesSnapshot: candle = _parse_candle(c, channelData['symbol'], channelData['tf'])
candle = _parse_candle(c, channelData['symbol'], channelData['tf']) self._emit('seed_candle', candle)
self._emit('seed_candle', candle) else:
else: candle = _parse_candle(data[1], channelData['symbol'], channelData['tf'])
candle = _parse_candle(data[1], channelData['symbol'], channelData['tf']) self._emit('new_candle', candle)
self._emit('new_candle', candle)
async def on_message(self, message):
async def on_message(self, message): self.logger.debug(message)
self.logger.debug(message) msg = json.loads(message)
msg = json.loads(message) self._emit('all', msg)
self._emit('all', msg) if type(msg) is dict:
if type(msg) is dict: # System messages are received as json
# System messages are received as json await self._ws_system_handler(msg)
await self._ws_system_handler(msg) elif type(msg) is list:
elif type(msg) is list: # All data messages are received as a list
# All data messages are received as a list await self._ws_data_handler(msg)
await self._ws_data_handler(msg) else:
else: self.logger.warn('Unknown websocket response: {}'.format(msg))
self.logger.warn('Unknown websocket response: {}'.format(msg))
async def _ws_authenticate_socket(self):
async def _ws_authenticate_socket(self): nonce = int(round(time.time() * 1000000))
nonce = int(round(time.time() * 1000000)) authMsg = 'AUTH{}'.format(nonce)
authMsg = 'AUTH{}'.format(nonce) secret = self.API_SECRET.encode()
secret = self.API_SECRET.encode() sig = hmac.new(secret, authMsg.encode(), hashlib.sha384).hexdigest()
sig = hmac.new(secret, authMsg.encode(), hashlib.sha384).hexdigest() hmac.new(secret, self.API_SECRET.encode('utf'), hashlib.sha384).hexdigest()
hmac.new(secret, self.API_SECRET.encode('utf'), hashlib.sha384).hexdigest() jdata = {
jdata = { 'apiKey': self.API_KEY,
'apiKey': self.API_KEY, 'authSig': sig,
'authSig': sig, 'authNonce': nonce,
'authNonce': nonce, 'authPayload': authMsg,
'authPayload': authMsg, 'event': 'auth'
'event': 'auth' }
} await self.ws.send(json.dumps(jdata))
await self.ws.send(json.dumps(jdata))
async def on_open(self):
async def on_open(self): self.logger.info("Websocket opened.")
self.logger.info("Websocket opened.") self._emit('connected')
self._emit('connected') # Orders are simulated in backtest mode
# Orders are simulated in backtest mode if not self.API_KEY and self.API_SECRET:
if not self.backtest and self.API_KEY and self.API_SECRET: await self._ws_authenticate_socket()
await self._ws_authenticate_socket()
async def send_auth_command(self, channel_name, data):
async def send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data]
payload = [0, channel_name, None, data] await self.ws.send(json.dumps(payload))
await self.ws.send(json.dumps(payload))
def subscribe(self, channel_name, symbol, timeframe=None, **kwargs):
def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol}
q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol} if timeframe:
if timeframe: q['key'] = 'trade:{}:{}'.format(timeframe, symbol)
q['key'] = 'trade:{}:{}'.format(timeframe, symbol) q.update(**kwargs)
q.update(**kwargs) self.logger.info("Subscribing to channel {}".format(channel_name))
self.logger.info("Subscribing to channel {}".format(channel_name)) # tmp = self.ws.send(json.dumps(q))
# tmp = self.ws.send(json.dumps(q)) asyncio.ensure_future(self.ws.send(json.dumps(q)))
asyncio.ensure_future(self.ws.send(json.dumps(q)))
async def submit_order(self, symbol, price, amount, market_type,
async def submit_order(self, symbol, price, amount, market_type, hidden=False, onComplete=None, onError=None, *args, **kwargs):
hidden=False, onComplete=None, onError=None, *args, **kwargs): order_id = int(round(time.time() * 1000))
order_id = int(round(time.time() * 1000)) # send order over websocket
# send order over websocket payload = {
payload = { "cid": order_id,
"cid": order_id, "type": str(market_type),
"type": str(market_type), "symbol": symbol,
"symbol": symbol, "amount": str(amount),
"amount": str(amount), "price": str(price)
"price": str(price) }
} self.pendingOrders[order_id] = (onComplete, onError)
self.pendingOrders[order_id] = (onComplete, onError) await self.send_auth_command('on', payload)
await self.send_auth_command('on', payload) self.logger.info("Order cid={} ({} {} @ {}) dispatched".format(
self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( order_id, symbol, amount, price))
order_id, symbol, amount, price))
async def update_order(self, orderId, price=None, amount=None, delta=None,
async def update_order(self, orderId, price=None, amount=None, delta=None, price_aux_limit=None, price_trailing=None, flags=None, time_in_force=None,
price_aux_limit=None, price_trailing=None, flags=None, time_in_force=None, onComplete=None, onError=None):
onComplete=None, onError=None): payload = { "id": orderId }
payload = { "id": orderId } if price is not None:
if price is not None: payload['price'] = str(price)
payload['price'] = str(price) if amount is not None:
if amount is not None: payload['amount'] = str(amount)
payload['amount'] = str(amount) if delta is not None:
if delta is not None: payload['delta'] = str(delta)
payload['delta'] = str(delta) if price_aux_limit is not None:
if price_aux_limit is not None: payload['price_aux_limit'] = str(price_aux_limit)
payload['price_aux_limit'] = str(price_aux_limit) if price_trailing is not None:
if price_trailing is not None: payload['price_trailing'] = str(price_trailing)
payload['price_trailing'] = str(price_trailing) if flags is not None:
if flags is not None: payload['flags'] = str(flags)
payload['flags'] = str(flags) if time_in_force is not None:
if time_in_force is not None: payload['time_in_force'] = str(time_in_force)
payload['time_in_force'] = str(time_in_force) self.pendingOrders[orderId] = (onComplete, onError)
self.pendingOrders[orderId] = (onComplete, onError) await self.send_auth_command('ou', payload)
await self.send_auth_command('ou', payload) self.logger.info("Update Order order_id={} dispatched".format(orderId))
self.logger.info("Update Order order_id={} dispatched".format(orderId))
async def cancel_order(self, orderId, onComplete=None, onError=None):
async def cancel_order(self, orderId, onComplete=None, onError=None): self.pendingOrders[orderId] = (onComplete, onError)
self.pendingOrders[orderId] = (onComplete, onError) await self.send_auth_command('oc', { 'id': orderId })
await self.send_auth_command('oc', { 'id': orderId }) self.logger.info("Order cancel order_id={} dispatched".format(orderId))
self.logger.info("Order cancel order_id={} dispatched".format(orderId))
async def cancel_order_multi(self, orderIds, onComplete=None, onError=None):
async def cancel_order_multi(self, orderIds, onComplete=None, onError=None): self.pendingOrders[orderIds[0]] = (onComplete, onError)
self.pendingOrders[orderIds[0]] = (onComplete, onError) await self.send_auth_command('oc', { 'id': orderIds })
await self.send_auth_command('oc', { 'id': orderIds }) self.logger.info("Order cancel order_ids={} dispatched".format(orderIds))
self.logger.info("Order cancel order_ids={} dispatched".format(orderIds))

View File

@@ -16,13 +16,8 @@ def is_json(myjson):
class GenericWebsocket(object): class GenericWebsocket(object):
def __init__(self, host, symbol='tBTCUSD', onCandleHook=None, onTradeHook=None, def __init__(self, host, logLevel='ERROR'):
logLevel='ERROR'):
self.symbol = symbol
self.host = host self.host = host
self.awaiting_request = False
self.onCandleHook = onCandleHook
self.onTradeHook = onTradeHook
self.logger = CustomLogger('HFWebSocket', logLevel=logLevel) self.logger = CustomLogger('HFWebSocket', logLevel=logLevel)
self.loop = asyncio.get_event_loop() self.loop = asyncio.get_event_loop()
self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop) self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop)