From abfafbf8cf0cb304feba4c81a9cb303eba18bef8 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Wed, 24 Nov 2021 14:34:33 +0100 Subject: [PATCH 01/12] added support per py 3.9 and 3.10 --- CHANGELOG | 3 +++ bfxapi/version.py | 2 +- bfxapi/websockets/generic_websocket.py | 9 +++------ requirements.txt | 1 + setup.py | 2 +- 5 files changed, 9 insertions(+), 8 deletions(-) diff --git a/CHANGELOG b/CHANGELOG index dcd341b..b7426db 100644 --- a/CHANGELOG +++ b/CHANGELOG @@ -1,3 +1,6 @@ +1.2.7 +-) Added ws support for Python 3.9 and 3.10 + 1.2.6 -) Updated websockets to 9.1 diff --git a/bfxapi/version.py b/bfxapi/version.py index a663868..64e75f8 100644 --- a/bfxapi/version.py +++ b/bfxapi/version.py @@ -2,4 +2,4 @@ This module contains the current version of the bfxapi lib """ -__version__ = '1.2.6' +__version__ = '1.2.7' diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index a92ebd9..4f7d482 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -7,6 +7,7 @@ import websockets import socket import json import time +import nest_asyncio from threading import Thread, Lock from pyee import AsyncIOEventEmitter @@ -14,6 +15,7 @@ from ..utils.custom_logger import CustomLogger # websocket exceptions from websockets.exceptions import ConnectionClosed, InvalidStatusCode +nest_asyncio.apply() class AuthError(Exception): """ @@ -94,12 +96,7 @@ class GenericWebsocket: def _start_new_socket(self, socketId=None): if not socketId: socketId = len(self.sockets) - def start_loop(loop): - asyncio.set_event_loop(loop) - loop.run_until_complete(self._run_socket()) - worker_loop = asyncio.new_event_loop() - worker = Thread(target=start_loop, args=(worker_loop,)) - worker.start() + asyncio.run(self._run_socket()) return socketId def _wait_for_socket(self, socket_id): diff --git a/requirements.txt b/requirements.txt index b579314..9f4cfff 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,3 +6,4 @@ six==1.12.0 pyee==8.0.1 aiohttp==3.4.4 isort==4.3.21 +nest_asyncio==1.5.1 \ No newline at end of file diff --git a/setup.py b/setup.py index 69f5e2e..cec0c1f 100644 --- a/setup.py +++ b/setup.py @@ -11,7 +11,7 @@ from os import path here = path.abspath(path.dirname(__file__)) setup( name='bitfinex-api-py', - version='1.2.6', + version='1.2.7', description='Official Bitfinex Python API', long_description='A Python reference implementation of the Bitfinex API for both REST and websocket interaction', long_description_content_type='text/markdown', From faa6075eae4ec428f2db455b17e1f8e8acb7689b Mon Sep 17 00:00:00 2001 From: itsdeka Date: Thu, 25 Nov 2021 12:19:31 +0100 Subject: [PATCH 02/12] new approach --- bfxapi/websockets/generic_websocket.py | 10 +++++++--- requirements.txt | 3 +-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index 4f7d482..f5b6269 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -7,7 +7,6 @@ import websockets import socket import json import time -import nest_asyncio from threading import Thread, Lock from pyee import AsyncIOEventEmitter @@ -15,7 +14,6 @@ from ..utils.custom_logger import CustomLogger # websocket exceptions from websockets.exceptions import ConnectionClosed, InvalidStatusCode -nest_asyncio.apply() class AuthError(Exception): """ @@ -86,6 +84,8 @@ class GenericWebsocket: thread and connection. """ self._start_new_socket() + while True: + time.sleep(1) def get_task_executable(self): """ @@ -93,10 +93,14 @@ class GenericWebsocket: """ return self._run_socket() + def _start_new_async_socket(self): + asyncio.run(self._run_socket()) + def _start_new_socket(self, socketId=None): if not socketId: socketId = len(self.sockets) - asyncio.run(self._run_socket()) + worker = Thread(target=self._start_new_async_socket) + worker.start() return socketId def _wait_for_socket(self, socket_id): diff --git a/requirements.txt b/requirements.txt index 9f4cfff..5bb3aee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -5,5 +5,4 @@ pytest-asyncio==0.15.1 six==1.12.0 pyee==8.0.1 aiohttp==3.4.4 -isort==4.3.21 -nest_asyncio==1.5.1 \ No newline at end of file +isort==4.3.21 \ No newline at end of file From f74c1c0fde4aa732555106ec93a8e0589a217ca1 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Thu, 25 Nov 2021 12:21:19 +0100 Subject: [PATCH 03/12] fix --- requirements.txt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/requirements.txt b/requirements.txt index 5bb3aee..91f7936 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,4 @@ pylint==2.3.0 pytest-asyncio==0.15.1 six==1.12.0 pyee==8.0.1 -aiohttp==3.4.4 -isort==4.3.21 \ No newline at end of file +aiohttp==3.4.4 \ No newline at end of file From bf67841ada7e6a94b57af5ad189a332a9102f797 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Thu, 25 Nov 2021 12:22:08 +0100 Subject: [PATCH 04/12] fix --- requirements.txt | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/requirements.txt b/requirements.txt index 91f7936..5bb3aee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,4 +4,5 @@ pylint==2.3.0 pytest-asyncio==0.15.1 six==1.12.0 pyee==8.0.1 -aiohttp==3.4.4 \ No newline at end of file +aiohttp==3.4.4 +isort==4.3.21 \ No newline at end of file From d0246296e616b24fd65c20cb39fa69f7a8908507 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Thu, 25 Nov 2021 12:27:15 +0100 Subject: [PATCH 05/12] Support for Python <= 3.8 --- bfxapi/websockets/generic_websocket.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index f5b6269..f975b1e 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -94,7 +94,8 @@ class GenericWebsocket: return self._run_socket() def _start_new_async_socket(self): - asyncio.run(self._run_socket()) + loop = asyncio.new_event_loop() + loop.run_until_complete(self._run_socket()) def _start_new_socket(self, socketId=None): if not socketId: From 765532185ef331afb1b46e77d6633aa7c273ab03 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 09:38:42 +0100 Subject: [PATCH 06/12] fixed pyee 'error must derive from BaseException' issue --- bfxapi/websockets/bfx_websocket.py | 56 +++++++++++++------------- bfxapi/websockets/generic_websocket.py | 8 ++-- 2 files changed, 32 insertions(+), 32 deletions(-) diff --git a/bfxapi/websockets/bfx_websocket.py b/bfxapi/websockets/bfx_websocket.py index 0c3ff80..07771bd 100644 --- a/bfxapi/websockets/bfx_websocket.py +++ b/bfxapi/websockets/bfx_websocket.py @@ -266,13 +266,13 @@ class BfxWebsocket(GenericWebsocket): socketId, ERRORS[data.get('code', 10000)], data.get("msg", "")) - self._emit('error', err_string) + await self._emit(Exception(err_string)) async def _system_auth_handler(self, socketId, data): if data.get('status') == 'FAILED': raise AuthError(ERRORS[data.get('code')]) else: - self._emit('authenticated', data) + await self._emit('authenticated', data) self.logger.info("Authentication successful.") async def _trade_update_handler(self, data): @@ -281,7 +281,7 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - self._emit('trade_update', tradeObj) + await self._emit('trade_update', tradeObj) async def _trade_executed_handler(self, data): tData = data[2] @@ -289,28 +289,28 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - self._emit('new_trade', tradeObj) + await self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]] uw = self.wallets._update_from_event(data) - self._emit('wallet_update', uw) + await self._emit('wallet_update', uw) self.logger.info("Wallet update: {}".format(uw)) async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host)) async def _margin_info_update_handler(self, data): - self._emit('margin_info_update', data) + await self._emit('margin_info_update', data) self.logger.info("Margin info update: {}".format(data)) async def _funding_info_update_handler(self, data): - self._emit('funding_info_update', data) + await self._emit('funding_info_update', data) self.logger.info("Funding info update: {}".format(data)) async def _notification_handler(self, data): nInfo = data[2] - self._emit('notification', nInfo) + await self._emit('notification', nInfo) notificationType = nInfo[6] notificationText = nInfo[7] if notificationType == 'ERROR': @@ -324,7 +324,7 @@ class BfxWebsocket(GenericWebsocket): async def _balance_update_handler(self, data): self.logger.info('Balance update: {}'.format(data[2])) - self._emit('balance_update', data[2]) + await self._emit('balance_update', data[2]) async def _order_closed_handler(self, data): await self.orderManager.confirm_order_closed(data) @@ -343,34 +343,34 @@ class BfxWebsocket(GenericWebsocket): async def _wallet_snapshot_handler(self, data): wallets = self.wallets._update_from_snapshot(data) - self._emit('wallet_snapshot', wallets) + await self._emit('wallet_snapshot', wallets) async def _position_snapshot_handler(self, data): - self._emit('position_snapshot', data) + await self._emit('position_snapshot', data) self.logger.info("Position snapshot: {}".format(data)) async def _position_update_handler(self, data): - self._emit('position_update', data) + await self._emit('position_update', data) self.logger.info("Position update: {}".format(data)) async def _position_close_handler(self, data): - self._emit('position_close', data) + await self._emit('position_close', data) self.logger.info("Position close: {}".format(data)) async def _position_new_handler(self, data): - self._emit('position_new', data) + await self._emit('position_new', data) self.logger.info("Position new: {}".format(data)) async def _funding_offer_snapshot_handler(self, data): - self._emit('funding_offer_snapshot', data) + await self._emit('funding_offer_snapshot', data) self.logger.info("Funding offer snapshot: {}".format(data)) async def _funding_load_snapshot_handler(self, data): - self._emit('funding_loan_snapshot', data[2]) + await self._emit('funding_loan_snapshot', data[2]) self.logger.info("Funding loan snapshot: {}".format(data)) async def _funding_credit_snapshot_handler(self, data): - self._emit('funding_credit_snapshot', data[2]) + await self._emit('funding_credit_snapshot', data[2]) self.logger.info("Funding credit snapshot: {}".format(data)) async def _status_handler(self, data): @@ -381,7 +381,7 @@ class BfxWebsocket(GenericWebsocket): if status_type == "deriv": status = _parse_deriv_status_update(rstatus, symbol) if status: - self._emit('status_update', status) + await self._emit('status_update', status) else: self.logger.warn('Unknown status data type: {}'.format(data)) @@ -392,13 +392,13 @@ class BfxWebsocket(GenericWebsocket): t = None if symbol[0] == 't': t = Ticker.from_raw_ticker(raw_ticker, symbol) - self._emit('new_trading_ticker', t) + await self._emit('new_trading_ticker', t) elif symbol[0] == 'f': t = FundingTicker.from_raw_ticker(raw_ticker, symbol) - self._emit('new_funding_ticker', t) + await self._emit('new_funding_ticker', t) else: self.logger.warn('Unknown ticker type: {}'.format(raw_ticker)) - self._emit('new_ticker', t) + await self._emit('new_ticker', t) async def _trade_handler(self, data): symbol = self.subscriptionManager.get(data[0]).symbol @@ -414,7 +414,7 @@ class BfxWebsocket(GenericWebsocket): 'price': t[3], 'symbol': symbol } - self._emit('seed_trade', trade) + await self._emit('seed_trade', trade) async def _candle_handler(self, data): subscription = self.subscriptionManager.get(data[0]) @@ -429,11 +429,11 @@ class BfxWebsocket(GenericWebsocket): for c in candlesSnapshot: candle = _parse_candle( c, subscription.symbol, subscription.timeframe) - self._emit('seed_candle', candle) + await self._emit('seed_candle', candle) else: candle = _parse_candle( data[1], subscription.symbol, subscription.timeframe) - self._emit('new_candle', candle) + await self._emit('new_candle', candle) async def _order_book_handler(self, data, orig_raw_message): obInfo = data[1] @@ -461,17 +461,17 @@ class BfxWebsocket(GenericWebsocket): if isSnapshot: self.orderBooks[symbol] = OrderBook() self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message) - self._emit('order_book_snapshot', { + await self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo}) else: self.orderBooks[symbol].update_with(obInfo, orig_raw_message) - self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) + await self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) async def on_message(self, socketId, message): self.logger.debug(message) # convert float values to decimal msg = json.loads(message, parse_float=self.parse_float) - self._emit('all', msg) + await self._emit('all', msg) if type(msg) is dict: # System messages are received as json await self._ws_system_handler(socketId, msg) @@ -495,7 +495,7 @@ class BfxWebsocket(GenericWebsocket): self.logger.info("Websocket opened.") if len(self.sockets) == 1: ## only call on first connection - self._emit('connected') + await self._emit('connected') # Orders are simulated in backtest mode if self.API_KEY and self.API_SECRET and self.get_authenticated_socket() == None: await self._ws_authenticate_socket(socket_id) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index f975b1e..91a9f0a 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -44,7 +44,7 @@ class Socket(): def set_authenticated(self): self.isAuthenticated = True - + def set_unauthenticated(self): self.isAuthenticated = False @@ -84,8 +84,6 @@ class GenericWebsocket: thread and connection. """ self._start_new_socket() - while True: - time.sleep(1) def get_task_executable(self): """ @@ -191,7 +189,9 @@ class GenericWebsocket: return self.events.once(event) self.events.once(event, func) - def _emit(self, event, *args, **kwargs): + async def _emit(self, event, *args, **kwargs): + if type(event) == Exception: + await self.on_error(event) self.events.emit(event, *args, **kwargs) async def on_error(self, error): From b0f07814e7b50124a078e3f6a894ee1c760b8345 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 09:42:55 +0100 Subject: [PATCH 07/12] updated tests --- bfxapi/tests/test_ws_orders.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/bfxapi/tests/test_ws_orders.py b/bfxapi/tests/test_ws_orders.py index b28a841..be4a400 100644 --- a/bfxapi/tests/test_ws_orders.py +++ b/bfxapi/tests/test_ws_orders.py @@ -119,7 +119,7 @@ async def test_closed_callback_on_submit_order_closed(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - client.ws._emit('c1', order) + await client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 @@ -136,7 +136,7 @@ async def test_confirmed_callback_on_submit_order_closed(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - client.ws._emit('c1', order) + await client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 @@ -152,7 +152,7 @@ async def test_confirmed_callback_on_submit_new_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - client.ws._emit('c1', order) + await client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 @@ -168,7 +168,7 @@ async def test_confirmed_callback_on_submit_order_update(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - client.ws._emit('c1', order) + await client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 @@ -184,7 +184,7 @@ async def test_confirmed_callback_on_submit_cancel_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - client.ws._emit('c1', order) + await client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 @@ -200,7 +200,7 @@ async def test_confirmed_callback_on_submit_cancel_group_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - client.ws._emit('c1', order) + await client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 From e05e1522c4d25a08a4edfe4cc32b3a14b176b5d7 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 09:46:49 +0100 Subject: [PATCH 08/12] removed useless import --- bfxapi/tests/test_ws_subscriptions.py | 1 - 1 file changed, 1 deletion(-) diff --git a/bfxapi/tests/test_ws_subscriptions.py b/bfxapi/tests/test_ws_subscriptions.py index 2928215..7e1866a 100644 --- a/bfxapi/tests/test_ws_subscriptions.py +++ b/bfxapi/tests/test_ws_subscriptions.py @@ -1,6 +1,5 @@ import pytest import json -import asyncio from .helpers import (create_stubbed_client, ws_publish_connection_init, EventWatcher) @pytest.mark.asyncio From d1308dad7b0d44c9daae6e3bc8d8690008bbf0c2 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 09:49:33 +0100 Subject: [PATCH 09/12] refactoring --- bfxapi/websockets/bfx_websocket.py | 56 +++++++++++++------------- bfxapi/websockets/generic_websocket.py | 4 +- 2 files changed, 30 insertions(+), 30 deletions(-) diff --git a/bfxapi/websockets/bfx_websocket.py b/bfxapi/websockets/bfx_websocket.py index 07771bd..49febc5 100644 --- a/bfxapi/websockets/bfx_websocket.py +++ b/bfxapi/websockets/bfx_websocket.py @@ -266,13 +266,13 @@ class BfxWebsocket(GenericWebsocket): socketId, ERRORS[data.get('code', 10000)], data.get("msg", "")) - await self._emit(Exception(err_string)) + self._emit(Exception(err_string)) async def _system_auth_handler(self, socketId, data): if data.get('status') == 'FAILED': raise AuthError(ERRORS[data.get('code')]) else: - await self._emit('authenticated', data) + self._emit('authenticated', data) self.logger.info("Authentication successful.") async def _trade_update_handler(self, data): @@ -281,7 +281,7 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - await self._emit('trade_update', tradeObj) + self._emit('trade_update', tradeObj) async def _trade_executed_handler(self, data): tData = data[2] @@ -289,28 +289,28 @@ class BfxWebsocket(GenericWebsocket): if self.subscriptionManager.is_subscribed(data[0]): symbol = self.subscriptionManager.get(data[0]).symbol tradeObj = _parse_trade(tData, symbol) - await self._emit('new_trade', tradeObj) + self._emit('new_trade', tradeObj) async def _wallet_update_handler(self, data): # [0,"wu",["exchange","USD",89134.66933283,0]] uw = self.wallets._update_from_event(data) - await self._emit('wallet_update', uw) + self._emit('wallet_update', uw) self.logger.info("Wallet update: {}".format(uw)) async def _heart_beat_handler(self, data): self.logger.debug("Heartbeat - {}".format(self.host)) async def _margin_info_update_handler(self, data): - await self._emit('margin_info_update', data) + self._emit('margin_info_update', data) self.logger.info("Margin info update: {}".format(data)) async def _funding_info_update_handler(self, data): - await self._emit('funding_info_update', data) + self._emit('funding_info_update', data) self.logger.info("Funding info update: {}".format(data)) async def _notification_handler(self, data): nInfo = data[2] - await self._emit('notification', nInfo) + self._emit('notification', nInfo) notificationType = nInfo[6] notificationText = nInfo[7] if notificationType == 'ERROR': @@ -324,7 +324,7 @@ class BfxWebsocket(GenericWebsocket): async def _balance_update_handler(self, data): self.logger.info('Balance update: {}'.format(data[2])) - await self._emit('balance_update', data[2]) + self._emit('balance_update', data[2]) async def _order_closed_handler(self, data): await self.orderManager.confirm_order_closed(data) @@ -343,34 +343,34 @@ class BfxWebsocket(GenericWebsocket): async def _wallet_snapshot_handler(self, data): wallets = self.wallets._update_from_snapshot(data) - await self._emit('wallet_snapshot', wallets) + self._emit('wallet_snapshot', wallets) async def _position_snapshot_handler(self, data): - await self._emit('position_snapshot', data) + self._emit('position_snapshot', data) self.logger.info("Position snapshot: {}".format(data)) async def _position_update_handler(self, data): - await self._emit('position_update', data) + self._emit('position_update', data) self.logger.info("Position update: {}".format(data)) async def _position_close_handler(self, data): - await self._emit('position_close', data) + self._emit('position_close', data) self.logger.info("Position close: {}".format(data)) async def _position_new_handler(self, data): - await self._emit('position_new', data) + self._emit('position_new', data) self.logger.info("Position new: {}".format(data)) async def _funding_offer_snapshot_handler(self, data): - await self._emit('funding_offer_snapshot', data) + self._emit('funding_offer_snapshot', data) self.logger.info("Funding offer snapshot: {}".format(data)) async def _funding_load_snapshot_handler(self, data): - await self._emit('funding_loan_snapshot', data[2]) + self._emit('funding_loan_snapshot', data[2]) self.logger.info("Funding loan snapshot: {}".format(data)) async def _funding_credit_snapshot_handler(self, data): - await self._emit('funding_credit_snapshot', data[2]) + self._emit('funding_credit_snapshot', data[2]) self.logger.info("Funding credit snapshot: {}".format(data)) async def _status_handler(self, data): @@ -381,7 +381,7 @@ class BfxWebsocket(GenericWebsocket): if status_type == "deriv": status = _parse_deriv_status_update(rstatus, symbol) if status: - await self._emit('status_update', status) + self._emit('status_update', status) else: self.logger.warn('Unknown status data type: {}'.format(data)) @@ -392,13 +392,13 @@ class BfxWebsocket(GenericWebsocket): t = None if symbol[0] == 't': t = Ticker.from_raw_ticker(raw_ticker, symbol) - await self._emit('new_trading_ticker', t) + self._emit('new_trading_ticker', t) elif symbol[0] == 'f': t = FundingTicker.from_raw_ticker(raw_ticker, symbol) - await self._emit('new_funding_ticker', t) + self._emit('new_funding_ticker', t) else: self.logger.warn('Unknown ticker type: {}'.format(raw_ticker)) - await self._emit('new_ticker', t) + self._emit('new_ticker', t) async def _trade_handler(self, data): symbol = self.subscriptionManager.get(data[0]).symbol @@ -414,7 +414,7 @@ class BfxWebsocket(GenericWebsocket): 'price': t[3], 'symbol': symbol } - await self._emit('seed_trade', trade) + self._emit('seed_trade', trade) async def _candle_handler(self, data): subscription = self.subscriptionManager.get(data[0]) @@ -429,11 +429,11 @@ class BfxWebsocket(GenericWebsocket): for c in candlesSnapshot: candle = _parse_candle( c, subscription.symbol, subscription.timeframe) - await self._emit('seed_candle', candle) + self._emit('seed_candle', candle) else: candle = _parse_candle( data[1], subscription.symbol, subscription.timeframe) - await self._emit('new_candle', candle) + self._emit('new_candle', candle) async def _order_book_handler(self, data, orig_raw_message): obInfo = data[1] @@ -461,17 +461,17 @@ class BfxWebsocket(GenericWebsocket): if isSnapshot: self.orderBooks[symbol] = OrderBook() self.orderBooks[symbol].update_from_snapshot(obInfo, orig_raw_message) - await self._emit('order_book_snapshot', { + self._emit('order_book_snapshot', { 'symbol': symbol, 'data': obInfo}) else: self.orderBooks[symbol].update_with(obInfo, orig_raw_message) - await self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) + self._emit('order_book_update', {'symbol': symbol, 'data': obInfo}) async def on_message(self, socketId, message): self.logger.debug(message) # convert float values to decimal msg = json.loads(message, parse_float=self.parse_float) - await self._emit('all', msg) + self._emit('all', msg) if type(msg) is dict: # System messages are received as json await self._ws_system_handler(socketId, msg) @@ -495,7 +495,7 @@ class BfxWebsocket(GenericWebsocket): self.logger.info("Websocket opened.") if len(self.sockets) == 1: ## only call on first connection - await self._emit('connected') + self._emit('connected') # Orders are simulated in backtest mode if self.API_KEY and self.API_SECRET and self.get_authenticated_socket() == None: await self._ws_authenticate_socket(socket_id) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index 91a9f0a..f155278 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -189,9 +189,9 @@ class GenericWebsocket: return self.events.once(event) self.events.once(event, func) - async def _emit(self, event, *args, **kwargs): + def _emit(self, event, *args, **kwargs): if type(event) == Exception: - await self.on_error(event) + self.logger.error(event) self.events.emit(event, *args, **kwargs) async def on_error(self, error): From 6c065e6fad8a5c75394d2230b917a83aa1931d59 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 10:28:17 +0100 Subject: [PATCH 10/12] fixs --- bfxapi/tests/test_ws_capacity.py | 6 ++-- bfxapi/tests/test_ws_orderbook.py | 24 ++++++------- bfxapi/tests/test_ws_orders.py | 52 +++++++++++++-------------- bfxapi/tests/test_ws_subscriptions.py | 38 ++++++++++---------- 4 files changed, 60 insertions(+), 60 deletions(-) diff --git a/bfxapi/tests/test_ws_capacity.py b/bfxapi/tests/test_ws_capacity.py index 48cdbf9..77534a1 100644 --- a/bfxapi/tests/test_ws_capacity.py +++ b/bfxapi/tests/test_ws_capacity.py @@ -12,12 +12,12 @@ async def test_ws_creates_new_socket(): await ws_publish_connection_init(client.ws) # create a bunch of websocket subscriptions for symbol in ['tXRPBTC', 'tLTCUSD']: - await client.ws.subscribe('candles', symbol, timeframe='1m') + client.ws.subscribe('candles', symbol, timeframe='1m') assert len(client.ws.sockets) == 1 assert client.ws.get_total_available_capcity() == 3 # subscribe to a few more to force the lib to create a new ws conenction for symbol in ['tETHBTC', 'tBTCUSD', 'tETHUSD', 'tLTCBTC']: - await client.ws.subscribe('candles', symbol, timeframe='1m') + client.ws.subscribe('candles', symbol, timeframe='1m') assert len(client.ws.sockets) == 2 assert client.ws.get_total_available_capcity() == 4 @@ -29,7 +29,7 @@ async def test_ws_uses_authenticated_socket(): await ws_publish_connection_init(client.ws) # create a bunch of websocket subscriptions for symbol in ['tXRPBTC', 'tLTCUSD', 'tETHBTC', 'tBTCUSD', 'tETHUSD', 'tLTCBTC']: - await client.ws.subscribe('candles', symbol, timeframe='1m') + client.ws.subscribe('candles', symbol, timeframe='1m') # publish connection created message on socket (0 by default) await ws_publish_connection_init(client.ws) # send auth accepted (on socket by default) diff --git a/bfxapi/tests/test_ws_orderbook.py b/bfxapi/tests/test_ws_orderbook.py index ebbee52..7b4d2d7 100644 --- a/bfxapi/tests/test_ws_orderbook.py +++ b/bfxapi/tests/test_ws_orderbook.py @@ -10,12 +10,12 @@ async def test_checksum_generation(): # publish checksum flag accepted await ws_publish_conf_accepted(client.ws, 131072) # subscribe to order book - await client.ws.subscribe('book', symbol) + client.ws.subscribe('book', symbol) ## send subscription accepted chanId = 123 - await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) + client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) ## send orderbook snapshot - await client.ws.publish("""[123, [[0.0000886,1,1060.55466114],[0.00008859,1,1000],[0.00008858,1,2713.47159343],[0.00008857,1,4276.92870916],[0.00008856,2,6764.75562319], + client.ws.publish("""[123, [[0.0000886,1,1060.55466114],[0.00008859,1,1000],[0.00008858,1,2713.47159343],[0.00008857,1,4276.92870916],[0.00008856,2,6764.75562319], [0.00008854,1,5641.48532401],[0.00008853,1,2255.92632223],[0.0000885,1,2256.69584601],[0.00008848,2,3630.3],[0.00008845,1,28195.70625766], [0.00008844,1,15571.7],[0.00008843,1,2500],[0.00008841,1,64196.16117814],[0.00008838,1,7500],[0.00008837,2,2764.12999012],[0.00008834,2,10886.476298], [0.00008831,1,20000],[0.0000883,1,1000],[0.00008829,2,2517.22175358],[0.00008828,1,450.45],[0.00008827,1,13000],[0.00008824,1,1500],[0.0000882,1,300], @@ -25,9 +25,9 @@ async def test_checksum_generation(): [0.00008894,1,-775.08564697],[0.00008896,1,-150],[0.00008899,3,-11628.02590049],[0.000089,2,-1299.7],[0.00008902,2,-4841.8],[0.00008904,3,-25320.46250083], [0.00008909,1,-14000],[0.00008913,1,-123947.999],[0.00008915,2,-28019.6]]]""", is_json=False) ## send some more price updates - await client.ws.publish("[{},[0.00008915,0,-1]]".format(chanId), is_json=False) - await client.ws.publish("[{},[0.00008837,1,56.54876269]]".format(chanId), is_json=False) - await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) + client.ws.publish("[{},[0.00008915,0,-1]]".format(chanId), is_json=False) + client.ws.publish("[{},[0.00008837,1,56.54876269]]".format(chanId), is_json=False) + client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) ## check checksum is the same as expected expected_checksum = 30026640 actual_checksum = client.ws.orderBooks[symbol].checksum() @@ -42,12 +42,12 @@ async def test_checksum_really_samll_numbers_generation(): # publish checksum flag accepted await ws_publish_conf_accepted(client.ws, 131072) # subscribe to order book - await client.ws.subscribe('book', symbol) + client.ws.subscribe('book', symbol) ## send subscription accepted chanId = 123 - await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) + client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) ## send orderbook snapshot - await client.ws.publish("""[123, [[0.00000121,5,249013.0209708],[0.0000012,6,518315.33310128],[0.00000119,4,566200.89],[0.00000118,2,260000],[0.00000117,1,100000], + client.ws.publish("""[123, [[0.00000121,5,249013.0209708],[0.0000012,6,518315.33310128],[0.00000119,4,566200.89],[0.00000118,2,260000],[0.00000117,1,100000], [0.00000116,2,160000],[0.00000114,1,60000],[0.00000113,2,198500],[0.00000112,1,60000],[0.0000011,1,60000],[0.00000106,2,113868.87735849],[0.00000105,2,105000], [0.00000103,1,3000],[0.00000102,2,105000],[0.00000101,2,202970],[0.000001,2,21000],[7e-7,1,10000],[6.6e-7,1,10000],[6e-7,1,100000],[4.9e-7,1,10000],[2.5e-7,1,2000], [6e-8,1,100000],[5e-8,1,200000],[1e-8,4,640000],[0.00000122,7,-312043.19],[0.00000123,6,-415094.8939744],[0.00000124,5,-348181.23],[0.00000125,1,-12000], @@ -56,9 +56,9 @@ async def test_checksum_really_samll_numbers_generation(): [0.00000164,1,-4000],[0.00000166,1,-3831.46784605],[0.00000171,1,-14575.17730379],[0.00000174,1,-3124.81815395],[0.0000018,1,-18000],[0.00000182,1,-16000], [0.00000186,1,-4000],[0.00000189,1,-10000.686624],[0.00000191,1,-14500]]]""", is_json=False) ## send some more price updates - await client.ws.publish("[{},[0.00000121,4,228442.6609708]]".format(chanId), is_json=False) - await client.ws.publish("[{},[0.00000121,6,304023.8109708]]".format(chanId), is_json=False) - # await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) + client.ws.publish("[{},[0.00000121,4,228442.6609708]]".format(chanId), is_json=False) + client.ws.publish("[{},[0.00000121,6,304023.8109708]]".format(chanId), is_json=False) + # client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) ## check checksum is the same as expected expected_checksum = 1770440002 actual_checksum = client.ws.orderBooks[symbol].checksum() diff --git a/bfxapi/tests/test_ws_orders.py b/bfxapi/tests/test_ws_orders.py index be4a400..60e0b6e 100644 --- a/bfxapi/tests/test_ws_orders.py +++ b/bfxapi/tests/test_ws_orders.py @@ -12,7 +12,7 @@ async def test_submit_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) ## send new order - await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET') + client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET') last_sent = client.ws.get_last_sent_item() sent_order_array = json.loads(last_sent['data']) assert sent_order_array[1] == "on" @@ -30,7 +30,7 @@ async def test_submit_update_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) ## send new order - await client.ws.update_order(123, price=100, amount=0.01, hidden=True) + client.ws.update_order(123, price=100, amount=0.01, hidden=True) last_sent = client.ws.get_last_sent_item() sent_order_array = json.loads(last_sent['data']) assert sent_order_array[1] == "ou" @@ -49,7 +49,7 @@ async def test_submit_cancel_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) ## send new order - await client.ws.cancel_order(123) + client.ws.cancel_order(123) last_sent = client.ws.get_last_sent_item() sent_order_array = json.loads(last_sent['data']) assert sent_order_array[1] == "oc" @@ -66,7 +66,7 @@ async def test_events_on_new_order(): ## look for new order confirmation o_new = EventWatcher.watch(client.ws, 'order_new') - await client.ws.publish([0,"on",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.publish([0,"on",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) new_res = o_new.wait_until_complete() assert new_res.amount_orig == -1 assert new_res.amount_filled == 0 @@ -75,7 +75,7 @@ async def test_events_on_new_order(): ## look for order update confirmation o_update = EventWatcher.watch(client.ws, 'order_update') - await client.ws.publish([0,"ou",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.publish([0,"ou",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) update_res = o_update.wait_until_complete() assert update_res.amount_orig == -1 assert float(update_res.amount_filled) == -0.5 @@ -84,7 +84,7 @@ async def test_events_on_new_order(): ## look for closed notification o_closed = EventWatcher.watch(client.ws, 'order_closed') - await client.ws.publish([0,"oc",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.publish([0,"oc",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) closed_res = o_closed.wait_until_complete() assert new_res.amount_orig == -1 assert new_res.amount_filled == 0 @@ -100,11 +100,11 @@ async def test_events_on_cancel_order(): await ws_publish_auth_accepted(client.ws) ## Create new order - await client.ws.publish([0,"on",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123460,1,1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.publish([0,"on",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123460,1,1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) ## look for order closed confirmation o_close = EventWatcher.watch(client.ws, 'order_closed') - await client.ws.publish([0,"oc",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123548,1,1,"EXCHANGE LIMIT",None,None,None,0,"CANCELED",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.publish([0,"oc",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123548,1,1,"EXCHANGE LIMIT",None,None,None,0,"CANCELED",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) close_res = o_close.wait_until_complete() assert close_res.amount_orig == 1 assert float(close_res.amount_filled) == 0 @@ -119,12 +119,12 @@ async def test_closed_callback_on_submit_order_closed(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - await client.ws._emit('c1', order) + client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onClose=c) - await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onClose=c) + client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @@ -136,12 +136,12 @@ async def test_confirmed_callback_on_submit_order_closed(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - await client.ws._emit('c1', order) + client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) - await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) + client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -152,12 +152,12 @@ async def test_confirmed_callback_on_submit_new_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - await client.ws._emit('c1', order) + client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) - await client.ws.publish([0,"on",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) + client.ws.publish([0,"on",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -168,12 +168,12 @@ async def test_confirmed_callback_on_submit_order_update(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - await client.ws._emit('c1', order) + client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - await client.ws.update_order(123, price=100, onConfirm=c) - await client.ws.publish([0,"ou",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.update_order(123, price=100, onConfirm=c) + client.ws.publish([0,"ou",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -184,12 +184,12 @@ async def test_confirmed_callback_on_submit_cancel_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - await client.ws._emit('c1', order) + client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - await client.ws.cancel_order(123, onConfirm=c) - await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.cancel_order(123, onConfirm=c) + client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -200,10 +200,10 @@ async def test_confirmed_callback_on_submit_cancel_group_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) async def c(order): - await client.ws._emit('c1', order) + client.ws._emit('c1', order) callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - await client.ws.cancel_order_group(123, onConfirm=c) - await client.ws.publish([0,"oc",[1548262833910,123,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + client.ws.cancel_order_group(123, onConfirm=c) + client.ws.publish([0,"oc",[1548262833910,123,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() diff --git a/bfxapi/tests/test_ws_subscriptions.py b/bfxapi/tests/test_ws_subscriptions.py index 7e1866a..06d69b9 100644 --- a/bfxapi/tests/test_ws_subscriptions.py +++ b/bfxapi/tests/test_ws_subscriptions.py @@ -10,7 +10,7 @@ async def test_submit_subscribe(): await ws_publish_connection_init(client.ws) # Create new subscription to orderbook - await client.ws.subscribe('book', symb) + client.ws.subscribe('book', symb) last_sent = client.ws.get_last_sent_item() sent_sub = json.loads(last_sent['data']) # {'time': 1548327054030, 'data': '{"event": "subscribe", "channel": "book", "symbol": "tXRPBTC"}'} @@ -19,7 +19,7 @@ async def test_submit_subscribe(): assert sent_sub['symbol'] == symb # create new subscription to trades - await client.ws.subscribe('trades', symb) + client.ws.subscribe('trades', symb) last_sent = client.ws.get_last_sent_item() sent_sub = json.loads(last_sent['data']) # {'event': 'subscribe', 'channel': 'trades', 'symbol': 'tBTCUSD'} @@ -28,7 +28,7 @@ async def test_submit_subscribe(): assert sent_sub['symbol'] == symb # create new subscription to candles - await client.ws.subscribe('candles', symb, timeframe='1m') + client.ws.subscribe('candles', symb, timeframe='1m') last_sent = client.ws.get_last_sent_item() sent_sub = json.loads(last_sent['data']) #{'event': 'subscribe', 'channel': 'candles', 'symbol': 'tBTCUSD', 'key': 'trade:1m:tBTCUSD'} @@ -44,10 +44,10 @@ async def test_event_subscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # create a new subscription - await client.ws.subscribe('trades', symb) + client.ws.subscribe('trades', symb) # announce subscription was successful sub_watch = EventWatcher.watch(client.ws, 'subscribed') - await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) s_res = sub_watch.wait_until_complete() assert s_res.channel_name == 'trades' assert s_res.symbol == symb @@ -62,10 +62,10 @@ async def test_submit_unsubscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # create new subscription to trades - await client.ws.subscribe('trades', symb) + client.ws.subscribe('trades', symb) # announce subscription was successful sub_watch = EventWatcher.watch(client.ws, 'subscribed') - await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) s_res = sub_watch.wait_until_complete() # unsubscribe from channel await s_res.unsubscribe() @@ -83,10 +83,10 @@ async def test_event_unsubscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # create new subscription to trades - await client.ws.subscribe('trades', symb) + client.ws.subscribe('trades', symb) # announce subscription was successful sub_watch = EventWatcher.watch(client.ws, 'subscribed') - await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) s_res = sub_watch.wait_until_complete() # unsubscribe from channel await s_res.unsubscribe() @@ -95,7 +95,7 @@ async def test_event_unsubscribe(): # publish confirmation of unsubscribe unsub_watch = EventWatcher.watch(client.ws, 'unsubscribed') - await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) + client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) unsub_res = unsub_watch.wait_until_complete() assert s_res.channel_name == 'trades' assert s_res.symbol == symb @@ -110,13 +110,13 @@ async def test_submit_resubscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # request two new subscriptions - await client.ws.subscribe('book', symb) - await client.ws.subscribe('trades', symb) + client.ws.subscribe('book', symb) + client.ws.subscribe('trades', symb) # confirm subscriptions - await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) - await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) + client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) # call resubscribe all - await client.ws.resubscribe_all() + client.ws.resubscribe_all() ## assert that 2 unsubscribe requests were sent last_sent = client.ws.get_sent_items()[-2:] for i in last_sent: @@ -124,12 +124,12 @@ async def test_submit_resubscribe(): assert data['event'] == 'unsubscribe' assert (data['chanId'] == 2 or data['chanId'] == 3) ## confirm unsubscriptions - await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) - await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":3}) + client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) + client.ws.publish({"event":"unsubscribed","status":"OK","chanId":3}) ## confirm subscriptions - # await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) - # await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) + # client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + # client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) # wait for emit of event n_last_sent = client.ws.get_sent_items()[-2:] for i in n_last_sent: From 25d748042a2410d4c89f2aa416485d0a945658a7 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 10:31:45 +0100 Subject: [PATCH 11/12] fixs --- bfxapi/tests/test_ws_capacity.py | 6 ++-- bfxapi/tests/test_ws_orderbook.py | 24 ++++++++-------- bfxapi/tests/test_ws_orders.py | 40 +++++++++++++-------------- bfxapi/tests/test_ws_subscriptions.py | 38 ++++++++++++------------- 4 files changed, 54 insertions(+), 54 deletions(-) diff --git a/bfxapi/tests/test_ws_capacity.py b/bfxapi/tests/test_ws_capacity.py index 77534a1..48cdbf9 100644 --- a/bfxapi/tests/test_ws_capacity.py +++ b/bfxapi/tests/test_ws_capacity.py @@ -12,12 +12,12 @@ async def test_ws_creates_new_socket(): await ws_publish_connection_init(client.ws) # create a bunch of websocket subscriptions for symbol in ['tXRPBTC', 'tLTCUSD']: - client.ws.subscribe('candles', symbol, timeframe='1m') + await client.ws.subscribe('candles', symbol, timeframe='1m') assert len(client.ws.sockets) == 1 assert client.ws.get_total_available_capcity() == 3 # subscribe to a few more to force the lib to create a new ws conenction for symbol in ['tETHBTC', 'tBTCUSD', 'tETHUSD', 'tLTCBTC']: - client.ws.subscribe('candles', symbol, timeframe='1m') + await client.ws.subscribe('candles', symbol, timeframe='1m') assert len(client.ws.sockets) == 2 assert client.ws.get_total_available_capcity() == 4 @@ -29,7 +29,7 @@ async def test_ws_uses_authenticated_socket(): await ws_publish_connection_init(client.ws) # create a bunch of websocket subscriptions for symbol in ['tXRPBTC', 'tLTCUSD', 'tETHBTC', 'tBTCUSD', 'tETHUSD', 'tLTCBTC']: - client.ws.subscribe('candles', symbol, timeframe='1m') + await client.ws.subscribe('candles', symbol, timeframe='1m') # publish connection created message on socket (0 by default) await ws_publish_connection_init(client.ws) # send auth accepted (on socket by default) diff --git a/bfxapi/tests/test_ws_orderbook.py b/bfxapi/tests/test_ws_orderbook.py index 7b4d2d7..ebbee52 100644 --- a/bfxapi/tests/test_ws_orderbook.py +++ b/bfxapi/tests/test_ws_orderbook.py @@ -10,12 +10,12 @@ async def test_checksum_generation(): # publish checksum flag accepted await ws_publish_conf_accepted(client.ws, 131072) # subscribe to order book - client.ws.subscribe('book', symbol) + await client.ws.subscribe('book', symbol) ## send subscription accepted chanId = 123 - client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) + await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) ## send orderbook snapshot - client.ws.publish("""[123, [[0.0000886,1,1060.55466114],[0.00008859,1,1000],[0.00008858,1,2713.47159343],[0.00008857,1,4276.92870916],[0.00008856,2,6764.75562319], + await client.ws.publish("""[123, [[0.0000886,1,1060.55466114],[0.00008859,1,1000],[0.00008858,1,2713.47159343],[0.00008857,1,4276.92870916],[0.00008856,2,6764.75562319], [0.00008854,1,5641.48532401],[0.00008853,1,2255.92632223],[0.0000885,1,2256.69584601],[0.00008848,2,3630.3],[0.00008845,1,28195.70625766], [0.00008844,1,15571.7],[0.00008843,1,2500],[0.00008841,1,64196.16117814],[0.00008838,1,7500],[0.00008837,2,2764.12999012],[0.00008834,2,10886.476298], [0.00008831,1,20000],[0.0000883,1,1000],[0.00008829,2,2517.22175358],[0.00008828,1,450.45],[0.00008827,1,13000],[0.00008824,1,1500],[0.0000882,1,300], @@ -25,9 +25,9 @@ async def test_checksum_generation(): [0.00008894,1,-775.08564697],[0.00008896,1,-150],[0.00008899,3,-11628.02590049],[0.000089,2,-1299.7],[0.00008902,2,-4841.8],[0.00008904,3,-25320.46250083], [0.00008909,1,-14000],[0.00008913,1,-123947.999],[0.00008915,2,-28019.6]]]""", is_json=False) ## send some more price updates - client.ws.publish("[{},[0.00008915,0,-1]]".format(chanId), is_json=False) - client.ws.publish("[{},[0.00008837,1,56.54876269]]".format(chanId), is_json=False) - client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00008915,0,-1]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00008837,1,56.54876269]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) ## check checksum is the same as expected expected_checksum = 30026640 actual_checksum = client.ws.orderBooks[symbol].checksum() @@ -42,12 +42,12 @@ async def test_checksum_really_samll_numbers_generation(): # publish checksum flag accepted await ws_publish_conf_accepted(client.ws, 131072) # subscribe to order book - client.ws.subscribe('book', symbol) + await client.ws.subscribe('book', symbol) ## send subscription accepted chanId = 123 - client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) + await client.ws.publish({"event":"subscribed","channel":"book","chanId": chanId,"symbol": symbol,"prec":"P0","freq":"F0","len":"25","pair": symbol}) ## send orderbook snapshot - client.ws.publish("""[123, [[0.00000121,5,249013.0209708],[0.0000012,6,518315.33310128],[0.00000119,4,566200.89],[0.00000118,2,260000],[0.00000117,1,100000], + await client.ws.publish("""[123, [[0.00000121,5,249013.0209708],[0.0000012,6,518315.33310128],[0.00000119,4,566200.89],[0.00000118,2,260000],[0.00000117,1,100000], [0.00000116,2,160000],[0.00000114,1,60000],[0.00000113,2,198500],[0.00000112,1,60000],[0.0000011,1,60000],[0.00000106,2,113868.87735849],[0.00000105,2,105000], [0.00000103,1,3000],[0.00000102,2,105000],[0.00000101,2,202970],[0.000001,2,21000],[7e-7,1,10000],[6.6e-7,1,10000],[6e-7,1,100000],[4.9e-7,1,10000],[2.5e-7,1,2000], [6e-8,1,100000],[5e-8,1,200000],[1e-8,4,640000],[0.00000122,7,-312043.19],[0.00000123,6,-415094.8939744],[0.00000124,5,-348181.23],[0.00000125,1,-12000], @@ -56,9 +56,9 @@ async def test_checksum_really_samll_numbers_generation(): [0.00000164,1,-4000],[0.00000166,1,-3831.46784605],[0.00000171,1,-14575.17730379],[0.00000174,1,-3124.81815395],[0.0000018,1,-18000],[0.00000182,1,-16000], [0.00000186,1,-4000],[0.00000189,1,-10000.686624],[0.00000191,1,-14500]]]""", is_json=False) ## send some more price updates - client.ws.publish("[{},[0.00000121,4,228442.6609708]]".format(chanId), is_json=False) - client.ws.publish("[{},[0.00000121,6,304023.8109708]]".format(chanId), is_json=False) - # client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00000121,4,228442.6609708]]".format(chanId), is_json=False) + await client.ws.publish("[{},[0.00000121,6,304023.8109708]]".format(chanId), is_json=False) + # await client.ws.publish("[{},[0.00008873,1,-15699.9]]".format(chanId), is_json=False) ## check checksum is the same as expected expected_checksum = 1770440002 actual_checksum = client.ws.orderBooks[symbol].checksum() diff --git a/bfxapi/tests/test_ws_orders.py b/bfxapi/tests/test_ws_orders.py index 60e0b6e..b28a841 100644 --- a/bfxapi/tests/test_ws_orders.py +++ b/bfxapi/tests/test_ws_orders.py @@ -12,7 +12,7 @@ async def test_submit_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) ## send new order - client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET') + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET') last_sent = client.ws.get_last_sent_item() sent_order_array = json.loads(last_sent['data']) assert sent_order_array[1] == "on" @@ -30,7 +30,7 @@ async def test_submit_update_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) ## send new order - client.ws.update_order(123, price=100, amount=0.01, hidden=True) + await client.ws.update_order(123, price=100, amount=0.01, hidden=True) last_sent = client.ws.get_last_sent_item() sent_order_array = json.loads(last_sent['data']) assert sent_order_array[1] == "ou" @@ -49,7 +49,7 @@ async def test_submit_cancel_order(): ## send auth accepted await ws_publish_auth_accepted(client.ws) ## send new order - client.ws.cancel_order(123) + await client.ws.cancel_order(123) last_sent = client.ws.get_last_sent_item() sent_order_array = json.loads(last_sent['data']) assert sent_order_array[1] == "oc" @@ -66,7 +66,7 @@ async def test_events_on_new_order(): ## look for new order confirmation o_new = EventWatcher.watch(client.ws, 'order_new') - client.ws.publish([0,"on",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.publish([0,"on",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) new_res = o_new.wait_until_complete() assert new_res.amount_orig == -1 assert new_res.amount_filled == 0 @@ -75,7 +75,7 @@ async def test_events_on_new_order(): ## look for order update confirmation o_update = EventWatcher.watch(client.ws, 'order_update') - client.ws.publish([0,"ou",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.publish([0,"ou",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) update_res = o_update.wait_until_complete() assert update_res.amount_orig == -1 assert float(update_res.amount_filled) == -0.5 @@ -84,7 +84,7 @@ async def test_events_on_new_order(): ## look for closed notification o_closed = EventWatcher.watch(client.ws, 'order_closed') - client.ws.publish([0,"oc",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.publish([0,"oc",[1151718504,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) closed_res = o_closed.wait_until_complete() assert new_res.amount_orig == -1 assert new_res.amount_filled == 0 @@ -100,11 +100,11 @@ async def test_events_on_cancel_order(): await ws_publish_auth_accepted(client.ws) ## Create new order - client.ws.publish([0,"on",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123460,1,1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.publish([0,"on",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123460,1,1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) ## look for order closed confirmation o_close = EventWatcher.watch(client.ws, 'order_closed') - client.ws.publish([0,"oc",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123548,1,1,"EXCHANGE LIMIT",None,None,None,0,"CANCELED",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.publish([0,"oc",[1151718565,None,1548325124885,"tBTCUSD",1548325123435,1548325123548,1,1,"EXCHANGE LIMIT",None,None,None,0,"CANCELED",None,None,10,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) close_res = o_close.wait_until_complete() assert close_res.amount_orig == 1 assert float(close_res.amount_filled) == 0 @@ -123,8 +123,8 @@ async def test_closed_callback_on_submit_order_closed(): callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onClose=c) - client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onClose=c) + await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @@ -140,8 +140,8 @@ async def test_confirmed_callback_on_submit_order_closed(): callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) - client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) + await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -156,8 +156,8 @@ async def test_confirmed_callback_on_submit_new_order(): callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) - client.ws.publish([0,"on",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.submit_order('tBTCUSD', 19000, 0.01, 'EXCHANGE MARKET', onConfirm=c) + await client.ws.publish([0,"on",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262833410,-1,-1,"EXCHANGE LIMIT",None,None,None,0,"ACTIVE",None,None,15980,0,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -172,8 +172,8 @@ async def test_confirmed_callback_on_submit_order_update(): callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - client.ws.update_order(123, price=100, onConfirm=c) - client.ws.publish([0,"ou",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.update_order(123, price=100, onConfirm=c) + await client.ws.publish([0,"ou",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262846964,-0.5,-1,"EXCHANGE LIMIT",None,None,None,0,"PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -188,8 +188,8 @@ async def test_confirmed_callback_on_submit_cancel_order(): callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - client.ws.cancel_order(123, onConfirm=c) - client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.cancel_order(123, onConfirm=c) + await client.ws.publish([0,"oc",[123,None,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() @pytest.mark.asyncio @@ -204,6 +204,6 @@ async def test_confirmed_callback_on_submit_cancel_group_order(): callback_wait = EventWatcher.watch(client.ws, 'c1') # override cid generation client.ws.orderManager._gen_unique_cid = lambda: 123 - client.ws.cancel_order_group(123, onConfirm=c) - client.ws.publish([0,"oc",[1548262833910,123,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) + await client.ws.cancel_order_group(123, onConfirm=c) + await client.ws.publish([0,"oc",[1548262833910,123,1548262833910,"tBTCUSD",1548262833379,1548262888016,0,-1,"EXCHANGE LIMIT",None,None,None,0,"EXECUTED @ 15980.0(-0.5): was PARTIALLY FILLED @ 15980.0(-0.5)",None,None,15980,15980,0,0,None,None,None,0,0,None,None,None,"API>BFX",None,None,None]]) callback_wait.wait_until_complete() diff --git a/bfxapi/tests/test_ws_subscriptions.py b/bfxapi/tests/test_ws_subscriptions.py index 06d69b9..7e1866a 100644 --- a/bfxapi/tests/test_ws_subscriptions.py +++ b/bfxapi/tests/test_ws_subscriptions.py @@ -10,7 +10,7 @@ async def test_submit_subscribe(): await ws_publish_connection_init(client.ws) # Create new subscription to orderbook - client.ws.subscribe('book', symb) + await client.ws.subscribe('book', symb) last_sent = client.ws.get_last_sent_item() sent_sub = json.loads(last_sent['data']) # {'time': 1548327054030, 'data': '{"event": "subscribe", "channel": "book", "symbol": "tXRPBTC"}'} @@ -19,7 +19,7 @@ async def test_submit_subscribe(): assert sent_sub['symbol'] == symb # create new subscription to trades - client.ws.subscribe('trades', symb) + await client.ws.subscribe('trades', symb) last_sent = client.ws.get_last_sent_item() sent_sub = json.loads(last_sent['data']) # {'event': 'subscribe', 'channel': 'trades', 'symbol': 'tBTCUSD'} @@ -28,7 +28,7 @@ async def test_submit_subscribe(): assert sent_sub['symbol'] == symb # create new subscription to candles - client.ws.subscribe('candles', symb, timeframe='1m') + await client.ws.subscribe('candles', symb, timeframe='1m') last_sent = client.ws.get_last_sent_item() sent_sub = json.loads(last_sent['data']) #{'event': 'subscribe', 'channel': 'candles', 'symbol': 'tBTCUSD', 'key': 'trade:1m:tBTCUSD'} @@ -44,10 +44,10 @@ async def test_event_subscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # create a new subscription - client.ws.subscribe('trades', symb) + await client.ws.subscribe('trades', symb) # announce subscription was successful sub_watch = EventWatcher.watch(client.ws, 'subscribed') - client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) s_res = sub_watch.wait_until_complete() assert s_res.channel_name == 'trades' assert s_res.symbol == symb @@ -62,10 +62,10 @@ async def test_submit_unsubscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # create new subscription to trades - client.ws.subscribe('trades', symb) + await client.ws.subscribe('trades', symb) # announce subscription was successful sub_watch = EventWatcher.watch(client.ws, 'subscribed') - client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) s_res = sub_watch.wait_until_complete() # unsubscribe from channel await s_res.unsubscribe() @@ -83,10 +83,10 @@ async def test_event_unsubscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # create new subscription to trades - client.ws.subscribe('trades', symb) + await client.ws.subscribe('trades', symb) # announce subscription was successful sub_watch = EventWatcher.watch(client.ws, 'subscribed') - client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) s_res = sub_watch.wait_until_complete() # unsubscribe from channel await s_res.unsubscribe() @@ -95,7 +95,7 @@ async def test_event_unsubscribe(): # publish confirmation of unsubscribe unsub_watch = EventWatcher.watch(client.ws, 'unsubscribed') - client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) + await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) unsub_res = unsub_watch.wait_until_complete() assert s_res.channel_name == 'trades' assert s_res.symbol == symb @@ -110,13 +110,13 @@ async def test_submit_resubscribe(): # publish connection created message await ws_publish_connection_init(client.ws) # request two new subscriptions - client.ws.subscribe('book', symb) - client.ws.subscribe('trades', symb) + await client.ws.subscribe('book', symb) + await client.ws.subscribe('trades', symb) # confirm subscriptions - client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) - client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) + await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) # call resubscribe all - client.ws.resubscribe_all() + await client.ws.resubscribe_all() ## assert that 2 unsubscribe requests were sent last_sent = client.ws.get_sent_items()[-2:] for i in last_sent: @@ -124,12 +124,12 @@ async def test_submit_resubscribe(): assert data['event'] == 'unsubscribe' assert (data['chanId'] == 2 or data['chanId'] == 3) ## confirm unsubscriptions - client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) - client.ws.publish({"event":"unsubscribed","status":"OK","chanId":3}) + await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":2}) + await client.ws.publish({"event":"unsubscribed","status":"OK","chanId":3}) ## confirm subscriptions - # client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) - # client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) + # await client.ws.publish({"event":"subscribed","channel":"trades","chanId":2,"symbol":symb,"pair":pair}) + # await client.ws.publish({"event":"subscribed","channel":"book","chanId":3,"symbol":symb,"prec":"P0","freq":"F0","len":"25","pair":pair}) # wait for emit of event n_last_sent = client.ws.get_sent_items()[-2:] for i in n_last_sent: From 603f546037191dda39d4bf92d50d0a3beddb7e07 Mon Sep 17 00:00:00 2001 From: itsdeka Date: Mon, 29 Nov 2021 18:37:01 +0100 Subject: [PATCH 12/12] fix run() --- bfxapi/websockets/generic_websocket.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index f155278..20ff37a 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -84,6 +84,8 @@ class GenericWebsocket: thread and connection. """ self._start_new_socket() + while True: + time.sleep(1) def get_task_executable(self): """