Merge pull request #9 from JacobPlaster/checksum-fix

Convert prices/amounts to Decimals rather than floats
This commit is contained in:
Paolo Ardoino
2019-01-25 15:16:29 +00:00
committed by GitHub
19 changed files with 602 additions and 61 deletions

View File

@@ -6,5 +6,7 @@ from .client import Client
from .models import (Order, Trade, OrderBook, Subscription, Wallet, from .models import (Order, Trade, OrderBook, Subscription, Wallet,
Position, FundingLoan, FundingOffer, FundingCredit) Position, FundingLoan, FundingOffer, FundingCredit)
from .websockets.GenericWebsocket import GenericWebsocket from .websockets.GenericWebsocket import GenericWebsocket
from .websockets.BfxWebsocket import BfxWebsocket
from .utils.Decimal import Decimal
NAME = 'bfxapi' NAME = 'bfxapi'

View File

@@ -5,7 +5,6 @@ Module used to describe all of the different data types
import time import time
import datetime import datetime
class OrderType: class OrderType:
""" """
Enum used to describe all of the different order types available for use Enum used to describe all of the different order types available for use

View File

@@ -4,7 +4,15 @@ Module used to describe all of the different data types
import time import time
import json import json
from random import randint
def generate_sub_id():
"""
Generates a unique id in the form of 12345566-12334556
"""
prefix = str(int(round(time.time() * 1000)))
suffix = str(randint(0, 9999999))
return "{}-{}".format(prefix, suffix)
class Subscription: class Subscription:
""" """
@@ -13,8 +21,8 @@ class Subscription:
such as unsibscribe and subscribe. such as unsibscribe and subscribe.
""" """
def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs): def __init__(self, bfxapi, channel_name, symbol, timeframe=None, **kwargs):
self._ws = ws self.bfxapi = bfxapi
self.channel_name = channel_name self.channel_name = channel_name
self.symbol = symbol self.symbol = symbol
self.timeframe = timeframe self.timeframe = timeframe
@@ -23,7 +31,7 @@ class Subscription:
self.chan_id = None self.chan_id = None
if timeframe: if timeframe:
self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol)
self.sub_id = int(round(time.time() * 1000)) self.sub_id = generate_sub_id()
self.send_payload = self._generate_payload(**kwargs) self.send_payload = self._generate_payload(**kwargs)
def confirm_subscription(self, chan_id): def confirm_subscription(self, chan_id):
@@ -40,13 +48,13 @@ class Subscription:
if not self.is_subscribed(): if not self.is_subscribed():
raise Exception("Subscription is not subscribed to websocket") raise Exception("Subscription is not subscribed to websocket")
payload = {'event': 'unsubscribe', 'chanId': self.chan_id} payload = {'event': 'unsubscribe', 'chanId': self.chan_id}
await self._ws.send(json.dumps(payload)) await self.bfxapi.get_ws().send(json.dumps(payload))
async def subscribe(self): async def subscribe(self):
""" """
Send a subscription request to the bitfinex socket Send a subscription request to the bitfinex socket
""" """
await self._ws.send(json.dumps(self._get_send_payload())) await self.bfxapi.get_ws().send(json.dumps(self._get_send_payload()))
def confirm_unsubscribe(self): def confirm_unsubscribe(self):
""" """

View File

@@ -5,7 +5,6 @@ Module used to describe all of the different data types
import time import time
import datetime import datetime
class OrderType: class OrderType:
""" """
Enum used to describe all of the different order types available for use Enum used to describe all of the different order types available for use

View File

