diff --git a/bfxapi/websockets/DataServerWebsocket.py b/bfxapi/websockets/DataServerWebsocket.py index 464180a..8d46a0a 100644 --- a/bfxapi/websockets/DataServerWebsocket.py +++ b/bfxapi/websockets/DataServerWebsocket.py @@ -7,6 +7,10 @@ class DataServerWebsocket(GenericWebsocket): Basic websocket client that simply reads data from the DataServer. This instance of the websocket should only ever be used in backtest mode since it isnt capable of handling orders. + + Events: + - connected: called when a connection is made + - done: fires when the backtest has finished running ''' WS_END = 'bt.end' WS_CANDLE = 'bt.candle' @@ -46,9 +50,9 @@ class DataServerWebsocket(GenericWebsocket): self.logger.info("Backtest data stream complete.") await self.on_close() elif eType == self.WS_CANDLE: - self._onCandle(msg) + await self._onCandle(msg) elif eType == self.WS_TRADE: - self._onTrade(msg) + await self._onTrade(msg) elif eType == self.WS_CONNECT: await self.on_open() else: @@ -61,13 +65,14 @@ class DataServerWebsocket(GenericWebsocket): return data async def on_open(self): + self._emit('connected') data = self._exec_bt_string() await self.ws.send(data) async def _onCandle(self, data): candle = data[3] - await self.onCandleHook(candle) + self._emit('new_canlde', candle) async def _onTrade(self, data): trade = data[2] - await self.onTradeHook(trade) + self._emit('new_trade', trade) diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py index 851a15b..80b3e35 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/GenericWebsocket.py @@ -2,6 +2,7 @@ import asyncio import websockets import json +from pyee import EventEmitter from ..utils.CustomLogger import CustomLogger class AuthError(Exception): pass @@ -14,23 +15,17 @@ def is_json(myjson): return True class GenericWebsocket(object): - def __init__(self, host, symbol='tBTCUSD', onCandleHook=None, onTradeHook=None, onCompleteHook=None): - if not onCandleHook: - raise KeyError("Expected `onCandleHook` in parameters.") - if not onTradeHook: - raise KeyError("Expected `onTradeHook` in parameters.") - if not onCompleteHook: - raise KeyError("Expected `onCompleteHook` in parameters.") - self.onCandleHook = onCandleHook - self.onTradeHook = onTradeHook - self.onCompleteHook = onCompleteHook + + def __init__(self, host, symbol='tBTCUSD', onCandleHook=None, onTradeHook=None, + logLevel='ERROR'): self.symbol = symbol self.host = host self.awaiting_request = False - - self.logger = CustomLogger('HFWebSocket', logLevel='INFO') + self.onCandleHook = onCandleHook + self.onTradeHook = onTradeHook + self.logger = CustomLogger('HFWebSocket', logLevel=logLevel) self.loop = asyncio.get_event_loop() - # self.events = EventEmitter() + self.events = EventEmitter(scheduler=asyncio.ensure_future, loop=self.loop) def run(self): self.loop.run_until_complete(self._main(self.host)) @@ -43,13 +38,25 @@ class GenericWebsocket(object): message = await websocket.recv() await self.on_message(message) + def on(self, event, func=None): + if not func: + return self.events.on(event) + self.events.on(event, func) + + def _emit(self, event, *args, **kwargs): + self.events.emit(event, *args, **kwargs) + + def once(self, event, func): + self.events.once(event, func) + async def on_error(self, error): self.logger.error(error) + self.events.emit('error', error) async def on_close(self): self.logger.info("Websocket closed.") await self.ws.close() - self.onCompleteHook() + self._emit('done') async def on_open(self): pass diff --git a/bfxapi/websockets/LiveWebsocket.py b/bfxapi/websockets/LiveWebsocket.py index 2b23a0d..9f76825 100644 --- a/bfxapi/websockets/LiveWebsocket.py +++ b/bfxapi/websockets/LiveWebsocket.py @@ -6,6 +6,7 @@ import hmac import random from .GenericWebsocket import GenericWebsocket, AuthError +from ..models import Order, Trade def _parse_candle(cData, symbol, tf): return { @@ -19,8 +20,7 @@ def _parse_candle(cData, symbol, tf): 'tf': tf } -def _parse_trade(tData, symbol): - print (tData) +def _parse_trade_snapshot_item(tData, symbol): return { 'mts': tData[3], 'price': tData[4], @@ -28,6 +28,15 @@ def _parse_trade(tData, symbol): 'symbol': symbol } + +def _parse_trade(tData, symbol): + return { + 'mts': tData[1], + 'price': tData[3], + 'amount': tData[2], + 'symbol': symbol + } + class LiveBfxWebsocket(GenericWebsocket): ''' More complex websocket that heavily relies on the btfxwss module. This websocket requires @@ -42,8 +51,8 @@ class LiveBfxWebsocket(GenericWebsocket): hos - Historical Orders ps - Positions hts - Trades (snapshot) - te - Trade Event - tu - Trade Update + te - Trade Executed + tu - Trade Execution update ws - Wallets bu - Balance Info miu - Margin Info @@ -56,6 +65,27 @@ class LiveBfxWebsocket(GenericWebsocket): hfls - Historical Loans htfs - Funding Trades n - Notifications (WIP) + + Events: + - all: listen for all messages coming through + - connected: called when a connection is made + - authenticated: called when the websocket passes authentication + - message (string): new incoming message from the websocket + - notification (array): incoming account notification + - error (string): error from the websocket + - order_closed (string): when an order confirmation is recieved + - wallet_snapshot (string): Initial wallet balances (Fired once) + - order_snapshot (string): Initial open orders (Fired once) + - positions_snapshot (string): Initial open positions (Fired once) + - wallet_update (string): changes to the balance of wallets + - seed_candle (candleArray): initial past candle to prime strategy + - seed_trade (tradeArray): initial past trade to prime strategy + - funding_offer_snapshot: + - funding_loan_snapshot: + - funding_credit_snapshot: + - balance_update when the state of a balance is changed + - new_trade: a new trade on the market has been executed + - new_candle: a new candle has been produced ''' ERRORS = { @@ -82,17 +112,9 @@ class LiveBfxWebsocket(GenericWebsocket): 20061: 'Websocket server resync complete' } - def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://test.bitfinex.com/ws', + def __init__(self, API_KEY=None, API_SECRET=None, backtest=False, host='wss://api.bitfinex.com/ws/2', onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs): self.channels = {} - self.tf = '1m' - if not onSeedCandleHook: - raise KeyError("Expected `onSeedCandleHook` in parameters.") - if not onSeedTradeHook: - raise KeyError("Expected `onSeedTradeHook` in parameters.") - - self.onSeedCandleHook = onSeedCandleHook - self.onSeedTradeHook = onSeedTradeHook self.API_KEY=API_KEY self.API_SECRET=API_SECRET self.backtest=backtest @@ -105,10 +127,15 @@ class LiveBfxWebsocket(GenericWebsocket): 'wu': self._wallet_update_handler, 'hb': self._heart_beat_handler, 'te': self._trade_event_handler, - 'oc': self._order_confirmed_handler, + 'oc': self._order_closed_handler, 'os': self._order_snapshot_handler, 'ws': self._wallet_snapshot_handler, - 'ps': self._position_snapshot_handler + 'ps': self._position_snapshot_handler, + 'fos': self._funding_offer_snapshot_handler, + 'fcs': self._funding_credit_snapshot_handler, + 'fls': self._funding_load_snapshot_handler, + 'bu': self._balance_update_handler, + 'n': self._notification_handler } self._WS_SYSTEM_HANDLERS = { @@ -132,13 +159,11 @@ class LiveBfxWebsocket(GenericWebsocket): if type(dataEvent) is str and dataEvent in self._WS_DATA_HANDLERS: return await self._WS_DATA_HANDLERS[dataEvent](data) elif chanId in self.channels: - if self.channels[chanId] == 'trades': - await self._trade_handler(data) - elif self.channels[chanId] == 'candles': - candle = data[1] - await self._candle_handler(candle) + # candles do not have an event + if self.channels[chanId].get('channel') == 'candles': + await self._candle_handler(data) else: - self.logger.warn("Unknow data event: {}".format(dataEvent)) + self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data)) async def _system_info_handler(self, data): self.logger.info(data) @@ -151,52 +176,109 @@ class LiveBfxWebsocket(GenericWebsocket): self.logger.info("Subscribed to channel '{}'".format(chanEvent)) ## add channel to known list chanId = data.get('chanId') - self.channels[chanId] = chanEvent + ## if is a candles subscribption, then get the symbol + ## from the key + if data.get('key'): + kd = data.get('key').split(':') + data['tf'] = kd[1] + data['symbol'] = kd[2] + + self.channels[chanId] = data async def _system_error_handler(self, data): - code = data.get('code') - if code in self.ERRORS: - raise Exception(self.ERRORS[code]) - else: - raise Exception(data) + self._emit('error', data) async def _system_auth_handler(self, data): if data.get('status') == 'FAILED': raise AuthError(self.ERRORS[data.get('code')]) else: + self._emit('authenticated', data) self.logger.info("Authentication successful.") async def _trade_update_handler(self, data): - pass + tData = data[2] + # [209, 'tu', [312372989, 1542303108930, 0.35, 5688.61834032]] + channelData = self.channels[data[0]] + tradeObj = _parse_trade(tData, channelData.get('symbol')) + self._emit('new_trade', tradeObj) + + + async def _trade_event_handler(self, data): + tData = data[2] + # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] + channelData = self.channels[data[0]] + tradeObj = _parse_trade(tData, channelData.get('symbol')) + self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]] wu = data[2] + self._emit('wallet_update', data) self.logger.info("Wallet update: {}({}) = {}".format(wu[1], wu[0], wu[2])) async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host)) - async def _trade_event_handler(self, data): - pass + async def _notification_handler(self, data): + # [0, 'n', [1542289340429, 'on-req', None, None, + # [1151350600, None, 1542289341196, 'tBTCUSD', None, None, 0.01, None, 'EXCHANGE MARKET', + # None, None, None, None, None, None, None, 18970, None, 0, 0, None, None, None, 0, None, + # None, None, None, None, None, None, None], None, 'SUCCESS', 'Submitting exchange market buy order for 0.01 BTC.']] + nInfo = data[2] + self._emit('notification', nInfo) + notificationType = nInfo[6] + notificationText = nInfo[7] + if notificationType == 'ERROR': + self._emit('error', notificationText) + self.logger.error("Notification ERROR: {}".format(notificationText)) + else: + self.logger.info("Notification SUCCESS: {}".format(notificationText)) - async def _order_confirmed_handler(self, data): - # [0,"oc",[1151345759,"BTCUSD",0,-0.1,"EXCHANGE MARKET", - # "EXECUTED @ 16956.0(-0.05): was PARTIALLY FILLED @ 17051.0(-0.05)" - # ,17051,17003.5,"2018-11-13T14:54:29Z",0,0,0]] + async def _balance_update_handler(self, data): + self.logger.info('Balance update: {}'.format(data[2])) + self._emit('balance_update', data[2]) + + async def _order_closed_handler(self, data): + # [0,"oc",[1151349678,null,1542203391995,"tBTCUSD",1542203389940,1542203389966,0,0.1, + # "EXCHANGE MARKET",null,null,null,0,"EXECUTED @ 18922.0(0.03299997): was PARTIALLY FILLED + # @ 18909.0(0.06700003)",null,null,18909,18913.2899961,0,0,null,null,null,0,0,null,null,null, + # "API>BFX",null,null,null]] tInfo = data[2] - self.logger.info("Order status: {} {}".format(tInfo[1], tInfo[5])) + order = Order(tInfo) + trade = Trade(order) + self.logger.info("Order closed: {} {}".format(order.symbol, order.status)) + self._emit('order_closed', order, trade) + if order.cId in self.pendingOrders: + if self.pendingOrders[order.cId][0]: + await self.pendingOrders[order.cId][0](order, trade) + del self.pendingOrders[order.cId] async def _order_snapshot_handler(self, data): - self.logger.info("Position snapshot update: {}".format(data)) + self._emit('order_snapshot', data) + self.logger.info("Position snapshot: {}".format(data)) async def _wallet_snapshot_handler(self, data): - self.logger.info("Wallet snapshot update: {}".format(data)) + self._emit('wallet_snapshot', data[2]) + self.logger.info("Wallet snapshot: {}".format(data)) async def _position_snapshot_handler(self, data): - self.logger.info("Position snapshot update: {}".format(data)) + self._emit('position_snapshot', data) + self.logger.info("Position snapshot: {}".format(data)) + + async def _funding_offer_snapshot_handler(self, data): + self._emit('funding_offer_snapshot', data) + self.logger.info("Funding offer snapshot: {}".format(data)) + + async def _funding_load_snapshot_handler(self, data): + self._emit('funding_loan_snapshot', data[2]) + self.logger.info("Funding loan snapshot: {}".format(data)) + + async def _funding_credit_snapshot_handler(self, data): + self._emit('funding_credit_snapshot', data[2]) + self.logger.info("Funding credit snapshot: {}".format(data)) async def _trade_handler(self, data): + channelData = self.channels[data[0]] if type(data[1]) is list: data = data[1] # Process the batch of seed trades on @@ -207,28 +289,32 @@ class LiveBfxWebsocket(GenericWebsocket): 'mts': t[1], 'price': t[2], 'amount': t[3], - 'symbol': self.symbol + 'symbol': channelData['symbol'] } - self.onSeedTradeHook(trade) + self._emit('seed_trade', trade) else: - tradeObj = _parse_trade(data, self.symbol) - await self.onTradeHook(tradeObj) + tradeObj = _parse_trade_snapshot_item(data, channelData['symbol']) + self._emit('new_trade', tradeObj) async def _candle_handler(self, data): - if type(data[0]) is list: + chanId = data[0] + channelData = self.channels[chanId] + if type(data[1][0]) is list: # Process the batch of seed candles on # websocket subscription - data.reverse() - for c in data: - candle = _parse_candle(c, self.symbol, self.tf) - self.onSeedCandleHook(candle) + candlesSnapshot = data[1] + candlesSnapshot.reverse() + for c in candlesSnapshot: + candle = _parse_candle(c, channelData['symbol'], channelData['tf']) + self._emit('seed_candle', candle) else: - candle = _parse_candle(data, self.symbol, self.tf) - await self.onCandleHook(candle) + candle = _parse_candle(data[1], channelData['symbol'], channelData['tf']) + self._emit('new_candle', candle) async def on_message(self, message): - self.logger.info(message) + self.logger.debug(message) msg = json.loads(message) + self._emit('all', msg) if type(msg) is dict: # System messages are received as json await self._ws_system_handler(msg) @@ -255,41 +341,42 @@ class LiveBfxWebsocket(GenericWebsocket): async def on_open(self): self.logger.info("Websocket opened.") + self._emit('connected') # Orders are simulated in backtest mode - if not self.backtest: + if not self.backtest and self.API_KEY and self.API_SECRET: await self._ws_authenticate_socket() # subscribe to data feed # TODO: allow for multiple subscriptions - await self._subscribe('trades', symbol=self.symbol) - key = 'trade:1m:{}'.format(self.symbol) - await self._subscribe('candles', key=key, symbol=self.symbol) + # await self._subscribe('trades', symbol=self.symbol) + # key = 'trade:1m:{}'.format(self.symbol) + # await self._subscribe('candles', key=key, symbol=self.symbol) async def send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data] - await self.ws.send(json.dumps(payload)) - print ("Order sent") - print (json.dumps(payload)) + await self.ws.send(json.dumps(payload)) - async def _subscribe(self, channel_name, **kwargs): - q = {'event': 'subscribe', 'channel': channel_name} + def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): + q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol} + if timeframe: + q['key'] = 'trade:{}:{}'.format(timeframe, symbol) q.update(**kwargs) self.logger.info("Subscribing to channel {}".format(channel_name)) - await self.ws.send(json.dumps(q)) + # tmp = self.ws.send(json.dumps(q)) + asyncio.ensure_future(self.ws.send(json.dumps(q))) - async def submit_order(self, symbol, price, amount, mtsCreate, market_type, - hidden=False, *args, **kwargs): - order_id = random.randint(1,999999999) + async def submit_order(self, symbol, price, amount, market_type, + hidden=False, onComplete=None, onError=None, *args, **kwargs): + order_id = int(round(time.time() * 1000)) # send order over websocket payload = { "cid": order_id, - "type": market_type, + "type": str(market_type), "symbol": symbol, "amount": str(amount), - "price": str(price), - "hidden": 1 if hidden else 0 + "price": str(price) } self.pendingOrders[order_id] = payload await self.send_auth_command('on', payload) - # wait for order confirmation - # while True: - # message = await websocket.recv() + self.logger.info("Order cid={} ({} {} @ {}) dispatched".format( + order_id, symbol, amount, price)) + self.pendingOrders[order_id] = (onComplete, onError)