diff --git a/bfxapi/__init__.py b/bfxapi/__init__.py index 1f9755e..1ff271d 100644 --- a/bfxapi/__init__.py +++ b/bfxapi/__init__.py @@ -6,5 +6,7 @@ from .client import Client from .models import (Order, Trade, OrderBook, Subscription, Wallet, Position, FundingLoan, FundingOffer, FundingCredit) from .websockets.GenericWebsocket import GenericWebsocket +from .websockets.BfxWebsocket import BfxWebsocket +from .utils.Decimal import Decimal NAME = 'bfxapi' diff --git a/bfxapi/models/Order.py b/bfxapi/models/Order.py index dc05ba7..f209bb8 100644 --- a/bfxapi/models/Order.py +++ b/bfxapi/models/Order.py @@ -5,7 +5,6 @@ Module used to describe all of the different data types import time import datetime - class OrderType: """ Enum used to describe all of the different order types available for use diff --git a/bfxapi/models/Subscription.py b/bfxapi/models/Subscription.py index bc32ba4..0b3841c 100644 --- a/bfxapi/models/Subscription.py +++ b/bfxapi/models/Subscription.py @@ -4,7 +4,15 @@ Module used to describe all of the different data types import time import json +from random import randint +def generate_sub_id(): + """ + Generates a unique id in the form of 12345566-12334556 + """ + prefix = str(int(round(time.time() * 1000))) + suffix = str(randint(0, 9999999)) + return "{}-{}".format(prefix, suffix) class Subscription: """ @@ -13,8 +21,8 @@ class Subscription: such as unsibscribe and subscribe. """ - def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs): - self._ws = ws + def __init__(self, bfxapi, channel_name, symbol, timeframe=None, **kwargs): + self.bfxapi = bfxapi self.channel_name = channel_name self.symbol = symbol self.timeframe = timeframe @@ -23,7 +31,7 @@ class Subscription: self.chan_id = None if timeframe: self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) - self.sub_id = int(round(time.time() * 1000)) + self.sub_id = generate_sub_id() self.send_payload = self._generate_payload(**kwargs) def confirm_subscription(self, chan_id): @@ -40,13 +48,13 @@ class Subscription: if not self.is_subscribed(): raise Exception("Subscription is not subscribed to websocket") payload = {'event': 'unsubscribe', 'chanId': self.chan_id} - await self._ws.send(json.dumps(payload)) + await self.bfxapi.get_ws().send(json.dumps(payload)) async def subscribe(self): """ Send a subscription request to the bitfinex socket """ - await self._ws.send(json.dumps(self._get_send_payload())) + await self.bfxapi.get_ws().send(json.dumps(self._get_send_payload())) def confirm_unsubscribe(self): """ diff --git a/bfxapi/models/order.py b/bfxapi/models/order.py index dc05ba7..f209bb8 100644 --- a/bfxapi/models/order.py +++ b/bfxapi/models/order.py @@ -5,7 +5,6 @@ Module used to describe all of the different data types import time import datetime - class OrderType: """ Enum used to describe all of the different order types available for use diff --git a/bfxapi/models/order_book.py b/bfxapi/models/order_book.py index 48a8a44..f00cfbb 100644 --- a/bfxapi/models/order_book.py +++ b/bfxapi/models/order_book.py @@ -3,17 +3,7 @@ Module used to describe all of the different data types """ import zlib - - -def prepare_price(price): - """ - Convert the price to an acceptable format - """ - # convert to 4 significant figures - prep_price = '{0:.4f}'.format(price) - # remove decimal place if zero float - return '{0:g}'.format(float(prep_price)) - +import json class OrderBook: """ @@ -42,26 +32,37 @@ class OrderBook: """ return self.asks - def update_from_snapshot(self, data): + def update_from_snapshot(self, data, orig_raw_msg): """ Update the orderbook with a raw orderbook snapshot """ - for order in data: - if len(order) == 4: - if order[3] < 0: + # we need to keep the original string values that are sent to use + # this avoids any problems with floats + orig_raw = json.loads(orig_raw_msg, parse_float=str, parse_int=str)[1] + zip_data = [] + # zip both the float values and string values together + for index, order in enumerate(data): + zip_data += [(order, orig_raw[index])] + ## build our bids and asks + for order in zip_data: + if len(order[0]) == 4: + if order[0][3] < 0: self.bids += [order] else: self.asks += [order] else: - if order[2] < 0: + if order[0][2] < 0: self.asks += [order] else: self.bids += [order] - def update_with(self, order): + def update_with(self, order, orig_raw_msg): """ Update the orderbook with a single update """ + # keep orginal string vlues to avoid checksum float errors + orig_raw = json.loads(orig_raw_msg, parse_float=str, parse_int=str)[1] + zip_order = (order, orig_raw) if len(order) == 4: amount = order[3] count = order[2] @@ -74,12 +75,12 @@ class OrderBook: # if first item in ordebook if len(side) == 0: - side += [order] + side += [zip_order] return - # match price level + # match price level but use the float parsed object for index, s_order in enumerate(side): - s_price = s_order[0] + s_price = s_order[0][0] if s_price == price: if count == 0: del side[index] @@ -92,8 +93,8 @@ class OrderBook: return # add to book and sort lowest to highest - side += [order] - side.sort(key=lambda x: x[0], reverse=not amount < 0) + side += [zip_order] + side.sort(key=lambda x: x[0][0], reverse=not amount < 0) return def checksum(self): @@ -104,17 +105,19 @@ class OrderBook: # take set of top 25 bids/asks for index in range(0, 25): if index < len(self.bids): - bid = self.bids[index] + # use the string parsed array + bid = self.bids[index][1] price = bid[0] amount = bid[3] if len(bid) == 4 else bid[2] - data += [prepare_price(price)] - data += [str(amount)] + data += [price] + data += [amount] if index < len(self.asks): - ask = self.asks[index] + # use the string parsed array + ask = self.asks[index][1] price = ask[0] amount = ask[3] if len(ask) == 4 else ask[2] - data += [prepare_price(price)] - data += [str(amount)] + data += [price] + data += [amount] checksum_str = ':'.join(data) # calculate checksum and force signed integer checksum = zlib.crc32(checksum_str.encode('utf8')) & 0xffffffff diff --git a/bfxapi/models/subscription.py b/bfxapi/models/subscription.py index bc32ba4..0b3841c 100644 --- a/bfxapi/models/subscription.py +++ b/bfxapi/models/subscription.py @@ -4,7 +4,15 @@ Module used to describe all of the different data types import time import json +from random import randint +def generate_sub_id(): + """ + Generates a unique id in the form of 12345566-12334556 + """ + prefix = str(int(round(time.time() * 1000))) + suffix = str(randint(0, 9999999)) + return "{}-{}".format(prefix, suffix) class Subscription: """ @@ -13,8 +21,8 @@ class Subscription: such as unsibscribe and subscribe. """ - def __init__(self, ws, channel_name, symbol, timeframe=None, **kwargs): - self._ws = ws + def __init__(self, bfxapi, channel_name, symbol, timeframe=None, **kwargs): + self.bfxapi = bfxapi self.channel_name = channel_name self.symbol = symbol self.timeframe = timeframe @@ -23,7 +31,7 @@ class Subscription: self.chan_id = None if timeframe: self.key = 'trade:{}:{}'.format(self.timeframe, self.symbol) - self.sub_id = int(round(time.time() * 1000)) + self.sub_id = generate_sub_id() self.send_payload = self._generate_payload(**kwargs) def confirm_subscription(self, chan_id): @@ -40,13 +48,13 @@ class Subscription: if not self.is_subscribed(): raise Exception("Subscription is not subscribed to websocket") payload = {'event': 'unsubscribe', 'chanId': self.chan_id} - await self._ws.send(json.dumps(payload)) + await self.bfxapi.get_ws().send(json.dumps(payload)) async def subscribe(self): """ Send a subscription request to the bitfinex socket """ - await self._ws.send(json.dumps(self._get_send_payload())) + await self.bfxapi.get_ws().send(json.dumps(self._get_send_payload())) def confirm_unsubscribe(self): """ diff --git a/bfxapi/rest/BfxRest.py b/bfxapi/rest/BfxRest.py index 129abf8..fc5c0f0 100644 --- a/bfxapi/rest/BfxRest.py +++ b/bfxapi/rest/BfxRest.py @@ -22,11 +22,13 @@ class BfxRest: """ def __init__(self, API_KEY, API_SECRET, host='https://api.bitfinex.com/v2', loop=None, - logLevel='INFO', *args, **kwargs): + logLevel='INFO', parse_float=float, *args, **kwargs): self.loop = loop or asyncio.get_event_loop() self.API_KEY = API_KEY self.API_SECRET = API_SECRET self.host = host + # this value can also be set to bfxapi.Decimal for much higher precision + self.parse_float = parse_float self.logger = CustomLogger('BfxRest', logLevel=logLevel) async def fetch(self, endpoint, params=""): @@ -42,7 +44,8 @@ class BfxRest: if resp.status is not 200: raise Exception('GET {} failed with status {} - {}' .format(url, resp.status, text)) - return await resp.json() + parsed = json.loads(text, parse_float=self.parse_float) + return parsed async def post(self, endpoint, data={}, params=""): """ @@ -61,7 +64,8 @@ class BfxRest: if resp.status is not 200: raise Exception('POST {} failed with status {} - {}' .format(url, resp.status, text)) - return await resp.json() + parsed = json.loads(text, parse_float=self.parse_float) + return parsed ################################################## # Public Data # diff --git a/bfxapi/tests/__init__.py b/bfxapi/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/bfxapi/tests/helpers.py b/bfxapi/tests/helpers.py new file mode 100644 index 0000000..1670beb --- /dev/null +++ b/bfxapi/tests/helpers.py @@ -0,0 +1,90 @@ +import time +import json +import asyncio + +from .. import Client, BfxWebsocket + +def get_now(): + return int(round(time.time() * 1000)) + +class StubbedWebsocket(BfxWebsocket): + def __new__(cls, *args, **kwargs): + instance = super(StubbedWebsocket, cls).__new__(cls, *args, **kwargs) + instance.sent_items = [] + instance.published_items = [] + return instance + + async def _main(self, host): + print ("Faking wesocket connection to {}".format(host)) + + def get_ws(self): + return self + + async def publish(self, data, is_json=True): + self.published_items += [{ + 'time': get_now(), + 'data': data + }] + # convert to string and push through the websocket + data = json.dumps(data) if is_json else data + return await self.on_message(data) + + async def publish_auth_confirmation(self): + return self.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}}) + + async def send(self, data_string): + self.sent_items += [{ + 'time': get_now(), + 'data': data_string + }] + + def get_published_items(self): + return self.published_items + + def get_sent_items(self): + return self.sent_items + + def get_last_sent_item(self): + return self.sent_items[-1:][0] + + def get_sent_items_count(self): + return len(self.sent_items) + +class EventWatcher(): + + def __init__(self, ws, event): + self.value = None + self.event = event + ws.once(event, self._finish) + + def _finish(self, value): + self.value = value or {} + + @classmethod + def watch(cls, ws, event): + return EventWatcher(ws, event) + + def wait_until_complete(self, max_wait_time=5): + counter = 0 + while self.value == None: + if counter > 5: + raise Exception('Wait time limit exceeded for event {}'.format(self.event)) + time.sleep(1) + counter += 1 + return self.value + +def create_stubbed_client(*args, **kwargs): + client = Client(*args, **kwargs) + # no support for rest stubbing yet + client.rest = None + client.ws = StubbedWebsocket(*args, **kwargs) + return client + +async def ws_publish_auth_accepted(ws): + return await ws.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}}) + +async def ws_publish_connection_init(ws): + return await ws.publish({"event":"info","version":2,"serverId":"748c00f2-250b-46bb-8519-ce1d7d68e4f0","platform":{"status":1}}) + +async def ws_publish_conf_accepted(ws, flags_code): + return await ws.publish({"event":"conf","status":"OK","flags":flags_code}) diff --git a/bfxapi/tests/test_decimal.py b/bfxapi/tests/test_decimal.py new file mode 100644 index 0000000..23adbc1 --- /dev/null +++ b/bfxapi/tests/test_decimal.py @@ -0,0 +1,24 @@ +import sys +sys.path.append('../components') + +from bfxapi import Decimal + +def test_precision(): + assert str(Decimal(0.00000123456789)) == "0.00000123456789" + assert str(Decimal("0.00000123456789")) == "0.00000123456789" + +def test_float_operations(): + assert str(Decimal(0.0002) * 0.02) == "0.000004" + assert str(0.02 * Decimal(0.0002)) == "0.000004" + + assert str(Decimal(0.0002) / 0.02) == "0.01" + assert str(0.02 / Decimal(0.0002)) == "0.01" + + assert str(0.02 + Decimal(0.0002)) == "0.0202" + assert str(Decimal(0.0002) + 0.02) == "0.0202" + + assert str(0.02 - Decimal(0.0002)) == "-0.0198" + assert str(Decimal(0.0002) - 0.02) == "-0.0198" + + assert str(0.01 // Decimal(0.0004)) == "0" + assert str(Decimal(0.0004) // 0.01) == "0" diff --git a/bfxapi/tests/test_ws_orderbook.py b/bfxapi/tests/test_ws_orderbook.py new file mode 100644 index 0000000..cdeae1f --- /dev/null +++ b/bfxapi/tests/test_ws_orderbook.py @@ -0,0 +1,65 @@ +import pytest +from .helpers import create_stubbed_client, ws_publish_connection_init, ws_publish_conf_accepted + +@pytest.mark.asyncio +async def test_checksum_generation(): + client = create_stubbed_client() + symbol = "tXRPBTC" + # publsh connection created message + await ws_publish_connection_init(client.ws) + # publish checksum flag accepted + await ws_publish_conf_accepted(client.ws, 131072) + # subscribe to order book + await client.ws.subscribe('book', symbol) + ## send subscription accepted + chanId = 123 + await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) + ## send orderbook snapshot + await client.ws.publish("""[123, [[0.0000886,1,1060.55466114],[0.00008859,1,1000],[0.00008858,1,2713.47159343],[0.00008857,1,4276.92870916],[0.00008856,2,6764.75562319], + [0.00008854,1,5641.48532401],[0.00008853,1,2255.92632223],[0.0000885,1,2256.69584601],[0.00008848,2,3630.3],[0.00008845,1,28195.70625766], + [0.00008844,1,15571.7],[0.00008843,1,2500],[0.00008841,1,64196.16117814],[0.00008838,1,7500],[0.00008837,2,2764.12999012],[0.00008834,2,10886.476298], + [0.00008831,1,20000],[0.0000883,1,1000],[0.00008829,2,2517.22175358],[0.00008828,1,450.45],[0.00008827,1,13000],[0.00008824,1,1500],[0.0000882,1,300], + [0.00008817,1,3000],[0.00008816,1,100],[0.00008864,1,-481.8549041],[0.0000887,2,-2141.77009092],[0.00008871,1,-2256.45433182],[0.00008872,1,-2707.58122743], + [0.00008874,1,-5640.31794092],[0.00008876,1,-29004.93294912],[0.00008878,1,-2500],[0.0000888,1,-20000],[0.00008881,2,-2880.15595827],[0.00008882,1,-27705.42933984], + [0.00008883,1,-4509.83708214],[0.00008884,1,-1500],[0.00008885,1,-2500],[0.00008888,1,-902.91405442],[0.00008889,1,-900],[0.00008891,1,-7500], + [0.00008894,1,-775.08564697],[0.00008896,1,-150],[0.00008899,3,-11628.02590049],[0.000089,2,-1299.7],[0.00008902,2,-4841.8],[0.00008904,3,-25320.46250083], + [0.00008909,1,-14000],[0.00008913,1,-123947.999],[0.00008915,2,-28019.6]]]""", is_json=False) + ## send some more price updates + await client.ws.publish("[{},[0.00008915,0,-1]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00008837,1,56.54876269]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) + ## check checksum is the same as expected + expected_checksum = 30026640 + actual_checksum = client.ws.orderBooks[symbol].checksum() + assert expected_checksum == actual_checksum + +@pytest.mark.asyncio +async def test_checksum_really_samll_numbers_generation(): + client = create_stubbed_client() + symbol = "tVETBTC" + # publsh connection created message + await ws_publish_connection_init(client.ws) + # publish checksum flag accepted + await ws_publish_conf_accepted(client.ws, 131072) + # subscribe to order book + await client.ws.subscribe('book', symbol) + ## send subscription accepted + chanId = 123 + await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) + ## send orderbook snapshot + await client.ws.publish("""[123, [[0.00000121,5,249013.0209708],[0.0000012,6,518315.33310128],[0.00000119,4,566200.89],[0.00000118,2,260000],[0.00000117,1,100000], + [0.00000116,2,160000],[0.00000114,1,60000],[0.00000113,2,198500],[0.00000112,1,60000],[0.0000011,1,60000],[0.00000106,2,113868.87735849],[0.00000105,2,105000], + [0.00000103,1,3000],[0.00000102,2,105000],[0.00000101,2,202970],[0.000001,2,21000],[7e-7,1,10000],[6.6e-7,1,10000],[6e-7,1,100000],[4.9e-7,1,10000],[2.5e-7,1,2000], + [6e-8,1,100000],[5e-8,1,200000],[1e-8,4,640000],[0.00000122,7,-312043.19],[0.00000123,6,-415094.8939744],[0.00000124,5,-348181.23],[0.00000125,1,-12000], + [0.00000126,2,-143872.31],[0.00000127,1,-5000],[0.0000013,1,-5000],[0.00000134,1,-8249.18938656],[0.00000135,2,-230043.1337899],[0.00000136,1,-13161.25184766], + [0.00000145,1,-2914],[0.0000015,3,-54448.5],[0.00000152,2,-5538.54849594],[0.00000153,1,-62691.75475079],[0.00000159,1,-2914],[0.0000016,1,-52631.10296831], + [0.00000164,1,-4000],[0.00000166,1,-3831.46784605],[0.00000171,1,-14575.17730379],[0.00000174,1,-3124.81815395],[0.0000018,1,-18000],[0.00000182,1,-16000], + [0.00000186,1,-4000],[0.00000189,1,-10000.686624],[0.00000191,1,-14500]]]""", is_json=False) + ## send some more price updates + await client.ws.publish("[{},[0.00000121,4,228442.6609708]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00000121,6,304023.8109708]]".format(chanId), is_json=False) + # await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) + ## check checksum is the same as expected + expected_checksum = 1770440002 + actual_checksum = client.ws.orderBooks[symbol].checksum() + assert expected_checksum == actual_checksum diff --git a/bfxapi/tests/test_ws_orders.py b/bfxapi/tests/test_ws_orders.py new file mode 100644 index 0000000..d981721 --- /dev/null +++ b/bfxapi/tests/test_ws_orders.py @@ -0,0 +1,112 @@ +import pytest +import json +from .helpers import (create_stubbed_client, ws_publish_auth_accepted, ws_publish_connection_init, + EventWatcher) + +@pytest.mark.asyncio +async def test_submit_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + ## send new order + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET') + last_sent = client.ws.get_last_sent_item() + sent_order_array = json.loads(last_sent['data']) + assert sent_order_array[1] == "on" + sent_order_json = sent_order_array[3] + assert sent_order_json['type'] == "EXCHANGE MARKET" + assert sent_order_json['symbol'] == "tBTCUSD" + assert sent_order_json['amount'] == "0.01" + assert sent_order_json['price'] == "19000" + +@pytest.mark.asyncio +async def test_submit_update_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + ## send new order + await client.ws.update_order(123, price=100, amount=0.01, hidden=True) + last_sent = client.ws.get_last_sent_item() + sent_order_array = json.loads(last_sent['data']) + assert sent_order_array[1] == "ou" + sent_order_json = sent_order_array[3] + # {"id": 123, "price": "100", "amount": "0.01", "flags": 64} + assert sent_order_json['id'] == 123 + assert sent_order_json['price'] == "100" + assert sent_order_json['amount'] == "0.01" + assert sent_order_json['flags'] == 64 + +@pytest.mark.asyncio +async def test_submit_cancel_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + ## send new order + await client.ws.cancel_order(123) + last_sent = client.ws.get_last_sent_item() + sent_order_array = json.loads(last_sent['data']) + assert sent_order_array[1] == "oc" + sent_order_json = sent_order_array[3] + assert sent_order_json['id'] == 123 + +@pytest.mark.asyncio +async def test_events_on_new_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + + ## look for new order confirmation + o_new = EventWatcher.watch(client.ws, 'order_new') + await client.ws.publish([0,"on",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + new_res = o_new.wait_until_complete() + assert new_res.amount_orig == -1 + assert new_res.amount_filled == 0 + assert new_res.price == 15980 + assert new_res.type == 'EXCHANGE LIMIT' + + ## look for order update confirmation + o_update = EventWatcher.watch(client.ws, 'order_update') + await client.ws.publish([0,"ou",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + update_res = o_update.wait_until_complete() + assert update_res.amount_orig == -1 + assert float(update_res.amount_filled) == -0.5 + assert update_res.price == 15980 + assert update_res.type == 'EXCHANGE LIMIT' + + ## look for closed notification + o_closed = EventWatcher.watch(client.ws, 'order_closed') + await client.ws.publish([0,"oc",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + closed_res = o_closed.wait_until_complete() + assert new_res.amount_orig == -1 + assert new_res.amount_filled == 0 + assert new_res.price == 15980 + assert new_res.type == 'EXCHANGE LIMIT' + +@pytest.mark.asyncio +async def test_events_on_cancel_order(): + client = create_stubbed_client() + # publsh connection created message + await ws_publish_connection_init(client.ws) + ## send auth accepted + await ws_publish_auth_accepted(client.ws) + + ## Create new order + await client.ws.publish([0,"on",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123460,1,1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + + ## look for order closed confirmation + o_close = EventWatcher.watch(client.ws, 'order_closed') + await client.ws.publish([0,"oc",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123548,1,1,"EXCHANGE LIMIT",None,None,None,0,"CANCELED",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + close_res = o_close.wait_until_complete() + assert close_res.amount_orig == 1 + assert float(close_res.amount_filled) == 0 + assert close_res.price == 10 + assert close_res.type == 'EXCHANGE LIMIT' + diff --git a/bfxapi/tests/test_ws_subscriptions.py b/bfxapi/tests/test_ws_subscriptions.py new file mode 100644 index 0000000..8c1313d --- /dev/null +++ b/bfxapi/tests/test_ws_subscriptions.py @@ -0,0 +1,141 @@ +import pytest +import json +import asyncio +from .helpers import (create_stubbed_client, ws_publish_connection_init, EventWatcher) + +@pytest.mark.asyncio +async def test_submit_subscribe(): + client = create_stubbed_client() + symb = 'tXRPBTC' + # publsh connection created message + await ws_publish_connection_init(client.ws) + + # Create new subscription to orderbook + await client.ws.subscribe('book', symb) + last_sent = client.ws.get_last_sent_item() + sent_sub = json.loads(last_sent['data']) + # {'time': 1548327054030, 'data': '{"event": "subscribe", "channel": "book", "symbol": "tXRPBTC"}'} + assert sent_sub['event'] == "subscribe" + assert sent_sub['channel'] == "book" + assert sent_sub['symbol'] == symb + + # create new subscription to trades + await client.ws.subscribe('trades', symb) + last_sent = client.ws.get_last_sent_item() + sent_sub = json.loads(last_sent['data']) + # {'event': 'subscribe', 'channel': 'trades', 'symbol': 'tBTCUSD'} + assert sent_sub['event'] == 'subscribe' + assert sent_sub['channel'] == 'trades' + assert sent_sub['symbol'] == symb + + # create new subscription to candles + await client.ws.subscribe('candles', symb, timeframe='1m') + last_sent = client.ws.get_last_sent_item() + sent_sub = json.loads(last_sent['data']) + #{'event': 'subscribe', 'channel': 'candles', 'symbol': 'tBTCUSD', 'key': 'trade:1m:tBTCUSD'} + assert sent_sub['event'] == 'subscribe' + assert sent_sub['channel'] == 'candles' + assert sent_sub['key'] == 'trade:1m:{}'.format(symb) + +@pytest.mark.asyncio +async def test_event_subscribe(): + client = create_stubbed_client() + symb = 'tXRPBTC' + pair = 'XRPBTC' + # publsh connection created message + await ws_publish_connection_init(client.ws) + # create a new subscription + await client.ws.subscribe('trades', symb) + # announce subscription was successful + sub_watch = EventWatcher.watch(client.ws, 'subscribed') + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + s_res = sub_watch.wait_until_complete() + assert s_res.channel_name == 'trades' + assert s_res.symbol == symb + assert s_res.is_subscribed_bool == True + assert s_res.chan_id == 2 + +@pytest.mark.asyncio +async def test_submit_unsubscribe(): + client = create_stubbed_client() + symb = 'tXRPBTC' + pair = 'XRPBTC' + # publsh connection created message + await ws_publish_connection_init(client.ws) + # create new subscription to trades + await client.ws.subscribe('trades', symb) + # announce subscription was successful + sub_watch = EventWatcher.watch(client.ws, 'subscribed') + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + s_res = sub_watch.wait_until_complete() + # unsubscribe from channel + await s_res.unsubscribe() + last_sent = client.ws.get_last_sent_item() + sent_unsub = json.loads(last_sent['data']) + # {'event': 'unsubscribe', 'chanId': 2} + assert sent_unsub['event'] == 'unsubscribe' + assert sent_unsub['chanId'] == 2 + +@pytest.mark.asyncio +async def test_event_unsubscribe(): + client = create_stubbed_client() + symb = 'tXRPBTC' + pair = 'XRPBTC' + # publish connection created message + await ws_publish_connection_init(client.ws) + # create new subscription to trades + await client.ws.subscribe('trades', symb) + # announce subscription was successful + sub_watch = EventWatcher.watch(client.ws, 'subscribed') + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + s_res = sub_watch.wait_until_complete() + # unsubscribe from channel + await s_res.unsubscribe() + last_sent = client.ws.get_last_sent_item() + sent_unsub = json.loads(last_sent['data']) + + # publish confirmation of unsubscribe + unsub_watch = EventWatcher.watch(client.ws, 'unsubscribed') + await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) + unsub_res = unsub_watch.wait_until_complete() + assert s_res.channel_name == 'trades' + assert s_res.symbol == symb + assert s_res.is_subscribed_bool == False + assert s_res.chan_id == 2 + +@pytest.mark.asyncio +async def test_submit_resubscribe(): + client = create_stubbed_client() + symb = 'tXRPBTC' + pair = 'XRPBTC' + # publish connection created message + await ws_publish_connection_init(client.ws) + # request two new subscriptions + await client.ws.subscribe('book', symb) + await client.ws.subscribe('trades', symb) + # confirm subscriptions + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) + # call resubscribe all + await client.ws.resubscribe_all() + ## assert that 2 unsubscribe requests were sent + last_sent = client.ws.get_sent_items()[-2:] + for i in last_sent: + data = json.loads(i['data']) + assert data['event'] == 'unsubscribe' + assert (data['chanId'] == 2 or data['chanId'] == 3) + ## confirm unsubscriptions + await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) + await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":3}) + + ## confirm subscriptions + # await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + # await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) + # wait for emit of event + n_last_sent = client.ws.get_sent_items()[-2:] + for i in n_last_sent: + data = json.loads(i['data']) + # print (data) + assert data['event'] == 'subscribe' + assert (data['channel'] == 'book' or data['channel'] == 'trades') + assert data['symbol'] == symb diff --git a/bfxapi/utils/Decimal.py b/bfxapi/utils/Decimal.py new file mode 100644 index 0000000..679ae8b --- /dev/null +++ b/bfxapi/utils/Decimal.py @@ -0,0 +1,52 @@ +import decimal as dec + +class Decimal(dec.Decimal): + + @classmethod + def from_float(cls, f): + return cls(str(f)) + + def __new__(cls, value=0, *args, **kwargs): + if isinstance(value, float): + value = Decimal.from_float(value) + return super(Decimal, cls).__new__(cls, value, *args, **kwargs) + + def __mul__(self, rhs): + if isinstance(rhs, float): + rhs = Decimal.from_float(rhs) + return Decimal(super().__mul__(rhs)) + + def __rmul__(self, lhs): + return self.__mul__(lhs) + + def __add__(self, rhs): + if isinstance(rhs, float): + rhs = Decimal.from_float(rhs) + return Decimal(super().__add__(rhs)) + + def __radd__(self, lhs): + return self.__add__(lhs) + + def __sub__(self, rhs): + if isinstance(rhs, float): + rhs = Decimal.from_float(rhs) + return Decimal(super().__sub__(rhs)) + + def __rsub__(self, lhs): + return self.__sub__(lhs) + + def __truediv__(self, rhs): + if isinstance(rhs, float): + rhs = Decimal.from_float(rhs) + return Decimal(super().__truediv__(rhs)) + + def __rtruediv__(self, rhs): + return self.__truediv__(rhs) + + def __floordiv__(self, rhs): + if isinstance(rhs, float): + rhs = Decimal.from_float(rhs) + return Decimal(super().__floordiv__(rhs)) + + def __rfloordiv__ (self, rhs): + return self.__floordiv__(rhs) diff --git a/bfxapi/websockets/BfxWebsocket.py b/bfxapi/websockets/BfxWebsocket.py index a03097b..b8bf4a6 100644 --- a/bfxapi/websockets/BfxWebsocket.py +++ b/bfxapi/websockets/BfxWebsocket.py @@ -98,13 +98,17 @@ class BfxWebsocket(GenericWebsocket): } def __init__(self, API_KEY=None, API_SECRET=None, host='wss://api.bitfinex.com/ws/2', - manageOrderBooks=False, dead_man_switch=False, logLevel='INFO', *args, **kwargs): + manageOrderBooks=False, dead_man_switch=False, logLevel='INFO', parse_float=float, + *args, **kwargs): self.API_KEY = API_KEY self.API_SECRET = API_SECRET self.manageOrderBooks = manageOrderBooks self.dead_man_switch = dead_man_switch self.pendingOrders = {} self.orderBooks = {} + # How should we store float values? could also be bfxapi.Decimal + # which is slower but has higher precision. + self.parse_float = parse_float super(BfxWebsocket, self).__init__( host, logLevel=logLevel, *args, **kwargs) @@ -149,7 +153,7 @@ class BfxWebsocket(GenericWebsocket): self.logger.warn( "Unknown websocket event: '{}' {}".format(eType, msg)) - async def _ws_data_handler(self, data): + async def _ws_data_handler(self, data, raw_message_str): dataEvent = data[1] chan_id = data[0] @@ -161,7 +165,7 @@ class BfxWebsocket(GenericWebsocket): if subscription.channel_name == 'candles': await self._candle_handler(data) if subscription.channel_name == 'book': - await self._order_book_handler(data) + await self._order_book_handler(data, raw_message_str) if subscription.channel_name == 'trades': await self._trade_handler(data) else: @@ -320,7 +324,7 @@ class BfxWebsocket(GenericWebsocket): data[1], subscription.symbol, subscription.timeframe) self._emit('new_candle', candle) - async def _order_book_handler(self, data): + async def _order_book_handler(self, data, orig_raw_message): obInfo = data[1] chan_id = data[0] subscription = self.subscriptionManager.get(data[0]) @@ -345,23 +349,24 @@ class BfxWebsocket(GenericWebsocket): isSnapshot = type(obInfo[0]) is list if isSnapshot: self.orderBooks[symbol] = OrderBook() - self.orderBooks[symbol].update_from_snapshot(obInfo) + self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message) self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo}) else: - self.orderBooks[symbol].update_with(obInfo) + self.orderBooks[symbol].update_with(obInfo, orig_raw_message) self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) async def on_message(self, message): self.logger.debug(message) - msg = json.loads(message) + # convert float values to decimal + msg = json.loads(message, parse_float=self.parse_float) self._emit('all', msg) if type(msg) is dict: # System messages are received as json await self._ws_system_handler(msg) elif type(msg) is list: # All data messages are received as a list - await self._ws_data_handler(msg) + await self._ws_data_handler(msg, message) else: self.logger.warn('Unknown websocket response: {}'.format(msg)) @@ -369,7 +374,7 @@ class BfxWebsocket(GenericWebsocket): jdata = generate_auth_payload(self.API_KEY, self.API_SECRET) if self.dead_man_switch: jdata['dms'] = 4 - await self.ws.send(json.dumps(jdata)) + await self.get_ws().send(json.dumps(jdata)) async def on_open(self): self.logger.info("Websocket opened.") @@ -380,17 +385,19 @@ class BfxWebsocket(GenericWebsocket): # enable order book checksums if self.manageOrderBooks: await self.enable_flag(Flags.CHECKSUM) + # resubscribe to any channels + await self.subscriptionManager.resubscribe_all() async def _send_auth_command(self, channel_name, data): payload = [0, channel_name, None, data] - await self.ws.send(json.dumps(payload)) + await self.get_ws().send(json.dumps(payload)) async def enable_flag(self, flag): payload = { "event": 'conf', "flags": flag } - await self.ws.send(json.dumps(payload)) + await self.get_ws().send(json.dumps(payload)) def get_orderbook(self, symbol): return self.orderBooks.get(symbol, None) diff --git a/bfxapi/websockets/GenericWebsocket.py b/bfxapi/websockets/GenericWebsocket.py index 0801fbf..0c44f59 100644 --- a/bfxapi/websockets/GenericWebsocket.py +++ b/bfxapi/websockets/GenericWebsocket.py @@ -4,11 +4,14 @@ Module used as a interfeace to describe a generick websocket client import asyncio import websockets +import socket import json from pyee import EventEmitter from ..utils.CustomLogger import CustomLogger +# websocket exceptions +from websockets.exceptions import ConnectionClosed class AuthError(Exception): """ @@ -16,7 +19,6 @@ class AuthError(Exception): """ pass - def is_json(myjson): try: json_object = json.loads(myjson) @@ -24,20 +26,20 @@ def is_json(myjson): return False return True - class GenericWebsocket: """ Websocket object used to contain the base functionality of a websocket. Inlcudes an event emitter and a standard websocket client. """ - def __init__(self, host, logLevel='INFO', loop=None): + def __init__(self, host, logLevel='INFO', loop=None, max_retries=5): self.host = host self.logger = CustomLogger('BfxWebsocket', logLevel=logLevel) self.loop = loop or asyncio.get_event_loop() self.events = EventEmitter( scheduler=asyncio.ensure_future, loop=self.loop) self.ws = None + self.max_retries = max_retries def run(self): """ @@ -51,15 +53,33 @@ class GenericWebsocket: """ return self._main(self.host) - async def _main(self, host): + async def _connect(self, host): async with websockets.connect(host) as websocket: self.ws = websocket - self.logger.info("Wesocket connectedt to {}".format(self.host)) + self.logger.info("Wesocket connected to {}".format(host)) while True: await asyncio.sleep(0) message = await websocket.recv() await self.on_message(message) + def get_ws(self): + return self.ws + + async def _main(self, host): + retries = 0 + while retries < self.max_retries: + try: + await self._connect(host) + retries = 0 + except (ConnectionClosed, socket.error) as e: + self.logger.error(str(e)) + retries += 1 + # wait 5 seconds befor retrying + self.logger.info("Waiting 5 seconds befor retrying...") + await asyncio.sleep(5) + self.logger.info("Reconnect attempt {}/{}".format(retries, self.max_retries)) + self.logger.info("Unable to connect to websocket.") + def remove_all_listeners(self, event): """ Remove all listeners from event emitter diff --git a/bfxapi/websockets/SubscriptionManager.py b/bfxapi/websockets/SubscriptionManager.py index df961ee..8caf995 100644 --- a/bfxapi/websockets/SubscriptionManager.py +++ b/bfxapi/websockets/SubscriptionManager.py @@ -32,7 +32,7 @@ class SubscriptionManager: """ # create a new subscription subscription = Subscription( - self.bfxapi.ws, channel_name, symbol, timeframe, **kwargs) + self.bfxapi, channel_name, symbol, timeframe, **kwargs) self.logger.info("Subscribing to channel {}".format(channel_name)) key = "{}_{}".format(channel_name, subscription.key or symbol) self.pending_subscriptions[key] = subscription @@ -63,11 +63,11 @@ class SubscriptionManager: chan_id = raw_ws_data.get("chanId") sub = self.subscriptions_chanid[chan_id] sub.confirm_unsubscribe() - self.bfxapi._emit('unsubscribed', sub) # call onComplete callback if exists if sub.sub_id in self.unsubscribe_callbacks: await self.unsubscribe_callbacks[sub.sub_id]() del self.unsubscribe_callbacks[sub.sub_id] + self.bfxapi._emit('unsubscribed', sub) def get(self, chan_id): return self.subscriptions_chanid[chan_id] @@ -121,6 +121,8 @@ class SubscriptionManager: task_batch += [ asyncio.ensure_future(self.unsubscribe(chan_id)) ] + if len(task_batch) == 0: + return await asyncio.wait(*[task_batch]) async def resubscribe_all(self): @@ -132,4 +134,6 @@ class SubscriptionManager: task_batch += [ asyncio.ensure_future(self.resubscribe(chan_id)) ] + if len(task_batch) == 0: + return await asyncio.wait(*[task_batch]) diff --git a/pylint.rc b/pylint.rc index 0efb406..55ec3d9 100644 --- a/pylint.rc +++ b/pylint.rc @@ -9,3 +9,5 @@ disable=too-few-public-methods, len-as-condition, too-many-instance-attributes, invalid-name + +ignore=tests diff --git a/requirements.txt b/requirements.txt index 991faee..da5244a 100644 --- a/requirements.txt +++ b/requirements.txt @@ -2,3 +2,4 @@ eventemitter==0.2.0 asyncio==3.4.3 websockets==7.0 pylint==2.2.2 +pytest-asyncio==0.6.0