@@ -3,17 +3,7 @@ Module used to describe all of the different data types
""" """
import zlib import zlib
import json
def prepare_price(price):
"""
Convert the price to an acceptable format
"""
# convert to 4 significant figures
prep_price = '{0:.4f}'.format(price)
# remove decimal place if zero float
return '{0:g}'.format(float(prep_price))
class OrderBook: class OrderBook:
""" """
@@ -42,26 +32,37 @@ class OrderBook:
""" """
return self.asks return self.asks
def update_from_snapshot(self, data): def update_from_snapshot(self, data, orig_raw_msg):
""" """
Update the orderbook with a raw orderbook snapshot Update the orderbook with a raw orderbook snapshot
""" """
for order in data: # we need to keep the original string values that are sent to use
if len(order) == 4: # this avoids any problems with floats
if order[3] < 0: orig_raw = json.loads(orig_raw_msg, parse_float=str, parse_int=str)[1]
zip_data = []
# zip both the float values and string values together
for index, order in enumerate(data):
zip_data += [(order, orig_raw[index])]
## build our bids and asks
for order in zip_data:
if len(order[0]) == 4:
if order[0][3] < 0:
self.bids += [order] self.bids += [order]
else: else:
self.asks += [order] self.asks += [order]
else: else:
if order[2] < 0: if order[0][2] < 0:
self.asks += [order] self.asks += [order]
else: else:
self.bids += [order] self.bids += [order]
def update_with(self, order): def update_with(self, order, orig_raw_msg):
""" """
Update the orderbook with a single update Update the orderbook with a single update
""" """
# keep orginal string vlues to avoid checksum float errors
orig_raw = json.loads(orig_raw_msg, parse_float=str, parse_int=str)[1]
zip_order = (order, orig_raw)
if len(order) == 4: if len(order) == 4:
amount = order[3] amount = order[3]
count = order[2] count = order[2]
@@ -74,12 +75,12 @@ class OrderBook:
# if first item in ordebook # if first item in ordebook
if len(side) == 0: if len(side) == 0:
side += [order] side += [zip_order]
return return
# match price level # match price level but use the float parsed object
for index, s_order in enumerate(side): for index, s_order in enumerate(side):
s_price = s_order[0] s_price = s_order[0][0]
if s_price == price: if s_price == price:
if count == 0: if count == 0:
del side[index] del side[index]
@@ -92,8 +93,8 @@ class OrderBook:
return return
# add to book and sort lowest to highest # add to book and sort lowest to highest
side += [order] side += [zip_order]
side.sort(key=lambda x: x[0], reverse=not amount < 0) side.sort(key=lambda x: x[0][0], reverse=not amount < 0)
return return
def checksum(self): def checksum(self):
@@ -104,17 +105,19 @@ class OrderBook:
# take set of top 25 bids/asks # take set of top 25 bids/asks
for index in range(0, 25): for index in range(0, 25):
if index < len(self.bids): if index < len(self.bids):
bid = self.bids[index] # use the string parsed array
bid = self.bids[index][1]
price = bid[0] price = bid[0]
amount = bid[3] if len(bid) == 4 else bid[2] amount = bid[3] if len(bid) == 4 else bid[2]
data += [prepare_price(price)] data += [price]
data += [str(amount)] data += [amount]
if index < len(self.asks): if index < len(self.asks):
ask = self.asks[index] # use the string parsed array
ask = self.asks[index][1]
price = ask[0] price = ask[0]
amount = ask[3] if len(ask) == 4 else ask[2] amount = ask[3] if len(ask) == 4 else ask[2]
data += [prepare_price(price)] data += [price]
data += [str(amount)] data += [amount]
checksum_str = ':'.join(data) checksum_str = ':'.join(data)
# calculate checksum and force signed integer # calculate checksum and force signed integer
checksum = zlib.crc32(checksum_str.encode('utf8')) & 0xffffffff checksum = zlib.crc32(checksum_str.encode('utf8')) & 0xffffffff

View File

@@ -4,7 +4,15 @@ Module used to describe all of the different data types
import time import time
import json import json
from random import randint
def generate_sub_id():
"""
Generates a unique id in the form of 12345566-12334556
"""
prefix = str(int(round(time.time() * 1000)))
suffix = str(randint(0, 9999999))
return "{}-{}".format(prefix, suffix)
class Subscription: class Subscription:
""" """
@@ -13,8 +21,8 @@ class Subscription:
such as unsibscribe and subscribe. such as unsibscribe and subscribe.
""" """
def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs): def __init__(self, bfxapi, channel_name, symbol, timeframe=None, **kwargs):
self._ws = ws self.bfxapi = bfxapi
self.channel_name = channel_name self.channel_name = channel_name
self.symbol = symbol self.symbol = symbol
self.timeframe = timeframe self.timeframe = timeframe
@@ -23,7 +31,7 @@ class Subscription:
self.chan_id = None self.chan_id = None
if timeframe: if timeframe:
self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol)
self.sub_id = int(round(time.time() * 1000)) self.sub_id = generate_sub_id()
self.send_payload = self._generate_payload(**kwargs) self.send_payload = self._generate_payload(**kwargs)
def confirm_subscription(self, chan_id): def confirm_subscription(self, chan_id):
@@ -40,13 +48,13 @@ class Subscription:
if not self.is_subscribed(): if not self.is_subscribed():
raise Exception("Subscription is not subscribed to websocket") raise Exception("Subscription is not subscribed to websocket")
payload = {'event': 'unsubscribe', 'chanId': self.chan_id} payload = {'event': 'unsubscribe', 'chanId': self.chan_id}
await self._ws.send(json.dumps(payload)) await self.bfxapi.get_ws().send(json.dumps(payload))
async def subscribe(self): async def subscribe(self):
""" """
Send a subscription request to the bitfinex socket Send a subscription request to the bitfinex socket
""" """
await self._ws.send(json.dumps(self._get_send_payload())) await self.bfxapi.get_ws().send(json.dumps(self._get_send_payload()))
def confirm_unsubscribe(self): def confirm_unsubscribe(self):
""" """

View File

@@ -22,11 +22,13 @@ class BfxRest:
""" """
def __init__(self, API_KEY, API_SECRET, host='https://api.bitfinex.com/v2', loop=None, def __init__(self, API_KEY, API_SECRET, host='https://api.bitfinex.com/v2', loop=None,
logLevel='INFO', *args, **kwargs): logLevel='INFO', parse_float=float, *args, **kwargs):
self.loop = loop or asyncio.get_event_loop() self.loop = loop or asyncio.get_event_loop()
self.API_KEY = API_KEY self.API_KEY = API_KEY
self.API_SECRET = API_SECRET self.API_SECRET = API_SECRET
self.host = host self.host = host
# this value can also be set to bfxapi.Decimal for much higher precision
self.parse_float = parse_float
self.logger = CustomLogger('BfxRest', logLevel=logLevel) self.logger = CustomLogger('BfxRest', logLevel=logLevel)
async def fetch(self, endpoint, params=""): async def fetch(self, endpoint, params=""):
@@ -42,7 +44,8 @@ class BfxRest:
if resp.status is not 200: if resp.status is not 200:
raise Exception('GET {} failed with status {} - {}' raise Exception('GET {} failed with status {} - {}'
.format(url, resp.status, text)) .format(url, resp.status, text))
return await resp.json() parsed = json.loads(text, parse_float=self.parse_float)
return parsed
async def post(self, endpoint, data={}, params=""): async def post(self, endpoint, data={}, params=""):
""" """
@@ -61,7 +64,8 @@ class BfxRest:
if resp.status is not 200: if resp.status is not 200:
raise Exception('POST {} failed with status {} - {}' raise Exception('POST {} failed with status {} - {}'
.format(url, resp.status, text)) .format(url, resp.status, text))
return await resp.json() parsed = json.loads(text, parse_float=self.parse_float)
return parsed
################################################## ##################################################
# Public Data # # Public Data #

0
bfxapi/tests/__init__.py Normal file
View File

90
bfxapi/tests/helpers.py Normal file
View File

@@ -0,0 +1,90 @@
import time
import json
import asyncio
from .. import Client, BfxWebsocket
def get_now():
return int(round(time.time() * 1000))
class StubbedWebsocket(BfxWebsocket):
def __new__(cls, *args, **kwargs):
instance = super(StubbedWebsocket, cls).__new__(cls, *args, **kwargs)
instance.sent_items = []
instance.published_items = []
return instance
async def _main(self, host):
print ("Faking wesocket connection to {}".format(host))
def get_ws(self):
return self
async def publish(self, data, is_json=True):
self.published_items += [{
'time': get_now(),
'data': data
}]
# convert to string and push through the websocket
data = json.dumps(data) if is_json else data
return await self.on_message(data)
async def publish_auth_confirmation(self):
return self.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}})
async def send(self, data_string):
self.sent_items += [{
'time': get_now(),
'data': data_string
}]
def get_published_items(self):
return self.published_items
def get_sent_items(self):
return self.sent_items
def get_last_sent_item(self):
return self.sent_items[-1:][0]
def get_sent_items_count(self):
return len(self.sent_items)
class EventWatcher():
def __init__(self, ws, event):
self.value = None
self.event = event
ws.once(event, self._finish)
def _finish(self, value):
self.value = value or {}
@classmethod
def watch(cls, ws, event):
return EventWatcher(ws, event)
def wait_until_complete(self, max_wait_time=5):
counter = 0
while self.value == None:
if counter > 5:
raise Exception('Wait time limit exceeded for event {}'.format(self.event))
time.sleep(1)
counter += 1
return self.value
def create_stubbed_client(*args, **kwargs):
client = Client(*args, **kwargs)
# no support for rest stubbing yet
client.rest = None
client.ws = StubbedWebsocket(*args, **kwargs)
return client
async def ws_publish_auth_accepted(ws):
return await ws.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}})
async def ws_publish_connection_init(ws):
return await ws.publish({"event":"info","version":2,"serverId":"748c00f2-250b-46bb-8519-ce1d7d68e4f0","platform":{"status":1}})
async def ws_publish_conf_accepted(ws, flags_code):
return await ws.publish({"event":"conf","status":"OK","flags":flags_code})

View File

