From 088198e4c43ed4fde538f541df79a6cf1a8ca039 Mon Sep 17 00:00:00 2001 From: Jacob Plaster Date: Thu, 22 Nov 2018 11:49:44 +0000 Subject: [PATCH] Adds checksum orderbook validation --- README.md | 2 + bfxapi/Client.py | 4 +- bfxapi/examples/subscribe_orderbook.py | 31 ++++++++++ bfxapi/models/OrderBook.py | 84 ++++++++++++++++++++++++++ bfxapi/models/__init__.py | 1 + bfxapi/rest/BfxRest.py | 3 +- bfxapi/websockets/BfxWebsocket.py | 79 ++++++++++++++++++++++-- 7 files changed, 195 insertions(+), 9 deletions(-) create mode 100644 bfxapi/examples/subscribe_orderbook.py create mode 100644 bfxapi/models/OrderBook.py diff --git a/README.md b/README.md index 8f96896..d4a48fd 100644 --- a/README.md +++ b/README.md @@ -36,6 +36,8 @@ The websocket exposes a collection of events that are triggered when certain dat - `new_candle` (array): a new candle has been produced - `margin_info_updates` (array): new margin information has been broadcasted - `funding_info_updates` (array): new funding information has been broadcasted +- `order_book_snapshot` (array): initial snapshot of the order book on connection +- `order_book_update` (array): a new order has been placed into the ordebrook For example. If you wanted to subscribe to all of the trades on the `tBTCUSD` market, then you can simply listen to the `new_trade` event. For Example: diff --git a/bfxapi/Client.py b/bfxapi/Client.py index 8524ab4..9253d0d 100644 --- a/bfxapi/Client.py +++ b/bfxapi/Client.py @@ -8,6 +8,6 @@ class Client: ws_host='wss://api.bitfinex.com/ws/2', loop=None, logLevel='INFO', *args, **kwargs): self.loop = loop or asyncio.get_event_loop() self.ws = BfxWebsocket(API_KEY=API_KEY, API_SECRET=API_SECRET, host=ws_host, - loop=self.loop, *args, **kwargs) + loop=self.loop, logLevel=logLevel, *args, **kwargs) self.rest = BfxRest(API_KEY=API_KEY, API_SECRET=API_SECRET, host=rest_host, - loop=self.loop, *args, **kwargs) + loop=self.loop, logLevel=logLevel, *args, **kwargs) diff --git a/bfxapi/examples/subscribe_orderbook.py b/bfxapi/examples/subscribe_orderbook.py new file mode 100644 index 0000000..d8e94d3 --- /dev/null +++ b/bfxapi/examples/subscribe_orderbook.py @@ -0,0 +1,31 @@ +import os +import sys +sys.path.append('../') + +from bfxapi import Client + +bfx = Client( + logLevel='INFO', + # Verifies that the local orderbook is up to date + # with the bitfinex servers + manageOrderBooks=True +) + +@bfx.ws.on('error') +def log_error(err): + print ("Error: {}".format(err)) + +@bfx.ws.on('order_book_update') +def log_update(data): + print ("Book update: {}".format(data)) + +@bfx.ws.on('order_book_snapshot') +def log_snapshot(data): + print ("Initial book: {}".format(data)) + +def start(): + bfx.ws.subscribe('book', 'tBTCUSD') + bfx.ws.subscribe('book', 'tETHUSD') + +bfx.ws.on('connected', start) +bfx.ws.run() diff --git a/bfxapi/models/OrderBook.py b/bfxapi/models/OrderBook.py new file mode 100644 index 0000000..93ca4d6 --- /dev/null +++ b/bfxapi/models/OrderBook.py @@ -0,0 +1,84 @@ +import zlib + +def preparePrice(price): + # convert to 4 significant figures + prepPrice = '{0:.4f}'.format(price) + # remove decimal place if zero float + return '{0:g}'.format(float(prepPrice)) + +class OrderBook: + + def __init__(self): + self.asks = [] + self.bids = [] + + def updateFromSnapshot(self, data): + # [[4642.3, 1, 4.192], [4641.5, 1, 1]] + for order in data: + if len(order) is 4: + if order[3] < 0: + self.bids += [order] + else: + self.asks += [order] + else: + if order[2] < 0: + self.asks += [order] + else: + self.bids += [order] + + def updateWith(self, order): + if len(order) is 4: + amount = order[3] + count = order[2] + side = self.bids if amount < 0 else self.asks + else: + amount = order[2] + side = self.asks if amount < 0 else self.bids + count = order[1] + price = order[0] + + # if first item in ordebook + if len(side) is 0: + side += [order] + return + + # match price level + for index, sOrder in enumerate(side): + sPrice = sOrder[0] + if sPrice == price: + if count is 0: + del side[index] + return + else: + # remove but add as new below + del side[index] + + # if ob is initialised w/o all price levels + if count is 0: + return + + # add to book and sort lowest to highest + side += [order] + side.sort(key=lambda x: x[0], reverse=not amount < 0) + return + + def checksum(self): + data = [] + # take set of top 25 bids/asks + for index in range(0, 25): + if index < len(self.bids): + bid = self.bids[index] + price = bid[0] + amount = bid[3] if len(bid) is 4 else bid[2] + data += [preparePrice(price)] + data += [str(amount)] + if index < len(self.asks): + ask = self.asks[index] + price = ask[0] + amount = ask[3] if len(ask) is 4 else ask[2] + data += [preparePrice(price)] + data += [str(amount)] + checksumStr = ':'.join(data) + # calculate checksum and force signed integer + checksum = zlib.crc32(checksumStr.encode('utf8')) & 0xffffffff + return checksum diff --git a/bfxapi/models/__init__.py b/bfxapi/models/__init__.py index c9662e0..2d2ba86 100644 --- a/bfxapi/models/__init__.py +++ b/bfxapi/models/__init__.py @@ -2,3 +2,4 @@ name = 'models' from .Order import * from .Trade import * +from .OrderBook import * diff --git a/bfxapi/rest/BfxRest.py b/bfxapi/rest/BfxRest.py index 4918407..aad0f97 100644 --- a/bfxapi/rest/BfxRest.py +++ b/bfxapi/rest/BfxRest.py @@ -7,7 +7,8 @@ from ..utils.CustomLogger import CustomLogger class BfxRest: - def __init__(self, API_KEY, API_SECRET, host='https://api.bitfinex.com/v2', loop=None, logLevel='INFO'): + def __init__(self, API_KEY, API_SECRET, host='https://api.bitfinex.com/v2', loop=None, + logLevel='INFO', *args, **kwargs): self.loop = loop or asyncio.get_event_loop() self.host = host self.logger = CustomLogger('BfxRest', logLevel=logLevel) diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/BfxWebsocket.py index aae34ac..dca1fac 100644 --- a/bfxapi/websockets/BfxWebsocket.py +++ b/bfxapi/websockets/BfxWebsocket.py @@ -6,7 +6,22 @@ import hmac import random from .GenericWebsocket import GenericWebsocket, AuthError -from ..models import Order, Trade +from ..models import Order, Trade, OrderBook + +class Flags: + DEC_S = 9 + TIME_S = 32 + TIMESTAMP = 32768 + SEQ_ALL = 65536 + CHECKSUM = 131072 + + strings = { + 9: 'DEC_S', + 32: 'TIME_S', + 32768: 'TIMESTAMP', + 65536: 'SEQ_ALL', + 131072: 'CHECKSUM' + } def _parse_candle(cData, symbol, tf): return { @@ -117,11 +132,13 @@ class BfxWebsocket(GenericWebsocket): } def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', - onSeedCandleHook=None, onSeedTradeHook=None, *args, **kwargs): + onSeedCandleHook=None, onSeedTradeHook=None, manageOrderBooks=False, *args, **kwargs): self.channels = {} self.API_KEY=API_KEY self.API_SECRET=API_SECRET + self.manageOrderBooks = manageOrderBooks self.pendingOrders = {} + self.orderBooks = {} super(BfxWebsocket, self).__init__(host, *args, **kwargs) @@ -149,7 +166,8 @@ class BfxWebsocket(GenericWebsocket): 'info': self._system_info_handler, 'subscribed': self._system_subscribed_handler, 'error': self._system_error_handler, - 'auth': self._system_auth_handler + 'auth': self._system_auth_handler, + 'conf': self._system_conf_handler } async def _ws_system_handler(self, msg): @@ -157,7 +175,7 @@ class BfxWebsocket(GenericWebsocket): if eType in self._WS_SYSTEM_HANDLERS: await self._WS_SYSTEM_HANDLERS[eType](msg) else: - self.logger.warn('Unknown websocket event: {}'.format(eType)) + self.logger.warn("Unknown websocket event: '{}' {}".format(eType, msg)) async def _ws_data_handler(self, data): dataEvent = data[1] @@ -169,6 +187,8 @@ class BfxWebsocket(GenericWebsocket): # candles do not have an event if self.channels[chanId].get('channel') == 'candles': await self._candle_handler(data) + if self.channels[chanId].get('channel') == 'book': + await self._order_book_handler(data) else: self.logger.warn("Unknow data event: '{}' {}".format(dataEvent, data)) @@ -177,6 +197,18 @@ class BfxWebsocket(GenericWebsocket): if data.get('serverId', None): ## connection has been established await self.on_open() + + async def _system_conf_handler(self, data): + flag = data.get('flags') + status = data.get('status') + if flag not in Flags.strings: + self.logger.warn("Unknown config value set {}".format(flag)) + return + flagString = Flags.strings[flag] + if status == "OK": + self.logger.info("Enabled config flag {}".format(flagString)) + else: + self.logger.error("Unable to enable config flag {}".format(flagString)) async def _system_subscribed_handler(self, data): chanEvent = data.get('channel') @@ -210,7 +242,6 @@ class BfxWebsocket(GenericWebsocket): tradeObj = _parse_trade(tData, channelData.get('symbol')) self._emit('new_trade', tradeObj) - async def _trade_executed_handler(self, data): tData = data[2] # [209, 'te', [312372989, 1542303108930, 0.35, 5688.61834032]] @@ -363,6 +394,32 @@ class BfxWebsocket(GenericWebsocket): else: candle = _parse_candle(data[1], channelData['symbol'], channelData['tf']) self._emit('new_candle', candle) + + async def _order_book_handler(self, data): + obInfo = data[1] + channelData = self.channels[data[0]] + symbol = channelData.get('symbol') + if data[1] == "cs": + dChecksum = data[2] & 0xffffffff # force to signed int + checksum = self.orderBooks[symbol].checksum() + # force checksums to signed integers + isValid = (dChecksum) == (checksum) + if isValid: + self.logger.debug("Checksum orderbook validation for '{}' successful." + .format(symbol)) + else: + # TODO: resync with snapshot + self.logger.warn("Checksum orderbook invalid for '{}'. Orderbook out of syc." + .format(symbol)) + return + isSnapshot = type(obInfo[0]) is list + if isSnapshot: + self.orderBooks[symbol] = OrderBook() + self.orderBooks[symbol].updateFromSnapshot(obInfo) + self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo }) + else: + self.orderBooks[symbol].updateWith(obInfo) + self._emit('order_book_update', { 'symbol': symbol, 'data': obInfo }) async def on_message(self, message): self.logger.debug(message) @@ -396,13 +453,23 @@ class BfxWebsocket(GenericWebsocket): self.logger.info("Websocket opened.") self._emit('connected') # Orders are simulated in backtest mode - if not self.API_KEY and self.API_SECRET: + if self.API_KEY and self.API_SECRET: await self._ws_authenticate_socket() + # enable order book checksums + if self.manageOrderBooks: + await self.enable_flag(Flags.CHECKSUM) async def send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data] await self.ws.send(json.dumps(payload)) + async def enable_flag(self, flag): + payload = { + "event": 'conf', + "flags": flag + } + await self.ws.send(json.dumps(payload)) + def subscribe(self, channel_name, symbol, timeframe=None, **kwargs): q = {'event': 'subscribe', 'channel': channel_name, 'symbol': symbol} if timeframe: