websockets: adds event driven

This commit is contained in:
Jacob Plaster
2018-11-16 14:38:41 +00:00
parent f48ba44c79
commit fa7af23e36
3 changed files with 186 additions and 87 deletions

View File

@@ -7,6 +7,10 @@ class DataServerWebsocket(GenericWebsocket):
Basic websocket client that simply reads data from the DataServer. This instance
of the websocket should only ever be used in backtest mode since it isnt capable
of handling orders.
Events:
- connected: called when a connection is made
- done: fires when the backtest has finished running
'''
WS_END = 'bt.end'
WS_CANDLE = 'bt.candle'
@@ -46,9 +50,9 @@ class DataServerWebsocket(GenericWebsocket):
self.logger.info("Backtest data stream complete.")
await self.on_close()
elif eType == self.WS_CANDLE:
self._onCandle(msg)
await self._onCandle(msg)
elif eType == self.WS_TRADE:
self._onTrade(msg)
await self._onTrade(msg)
elif eType == self.WS_CONNECT:
await self.on_open()
else:
@@ -61,13 +65,14 @@ class DataServerWebsocket(GenericWebsocket):
return data
async def on_open(self):
self._emit('connected')
data = self._exec_bt_string()
await self.ws.send(data)
async def _onCandle(self, data):
candle = data[3]
await self.onCandleHook(candle)
self._emit('new_canlde', candle)
async def _onTrade(self, data):
trade = data[2]
await self.onTradeHook(trade)
self._emit('new_trade', trade)

View File

@@ -2,6 +2,7 @@ import asyncio
import websockets
import json
from pyee import EventEmitter
from ..utils.CustomLogger import CustomLogger
class AuthError(Exception): pass
@@ -14,23 +15,17 @@ def is_json(myjson):
return True
class GenericWebsocket(object):
def __init__(self, host, symbol='tBTCUSD', onCandleHook=None, onTradeHook=None, onCompleteHook=None):
if not onCandleHook:
raise KeyError("Expected `onCandleHook` in parameters.")
if not onTradeHook:
raise KeyError("Expected `onTradeHook` in parameters.")
if not onCompleteHook:
raise KeyError("Expected `onCompleteHook` in parameters.")
self.onCandleHook = onCandleHook
self.onTradeHook = onTradeHook
self.onCompleteHook = onCompleteHook
def __init__(self, host, symbol='tBTCUSD', onCandleHook=None, onTradeHook=None,
logLevel='ERROR'):
self.symbol = symbol
self.host = host
self.awaiting_request = False
self.logger = CustomLogger('HFWebSocket', logLevel='INFO')
self.onCandleHook = onCandleHook
self.onTradeHook = onTradeHook
self.logger = CustomLogger('HFWebSocket', logLevel=logLevel)
self.loop = asyncio.get_event_loop()
# self.events = EventEmitter()
self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop)
def run(self):
self.loop.run_until_complete(self._main(self.host))
@@ -43,13 +38,25 @@ class GenericWebsocket(object):
message = await websocket.recv()
await self.on_message(message)
def on(self, event, func=None):
if not func:
return self.events.on(event)
self.events.on(event, func)
def _emit(self, event, *args, **kwargs):
self.events.emit(event, *args, **kwargs)
def once(self, event, func):
self.events.once(event, func)
async def on_error(self, error):
self.logger.error(error)
self.events.emit('error', error)
async def on_close(self):
self.logger.info("Websocket closed.")
await self.ws.close()
self.onCompleteHook()
self._emit('done')
async def on_open(self):
pass

View File

@@ -6,6 +6,7 @@ import hmac
import random
from .GenericWebsocket import GenericWebsocket, AuthError
from ..models import Order, Trade
def _parse_candle(cData, symbol, tf):
return {
@@ -19,8 +20,7 @@ def _parse_candle(cData, symbol, tf):
'tf': tf
}
def _parse_trade(tData, symbol):
print (tData)
def _parse_trade_snapshot_item(tData, symbol):
return {
'mts': tData[3],
'price': tData[4],
@@ -28,6 +28,15 @@ def _parse_trade(tData, symbol):
'symbol': symbol
}
def _parse_trade(tData, symbol):
return {
'mts': tData[1],
'price': tData[3],
'amount': tData[2],
'symbol': symbol
}
class LiveBfxWebsocket(GenericWebsocket):
'''
More complex websocket that heavily relies on the btfxwss module. This websocket requires
@@ -42,8 +51,8 @@ class LiveBfxWebsocket(GenericWebsocket):
hos - Historical Orders
ps - Positions
hts - Trades (snapshot)
te - Trade Event
tu - Trade Update
te - Trade Executed
tu - Trade Execution update
ws - Wallets
bu - Balance Info
miu - Margin Info
@@ -56,6 +65,27 @@ class LiveBfxWebsocket(GenericWebsocket):
hfls - Historical Loans
htfs - Funding Trades
n - Notifications (WIP)
Events:
- all: listen for all messages coming through
- connected: called when a connection is made
- authenticated: called when the websocket passes authentication
- message (string): new incoming message from the websocket
- notification (array): incoming account notification
- error (string): error from the websocket
- order_closed (string): when an order confirmation is recieved
- wallet_snapshot (string): Initial wallet balances (Fired once)
- order_snapshot (string): Initial open orders (Fired once)
- positions_snapshot (string): Initial open positions (Fired once)
- wallet_update (string): changes to the balance of wallets
- seed_candle (candleArray): initial past candle to prime strategy
- seed_trade (tradeArray): initial past trade to prime strategy
- funding_offer_snapshot:
- funding_loan_snapshot:
- funding_credit_snapshot:
- balance_update when the state of a balance is changed
- new_trade: a new trade on the market has been executed
- new_candle: a new candle has been produced
'''
ERRORS = {
@@ -82,17 +112,9 @@ class LiveBfxWebsocket(GenericWebsocket):
20061: 'Websocket server resync complete'
}
def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://test.bitfinex.com/ws',
def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://api.bitfinex.com/ws/2',
onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs):
self.channels = {}
self.tf = '1m'
if not onSeedCandleHook:
raise KeyError("Expected `onSeedCandleHook` in parameters.")
if not onSeedTradeHook:
raise KeyError("Expected `onSeedTradeHook` in parameters.")
self.onSeedCandleHook = onSeedCandleHook
self.onSeedTradeHook = onSeedTradeHook
self.API_KEY=API_KEY
self.API_SECRET=API_SECRET
self.backtest=backtest
@@ -105,10 +127,15 @@ class LiveBfxWebsocket(GenericWebsocket):
'wu': self._wallet_update_handler,
'hb': self._heart_beat_handler,
'te': self._trade_event_handler,
'oc': self._order_confirmed_handler,
'oc': self._order_closed_handler,
'os': self._order_snapshot_handler,
'ws': self._wallet_snapshot_handler,
'ps': self._position_snapshot_handler
'ps': self._position_snapshot_handler,
'fos': self._funding_offer_snapshot_handler,
'fcs': self._funding_credit_snapshot_handler,
'fls': self._funding_load_snapshot_handler,
'bu': self._balance_update_handler,
'n': self._notification_handler
}
self._WS_SYSTEM_HANDLERS = {
@@ -132,13 +159,11 @@ class LiveBfxWebsocket(GenericWebsocket):
if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS:
return await self._WS_DATA_HANDLERS[dataEvent](data)
elif chanId in self.channels:
if self.channels[chanId] == 'trades':
await self._trade_handler(data)
elif self.channels[chanId] == 'candles':
candle = data[1]
await self._candle_handler(candle)
# candles do not have an event
if self.channels[chanId].get('channel') == 'candles':
await self._candle_handler(data)
else:
self.logger.warn("Unknow data event: {}".format(dataEvent))
self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data))
async def _system_info_handler(self, data):
self.logger.info(data)
@@ -151,52 +176,109 @@ class LiveBfxWebsocket(GenericWebsocket):
self.logger.info("Subscribed to channel '{}'".format(chanEvent))
## add channel to known list
chanId = data.get('chanId')
self.channels[chanId] = chanEvent
## if is a candles subscribption, then get the symbol
## from the key
if data.get('key'):
kd = data.get('key').split(':')
data['tf'] = kd[1]
data['symbol'] = kd[2]
self.channels[chanId] = data
async def _system_error_handler(self, data):
code = data.get('code')
if code in self.ERRORS:
raise Exception(self.ERRORS[code])
else:
raise Exception(data)
self._emit('error', data)
async def _system_auth_handler(self, data):
if data.get('status') == 'FAILED':
raise AuthError(self.ERRORS[data.get('code')])
else:
self._emit('authenticated', data)
self.logger.info("Authentication successful.")
async def _trade_update_handler(self, data):
pass
tData = data[2]
# [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]]
channelData = self.channels[data[0]]
tradeObj = _parse_trade(tData, channelData.get('symbol'))
self._emit('new_trade', tradeObj)
async def _trade_event_handler(self, data):
tData = data[2]
# [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]]
channelData = self.channels[data[0]]
tradeObj = _parse_trade(tData, channelData.get('symbol'))
self._emit('new_trade', tradeObj)
async def _wallet_update_handler(self, data):
# [0,"wu",["exchange","USD",89134.66933283,0]]
wu = data[2]
self._emit('wallet_update', data)
self.logger.info("Wallet update: {}({}) = {}".format(wu[1], wu[0], wu[2]))
async def _heart_beat_handler(self, data):
self.logger.debug("Heartbeat - {}".format(self.host))
async def _trade_event_handler(self, data):
pass
async def _notification_handler(self, data):
# [0, 'n', [1542289340429, 'on-req', None, None,
# [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET',
# None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None,
# None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']]
nInfo = data[2]
self._emit('notification', nInfo)
notificationType = nInfo[6]
notificationText = nInfo[7]
if notificationType == 'ERROR':
self._emit('error', notificationText)
self.logger.error("Notification ERROR: {}".format(notificationText))
else:
self.logger.info("Notification SUCCESS: {}".format(notificationText))
async def _order_confirmed_handler(self, data):
# [0,"oc",[1151345759,"BTCUSD",0,-0.1,"EXCHANGE MARKET",
# "EXECUTED @ 16956.0(-0.05): was PARTIALLY FILLED @ 17051.0(-0.05)"
# ,17051,17003.5,"2018-11-13T14:54:29Z",0,0,0]]
async def _balance_update_handler(self, data):
self.logger.info('Balance update: {}'.format(data[2]))
self._emit('balance_update', data[2])
async def _order_closed_handler(self, data):
# [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1,
# "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED
# @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null,
# "API>BFX",null,null,null]]
tInfo = data[2]
self.logger.info("Order status: {} {}".format(tInfo[1], tInfo[5]))
order = Order(tInfo)
trade = Trade(order)
self.logger.info("Order closed: {} {}".format(order.symbol, order.status))
self._emit('order_closed', order, trade)
if order.cId in self.pendingOrders:
if self.pendingOrders[order.cId][0]:
await self.pendingOrders[order.cId][0](order, trade)
del self.pendingOrders[order.cId]
async def _order_snapshot_handler(self, data):
self.logger.info("Position snapshot update: {}".format(data))
self._emit('order_snapshot', data)
self.logger.info("Position snapshot: {}".format(data))
async def _wallet_snapshot_handler(self, data):
self.logger.info("Wallet snapshot update: {}".format(data))
self._emit('wallet_snapshot', data[2])
self.logger.info("Wallet snapshot: {}".format(data))
async def _position_snapshot_handler(self, data):
self.logger.info("Position snapshot update: {}".format(data))
self._emit('position_snapshot', data)
self.logger.info("Position snapshot: {}".format(data))
async def _funding_offer_snapshot_handler(self, data):
self._emit('funding_offer_snapshot', data)
self.logger.info("Funding offer snapshot: {}".format(data))
async def _funding_load_snapshot_handler(self, data):
self._emit('funding_loan_snapshot', data[2])
self.logger.info("Funding loan snapshot: {}".format(data))
async def _funding_credit_snapshot_handler(self, data):
self._emit('funding_credit_snapshot', data[2])
self.logger.info("Funding credit snapshot: {}".format(data))
async def _trade_handler(self, data):
channelData = self.channels[data[0]]
if type(data[1]) is list:
data = data[1]
# Process the batch of seed trades on
@@ -207,28 +289,32 @@ class LiveBfxWebsocket(GenericWebsocket):
'mts': t[1],
'price': t[2],
'amount': t[3],
'symbol': self.symbol
'symbol': channelData['symbol']
}
self.onSeedTradeHook(trade)
self._emit('seed_trade', trade)
else:
tradeObj = _parse_trade(data, self.symbol)
await self.onTradeHook(tradeObj)
tradeObj = _parse_trade_snapshot_item(data, channelData['symbol'])
self._emit('new_trade', tradeObj)
async def _candle_handler(self, data):
if type(data[0]) is list:
chanId = data[0]
channelData = self.channels[chanId]
if type(data[1][0]) is list:
# Process the batch of seed candles on
# websocket subscription
data.reverse()
for c in data:
candle = _parse_candle(c, self.symbol, self.tf)
self.onSeedCandleHook(candle)
candlesSnapshot = data[1]
candlesSnapshot.reverse()
for c in candlesSnapshot:
candle = _parse_candle(c, channelData['symbol'], channelData['tf'])
self._emit('seed_candle', candle)
else:
candle = _parse_candle(data, self.symbol, self.tf)
await self.onCandleHook(candle)
candle = _parse_candle(data[1], channelData['symbol'], channelData['tf'])
self._emit('new_candle', candle)
async def on_message(self, message):
self.logger.info(message)
self.logger.debug(message)
msg = json.loads(message)
self._emit('all', msg)
if type(msg) is dict:
# System messages are received as json
await self._ws_system_handler(msg)
@@ -255,41 +341,42 @@ class LiveBfxWebsocket(GenericWebsocket):
async def on_open(self):
self.logger.info("Websocket opened.")
self._emit('connected')
# Orders are simulated in backtest mode
if not self.backtest:
if not self.backtest and self.API_KEY and self.API_SECRET:
await self._ws_authenticate_socket()
# subscribe to data feed
# TODO: allow for multiple subscriptions
await self._subscribe('trades', symbol=self.symbol)
key = 'trade:1m:{}'.format(self.symbol)
await self._subscribe('candles', key=key, symbol=self.symbol)
# await self._subscribe('trades', symbol=self.symbol)
# key = 'trade:1m:{}'.format(self.symbol)
# await self._subscribe('candles', key=key, symbol=self.symbol)
async def send_auth_command(self, channel_name, data):
payload = [0, channel_name, None, data]
await self.ws.send(json.dumps(payload))
print ("Order sent")
print (json.dumps(payload))
await self.ws.send(json.dumps(payload))
async def _subscribe(self, channel_name, **kwargs):
q = {'event': 'subscribe', 'channel': channel_name}
def subscribe(self, channel_name, symbol, timeframe=None, **kwargs):
q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol}
if timeframe:
q['key'] = 'trade:{}:{}'.format(timeframe, symbol)
q.update(**kwargs)
self.logger.info("Subscribing to channel {}".format(channel_name))
await self.ws.send(json.dumps(q))
# tmp = self.ws.send(json.dumps(q))
asyncio.ensure_future(self.ws.send(json.dumps(q)))
async def submit_order(self, symbol, price, amount, mtsCreate, market_type,
hidden=False, *args, **kwargs):
order_id = random.randint(1,999999999)
async def submit_order(self, symbol, price, amount, market_type,
hidden=False, onComplete=None, onError=None, *args, **kwargs):
order_id = int(round(time.time() * 1000))
# send order over websocket
payload = {
"cid": order_id,
"type": market_type,
"type": str(market_type),
"symbol": symbol,
"amount": str(amount),
"price": str(price),
"hidden": 1 if hidden else 0
"price": str(price)
}
self.pendingOrders[order_id] = payload
await self.send_auth_command('on', payload)
# wait for order confirmation
# while True:
# message = await websocket.recv()
self.logger.info("Order cid={} ({} {} @ {}) dispatched".format(
order_id, symbol, amount, price))
self.pendingOrders[order_id] = (onComplete, onError)