@@ -0,0 +1,24 @@
import sys
sys.path.append('../components')
from bfxapi import Decimal
def test_precision():
assert str(Decimal(0.00000123456789)) == "0.00000123456789"
assert str(Decimal("0.00000123456789")) == "0.00000123456789"
def test_float_operations():
assert str(Decimal(0.0002) * 0.02) == "0.000004"
assert str(0.02 * Decimal(0.0002)) == "0.000004"
assert str(Decimal(0.0002) / 0.02) == "0.01"
assert str(0.02 / Decimal(0.0002)) == "0.01"
assert str(0.02 + Decimal(0.0002)) == "0.0202"
assert str(Decimal(0.0002) + 0.02) == "0.0202"
assert str(0.02 - Decimal(0.0002)) == "-0.0198"
assert str(Decimal(0.0002) - 0.02) == "-0.0198"
assert str(0.01 // Decimal(0.0004)) == "0"
assert str(Decimal(0.0004) // 0.01) == "0"

View File

@@ -0,0 +1,65 @@
import pytest
from .helpers import create_stubbed_client, ws_publish_connection_init, ws_publish_conf_accepted
@pytest.mark.asyncio
async def test_checksum_generation():
client = create_stubbed_client()
symbol = "tXRPBTC"
# publsh connection created message
await ws_publish_connection_init(client.ws)
# publish checksum flag accepted
await ws_publish_conf_accepted(client.ws, 131072)
# subscribe to order book
await client.ws.subscribe('book', symbol)
## send subscription accepted
chanId = 123
await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol})
## send orderbook snapshot
await client.ws.publish("""[123, [[0.0000886,1,1060.55466114],[0.00008859,1,1000],[0.00008858,1,2713.47159343],[0.00008857,1,4276.92870916],[0.00008856,2,6764.75562319],
[0.00008854,1,5641.48532401],[0.00008853,1,2255.92632223],[0.0000885,1,2256.69584601],[0.00008848,2,3630.3],[0.00008845,1,28195.70625766],
[0.00008844,1,15571.7],[0.00008843,1,2500],[0.00008841,1,64196.16117814],[0.00008838,1,7500],[0.00008837,2,2764.12999012],[0.00008834,2,10886.476298],
[0.00008831,1,20000],[0.0000883,1,1000],[0.00008829,2,2517.22175358],[0.00008828,1,450.45],[0.00008827,1,13000],[0.00008824,1,1500],[0.0000882,1,300],
[0.00008817,1,3000],[0.00008816,1,100],[0.00008864,1,-481.8549041],[0.0000887,2,-2141.77009092],[0.00008871,1,-2256.45433182],[0.00008872,1,-2707.58122743],
[0.00008874,1,-5640.31794092],[0.00008876,1,-29004.93294912],[0.00008878,1,-2500],[0.0000888,1,-20000],[0.00008881,2,-2880.15595827],[0.00008882,1,-27705.42933984],
[0.00008883,1,-4509.83708214],[0.00008884,1,-1500],[0.00008885,1,-2500],[0.00008888,1,-902.91405442],[0.00008889,1,-900],[0.00008891,1,-7500],
[0.00008894,1,-775.08564697],[0.00008896,1,-150],[0.00008899,3,-11628.02590049],[0.000089,2,-1299.7],[0.00008902,2,-4841.8],[0.00008904,3,-25320.46250083],
[0.00008909,1,-14000],[0.00008913,1,-123947.999],[0.00008915,2,-28019.6]]]""", is_json=False)
## send some more price updates
await client.ws.publish("[{},[0.00008915,0,-1]]".format(chanId), is_json=False)
await client.ws.publish("[{},[0.00008837,1,56.54876269]]".format(chanId), is_json=False)
await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False)
## check checksum is the same as expected
expected_checksum = 30026640
actual_checksum = client.ws.orderBooks[symbol].checksum()
assert expected_checksum == actual_checksum
@pytest.mark.asyncio
async def test_checksum_really_samll_numbers_generation():
client = create_stubbed_client()
symbol = "tVETBTC"
# publsh connection created message
await ws_publish_connection_init(client.ws)
# publish checksum flag accepted
await ws_publish_conf_accepted(client.ws, 131072)
# subscribe to order book
await client.ws.subscribe('book', symbol)
## send subscription accepted
chanId = 123
await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol})
## send orderbook snapshot
await client.ws.publish("""[123, [[0.00000121,5,249013.0209708],[0.0000012,6,518315.33310128],[0.00000119,4,566200.89],[0.00000118,2,260000],[0.00000117,1,100000],
[0.00000116,2,160000],[0.00000114,1,60000],[0.00000113,2,198500],[0.00000112,1,60000],[0.0000011,1,60000],[0.00000106,2,113868.87735849],[0.00000105,2,105000],
[0.00000103,1,3000],[0.00000102,2,105000],[0.00000101,2,202970],[0.000001,2,21000],[7e-7,1,10000],[6.6e-7,1,10000],[6e-7,1,100000],[4.9e-7,1,10000],[2.5e-7,1,2000],
[6e-8,1,100000],[5e-8,1,200000],[1e-8,4,640000],[0.00000122,7,-312043.19],[0.00000123,6,-415094.8939744],[0.00000124,5,-348181.23],[0.00000125,1,-12000],
[0.00000126,2,-143872.31],[0.00000127,1,-5000],[0.0000013,1,-5000],[0.00000134,1,-8249.18938656],[0.00000135,2,-230043.1337899],[0.00000136,1,-13161.25184766],
[0.00000145,1,-2914],[0.0000015,3,-54448.5],[0.00000152,2,-5538.54849594],[0.00000153,1,-62691.75475079],[0.00000159,1,-2914],[0.0000016,1,-52631.10296831],
[0.00000164,1,-4000],[0.00000166,1,-3831.46784605],[0.00000171,1,-14575.17730379],[0.00000174,1,-3124.81815395],[0.0000018,1,-18000],[0.00000182,1,-16000],
[0.00000186,1,-4000],[0.00000189,1,-10000.686624],[0.00000191,1,-14500]]]""", is_json=False)
## send some more price updates
await client.ws.publish("[{},[0.00000121,4,228442.6609708]]".format(chanId), is_json=False)
await client.ws.publish("[{},[0.00000121,6,304023.8109708]]".format(chanId), is_json=False)
# await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False)
## check checksum is the same as expected
expected_checksum = 1770440002
actual_checksum = client.ws.orderBooks[symbol].checksum()
assert expected_checksum == actual_checksum

View File

@@ -0,0 +1,112 @@
import pytest
import json
from .helpers import (create_stubbed_client, ws_publish_auth_accepted, ws_publish_connection_init,
EventWatcher)
@pytest.mark.asyncio
async def test_submit_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
## send new order
await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET')
last_sent = client.ws.get_last_sent_item()
sent_order_array = json.loads(last_sent['data'])
assert sent_order_array[1] == "on"
sent_order_json = sent_order_array[3]
assert sent_order_json['type'] == "EXCHANGE MARKET"
assert sent_order_json['symbol'] == "tBTCUSD"
assert sent_order_json['amount'] == "0.01"
assert sent_order_json['price'] == "19000"
@pytest.mark.asyncio
async def test_submit_update_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
## send new order
await client.ws.update_order(123, price=100, amount=0.01, hidden=True)
last_sent = client.ws.get_last_sent_item()
sent_order_array = json.loads(last_sent['data'])
assert sent_order_array[1] == "ou"
sent_order_json = sent_order_array[3]
# {"id": 123, "price": "100", "amount": "0.01", "flags": 64}
assert sent_order_json['id'] == 123
assert sent_order_json['price'] == "100"
assert sent_order_json['amount'] == "0.01"
assert sent_order_json['flags'] == 64
@pytest.mark.asyncio
async def test_submit_cancel_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
## send new order
await client.ws.cancel_order(123)
last_sent = client.ws.get_last_sent_item()
sent_order_array = json.loads(last_sent['data'])
assert sent_order_array[1] == "oc"
sent_order_json = sent_order_array[3]
assert sent_order_json['id'] == 123
@pytest.mark.asyncio
async def test_events_on_new_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
## look for new order confirmation
o_new = EventWatcher.watch(client.ws, 'order_new')
await client.ws.publish([0,"on",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
new_res = o_new.wait_until_complete()
assert new_res.amount_orig == -1
assert new_res.amount_filled == 0
assert new_res.price == 15980
assert new_res.type == 'EXCHANGE LIMIT'
## look for order update confirmation
o_update = EventWatcher.watch(client.ws, 'order_update')
await client.ws.publish([0,"ou",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
update_res = o_update.wait_until_complete()
assert update_res.amount_orig == -1
assert float(update_res.amount_filled) == -0.5
assert update_res.price == 15980
assert update_res.type == 'EXCHANGE LIMIT'
## look for closed notification
o_closed = EventWatcher.watch(client.ws, 'order_closed')
await client.ws.publish([0,"oc",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
closed_res = o_closed.wait_until_complete()
assert new_res.amount_orig == -1
assert new_res.amount_filled == 0
assert new_res.price == 15980
assert new_res.type == 'EXCHANGE LIMIT'
@pytest.mark.asyncio
async def test_events_on_cancel_order():
client = create_stubbed_client()
# publsh connection created message
await ws_publish_connection_init(client.ws)
## send auth accepted
await ws_publish_auth_accepted(client.ws)
## Create new order
await client.ws.publish([0,"on",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123460,1,1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
## look for order closed confirmation
o_close = EventWatcher.watch(client.ws, 'order_closed')
await client.ws.publish([0,"oc",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123548,1,1,"EXCHANGE LIMIT",None,None,None,0,"CANCELED",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]])
close_res = o_close.wait_until_complete()
assert close_res.amount_orig == 1
assert float(close_res.amount_filled) == 0
assert close_res.price == 10
assert close_res.type == 'EXCHANGE LIMIT'

View File

@@ -0,0 +1,141 @@
import pytest
import json
import asyncio
from .helpers import (create_stubbed_client, ws_publish_connection_init, EventWatcher)
@pytest.mark.asyncio
async def test_submit_subscribe():
client = create_stubbed_client()
symb = 'tXRPBTC'
# publsh connection created message
await ws_publish_connection_init(client.ws)
# Create new subscription to orderbook
await client.ws.subscribe('book', symb)
last_sent = client.ws.get_last_sent_item()
sent_sub = json.loads(last_sent['data'])
# {'time': 1548327054030, 'data': '{"event": "subscribe", "channel": "book", "symbol": "tXRPBTC"}'}
assert sent_sub['event'] == "subscribe"
assert sent_sub['channel'] == "book"
assert sent_sub['symbol'] == symb
# create new subscription to trades
await client.ws.subscribe('trades', symb)
last_sent = client.ws.get_last_sent_item()
sent_sub = json.loads(last_sent['data'])
# {'event': 'subscribe', 'channel': 'trades', 'symbol': 'tBTCUSD'}
assert sent_sub['event'] == 'subscribe'
assert sent_sub['channel'] == 'trades'
assert sent_sub['symbol'] == symb
# create new subscription to candles
await client.ws.subscribe('candles', symb, timeframe='1m')
last_sent = client.ws.get_last_sent_item()
sent_sub = json.loads(last_sent['data'])
#{'event': 'subscribe', 'channel': 'candles', 'symbol': 'tBTCUSD', 'key': 'trade:1m:tBTCUSD'}
assert sent_sub['event'] == 'subscribe'
assert sent_sub['channel'] == 'candles'
assert sent_sub['key'] == 'trade:1m:{}'.format(symb)
@pytest.mark.asyncio
async def test_event_subscribe():
client = create_stubbed_client()
symb = 'tXRPBTC'
pair = 'XRPBTC'
# publsh connection created message
await ws_publish_connection_init(client.ws)
# create a new subscription
await client.ws.subscribe('trades', symb)
# announce subscription was successful
sub_watch = EventWatcher.watch(client.ws, 'subscribed')
await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair})
s_res = sub_watch.wait_until_complete()
assert s_res.channel_name == 'trades'
assert s_res.symbol == symb
assert s_res.is_subscribed_bool == True
assert s_res.chan_id == 2
@pytest.mark.asyncio
async def test_submit_unsubscribe():
client = create_stubbed_client()
symb = 'tXRPBTC'
pair = 'XRPBTC'
# publsh connection created message
await ws_publish_connection_init(client.ws)
# create new subscription to trades
await client.ws.subscribe('trades', symb)
# announce subscription was successful
sub_watch = EventWatcher.watch(client.ws, 'subscribed')
await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair})
s_res = sub_watch.wait_until_complete()
# unsubscribe from channel
await s_res.unsubscribe()
last_sent = client.ws.get_last_sent_item()
sent_unsub = json.loads(last_sent['data'])
# {'event': 'unsubscribe', 'chanId': 2}
assert sent_unsub['event'] == 'unsubscribe'
assert sent_unsub['chanId'] == 2
@pytest.mark.asyncio
async def test_event_unsubscribe():
client = create_stubbed_client()
symb = 'tXRPBTC'
pair = 'XRPBTC'
# publish connection created message
await ws_publish_connection_init(client.ws)
# create new subscription to trades
await client.ws.subscribe('trades', symb)
# announce subscription was successful
sub_watch = EventWatcher.watch(client.ws, 'subscribed')
await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair})
s_res = sub_watch.wait_until_complete()
# unsubscribe from channel
await s_res.unsubscribe()
last_sent = client.ws.get_last_sent_item()
sent_unsub = json.loads(last_sent['data'])
# publish confirmation of unsubscribe
unsub_watch = EventWatcher.watch(client.ws, 'unsubscribed')
await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2})
unsub_res = unsub_watch.wait_until_complete()
assert s_res.channel_name == 'trades'
assert s_res.symbol == symb
assert s_res.is_subscribed_bool == False
assert s_res.chan_id == 2
@pytest.mark.asyncio
async def test_submit_resubscribe():
client = create_stubbed_client()
symb = 'tXRPBTC'
pair = 'XRPBTC'
# publish connection created message
await ws_publish_connection_init(client.ws)
# request two new subscriptions
await client.ws.subscribe('book', symb)
await client.ws.subscribe('trades', symb)
# confirm subscriptions
await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair})
await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair})
# call resubscribe all
await client.ws.resubscribe_all()
## assert that 2 unsubscribe requests were sent
last_sent = client.ws.get_sent_items()[-2:]
for i in last_sent:
data = json.loads(i['data'])
assert data['event'] == 'unsubscribe'
assert (data['chanId'] == 2 or data['chanId'] == 3)
## confirm unsubscriptions
await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2})
await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":3})
## confirm subscriptions
# await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair})
# await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair})
# wait for emit of event
n_last_sent = client.ws.get_sent_items()[-2:]
for i in n_last_sent:
data = json.loads(i['data'])
# print (data)
assert data['event'] == 'subscribe'
assert (data['channel'] == 'book' or data['channel'] == 'trades')
assert data['symbol'] == symb

52
bfxapi/utils/Decimal.py Normal file
View File

@@ -0,0 +1,52 @@
import decimal as dec
class Decimal(dec.Decimal):
@classmethod
def from_float(cls, f):
return cls(str(f))
def __new__(cls, value=0, *args, **kwargs):
if isinstance(value, float):
value = Decimal.from_float(value)
return super(Decimal, cls).__new__(cls, value, *args, **kwargs)
def __mul__(self, rhs):
if isinstance(rhs, float):
rhs = Decimal.from_float(rhs)
return Decimal(super().__mul__(rhs))
def __rmul__(self, lhs):
return self.__mul__(lhs)
def __add__(self, rhs):
if isinstance(rhs, float):
rhs = Decimal.from_float(rhs)
return Decimal(super().__add__(rhs))
def __radd__(self, lhs):
return self.__add__(lhs)
def __sub__(self, rhs):
if isinstance(rhs, float):
rhs = Decimal.from_float(rhs)
return Decimal(super().__sub__(rhs))
def __rsub__(self, lhs):
return self.__sub__(lhs)
def __truediv__(self, rhs):
if isinstance(rhs, float):
rhs = Decimal.from_float(rhs)
return Decimal(super().__truediv__(rhs))
def __rtruediv__(self, rhs):
return self.__truediv__(rhs)
def __floordiv__(self, rhs):
if isinstance(rhs, float):
rhs = Decimal.from_float(rhs)
return Decimal(super().__floordiv__(rhs))
def __rfloordiv__ (self, rhs):
return self.__floordiv__(rhs)

View File

@@ -98,13 +98,17 @@ class BfxWebsocket(GenericWebsocket):
} }
def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2',
manageOrderBooks=False, dead_man_switch=False, logLevel='INFO', *args, **kwargs): manageOrderBooks=False, dead_man_switch=False, logLevel='INFO', parse_float=float,
*args, **kwargs):
self.API_KEY = API_KEY self.API_KEY = API_KEY
self.API_SECRET = API_SECRET self.API_SECRET = API_SECRET
self.manageOrderBooks = manageOrderBooks self.manageOrderBooks = manageOrderBooks
self.dead_man_switch = dead_man_switch self.dead_man_switch = dead_man_switch
self.pendingOrders = {} self.pendingOrders = {}
self.orderBooks = {} self.orderBooks = {}
# How should we store float values? could also be bfxapi.Decimal
# which is slower but has higher precision.
self.parse_float = parse_float
super(BfxWebsocket, self).__init__( super(BfxWebsocket, self).__init__(
host, logLevel=logLevel, *args, **kwargs) host, logLevel=logLevel, *args, **kwargs)
@@ -149,7 +153,7 @@ class BfxWebsocket(GenericWebsocket):
self.logger.warn( self.logger.warn(
"Unknown websocket event: '{}' {}".format(eType, msg)) "Unknown websocket event: '{}' {}".format(eType, msg))
async def _ws_data_handler(self, data): async def _ws_data_handler(self, data, raw_message_str):
dataEvent = data[1] dataEvent = data[1]
chan_id = data[0] chan_id = data[0]
@@ -161,7 +165,7 @@ class BfxWebsocket(GenericWebsocket):
if subscription.channel_name == 'candles': if subscription.channel_name == 'candles':
await self._candle_handler(data) await self._candle_handler(data)
if subscription.channel_name == 'book': if subscription.channel_name == 'book':
await self._order_book_handler(data) await self._order_book_handler(data, raw_message_str)
if subscription.channel_name == 'trades': if subscription.channel_name == 'trades':
await self._trade_handler(data) await self._trade_handler(data)
else: else:
@@ -320,7 +324,7 @@ class BfxWebsocket(GenericWebsocket):
data[1], subscription.symbol, subscription.timeframe) data[1], subscription.symbol, subscription.timeframe)
self._emit('new_candle', candle) self._emit('new_candle', candle)
async def _order_book_handler(self, data): async def _order_book_handler(self, data, orig_raw_message):
obInfo = data[1] obInfo = data[1]
chan_id = data[0] chan_id = data[0]
subscription = self.subscriptionManager.get(data[0]) subscription = self.subscriptionManager.get(data[0])
@@ -345,23 +349,24 @@ class BfxWebsocket(GenericWebsocket):
isSnapshot = type(obInfo[0]) is list isSnapshot = type(obInfo[0]) is list
if isSnapshot: if isSnapshot:
self.orderBooks[symbol] = OrderBook() self.orderBooks[symbol] = OrderBook()
self.orderBooks[symbol].update_from_snapshot(obInfo) self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message)
self._emit('order_book_snapshot', { self._emit('order_book_snapshot', {
'symbol': symbol, 'data': obInfo}) 'symbol': symbol, 'data': obInfo})
else: else:
self.orderBooks[symbol].update_with(obInfo) self.orderBooks[symbol].update_with(obInfo, orig_raw_message)
self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) self._emit('order_book_update', {'symbol': symbol, 'data': obInfo})
async def on_message(self, message): async def on_message(self, message):
self.logger.debug(message) self.logger.debug(message)
msg = json.loads(message) # convert float values to decimal
msg = json.loads(message, parse_float=self.parse_float)
self._emit('all', msg) self._emit('all', msg)
if type(msg) is dict: if type(msg) is dict:
# System messages are received as json # System messages are received as json
await self._ws_system_handler(msg) await self._ws_system_handler(msg)
elif type(msg) is list: elif type(msg) is list:
# All data messages are received as a list # All data messages are received as a list
await self._ws_data_handler(msg) await self._ws_data_handler(msg, message)
else: else:
self.logger.warn('Unknown websocket response: {}'.format(msg)) self.logger.warn('Unknown websocket response: {}'.format(msg))
@@ -369,7 +374,7 @@ class BfxWebsocket(GenericWebsocket):
jdata = generate_auth_payload(self.API_KEY, self.API_SECRET) jdata = generate_auth_payload(self.API_KEY, self.API_SECRET)
if self.dead_man_switch: if self.dead_man_switch:
jdata['dms'] = 4 jdata['dms'] = 4
await self.ws.send(json.dumps(jdata)) await self.get_ws().send(json.dumps(jdata))
async def on_open(self): async def on_open(self):
self.logger.info("Websocket opened.") self.logger.info("Websocket opened.")
@@ -380,17 +385,19 @@ class BfxWebsocket(GenericWebsocket):
# enable order book checksums # enable order book checksums
if self.manageOrderBooks: if self.manageOrderBooks:
await self.enable_flag(Flags.CHECKSUM) await self.enable_flag(Flags.CHECKSUM)
# resubscribe to any channels
await self.subscriptionManager.resubscribe_all()
async def _send_auth_command(self, channel_name, data): async def _send_auth_command(self, channel_name, data):
payload = [0, channel_name, None, data] payload = [0, channel_name, None, data]
await self.ws.send(json.dumps(payload)) await self.get_ws().send(json.dumps(payload))
async def enable_flag(self, flag): async def enable_flag(self, flag):
payload = { payload = {
"event": 'conf', "event": 'conf',
"flags": flag "flags": flag
} }
await self.ws.send(json.dumps(payload)) await self.get_ws().send(json.dumps(payload))
def get_orderbook(self, symbol): def get_orderbook(self, symbol):
return self.orderBooks.get(symbol, None) return self.orderBooks.get(symbol, None)

View File

@@ -4,11 +4,14 @@ Module used as a interfeace to describe a generick websocket client
import asyncio import asyncio
import websockets import websockets
import socket
import json import json
from pyee import EventEmitter from pyee import EventEmitter
from ..utils.CustomLogger import CustomLogger from ..utils.CustomLogger import CustomLogger
# websocket exceptions
from websockets.exceptions import ConnectionClosed
class AuthError(Exception): class AuthError(Exception):
""" """
@@ -16,7 +19,6 @@ class AuthError(Exception):
""" """
pass pass
def is_json(myjson): def is_json(myjson):
try: try:
json_object = json.loads(myjson) json_object = json.loads(myjson)
@@ -24,20 +26,20 @@ def is_json(myjson):
return False return False
return True return True
class GenericWebsocket: class GenericWebsocket:
""" """
Websocket object used to contain the base functionality of a websocket. Websocket object used to contain the base functionality of a websocket.
Inlcudes an event emitter and a standard websocket client. Inlcudes an event emitter and a standard websocket client.
""" """
def __init__(self, host, logLevel='INFO', loop=None): def __init__(self, host, logLevel='INFO', loop=None, max_retries=5):
self.host = host self.host = host
self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel)
self.loop = loop or asyncio.get_event_loop() self.loop = loop or asyncio.get_event_loop()
self.events = EventEmitter( self.events = EventEmitter(
scheduler=asyncio.ensure_future, loop=self.loop) scheduler=asyncio.ensure_future, loop=self.loop)
self.ws = None self.ws = None
self.max_retries = max_retries
def run(self): def run(self):
""" """
@@ -51,15 +53,33 @@ class GenericWebsocket:
""" """
return self._main(self.host) return self._main(self.host)
async def _main(self, host): async def _connect(self, host):
async with websockets.connect(host) as websocket: async with websockets.connect(host) as websocket:
self.ws = websocket self.ws = websocket
self.logger.info("Wesocket connectedt to {}".format(self.host)) self.logger.info("Wesocket connected to {}".format(host))
while True: while True:
await asyncio.sleep(0) await asyncio.sleep(0)
message = await websocket.recv() message = await websocket.recv()
await self.on_message(message) await self.on_message(message)
def get_ws(self):
return self.ws
async def _main(self, host):
retries = 0
while retries < self.max_retries:
try:
await self._connect(host)
retries = 0
except (ConnectionClosed, socket.error) as e:
self.logger.error(str(e))
retries += 1
# wait 5 seconds befor retrying
self.logger.info("Waiting 5 seconds befor retrying...")
await asyncio.sleep(5)
self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries))
self.logger.info("Unable to connect to websocket.")
def remove_all_listeners(self, event): def remove_all_listeners(self, event):
""" """
Remove all listeners from event emitter Remove all listeners from event emitter

View File

@@ -32,7 +32,7 @@ class SubscriptionManager:
""" """
# create a new subscription # create a new subscription
subscription = Subscription( subscription = Subscription(
self.bfxapi.ws, channel_name, symbol, timeframe, **kwargs) self.bfxapi, channel_name, symbol, timeframe, **kwargs)
self.logger.info("Subscribing to channel {}".format(channel_name)) self.logger.info("Subscribing to channel {}".format(channel_name))
key = "{}_{}".format(channel_name, subscription.key or symbol) key = "{}_{}".format(channel_name, subscription.key or symbol)
self.pending_subscriptions[key] = subscription self.pending_subscriptions[key] = subscription
@@ -63,11 +63,11 @@ class SubscriptionManager:
chan_id = raw_ws_data.get("chanId") chan_id = raw_ws_data.get("chanId")
sub = self.subscriptions_chanid[chan_id] sub = self.subscriptions_chanid[chan_id]
sub.confirm_unsubscribe() sub.confirm_unsubscribe()
self.bfxapi._emit('unsubscribed', sub)
# call onComplete callback if exists # call onComplete callback if exists
if sub.sub_id in self.unsubscribe_callbacks: if sub.sub_id in self.unsubscribe_callbacks:
await self.unsubscribe_callbacks[sub.sub_id]() await self.unsubscribe_callbacks[sub.sub_id]()
del self.unsubscribe_callbacks[sub.sub_id] del self.unsubscribe_callbacks[sub.sub_id]
self.bfxapi._emit('unsubscribed', sub)
def get(self, chan_id): def get(self, chan_id):
return self.subscriptions_chanid[chan_id] return self.subscriptions_chanid[chan_id]
@@ -121,6 +121,8 @@ class SubscriptionManager:
task_batch += [ task_batch += [
asyncio.ensure_future(self.unsubscribe(chan_id)) asyncio.ensure_future(self.unsubscribe(chan_id))
] ]
if len(task_batch) == 0:
return
await asyncio.wait(*[task_batch]) await asyncio.wait(*[task_batch])
async def resubscribe_all(self): async def resubscribe_all(self):
@@ -132,4 +134,6 @@ class SubscriptionManager:
task_batch += [ task_batch += [
asyncio.ensure_future(self.resubscribe(chan_id)) asyncio.ensure_future(self.resubscribe(chan_id))
] ]
if len(task_batch) == 0:
return
await asyncio.wait(*[task_batch]) await asyncio.wait(*[task_batch])

View File

@@ -9,3 +9,5 @@ disable=too-few-public-methods,
len-as-condition, len-as-condition,
too-many-instance-attributes, too-many-instance-attributes,
invalid-name invalid-name
ignore=tests

View File

@@ -2,3 +2,4 @@ eventemitter==0.2.0
asyncio==3.4.3 asyncio==3.4.3
websockets==7.0 websockets==7.0
pylint==2.2.2 pylint==2.2.2
pytest-asyncio==0.6.0