tests: add websocket capacity test

This commit is contained in:
Jacob Plaster
2019-06-18 14:42:35 +08:00
committed by Jacob Plaster
parent 13e7d505f8
commit 286e922eb1
4 changed files with 77 additions and 14 deletions

View File

@@ -2,23 +2,33 @@ import time
import json
import asyncio
from .. import Client, BfxWebsocket
from .. import Client, BfxWebsocket, Socket
def get_now():
return int(round(time.time() * 1000))
def ev_worker_override():
return asyncio.get_event_loop()
class StubbedWebsocket(BfxWebsocket):
def __new__(cls, *args, **kwargs):
instance = super(StubbedWebsocket, cls).__new__(cls, *args, **kwargs)
instance.sent_items = []
instance.published_items = []
return instance
def __init__(self, *args, **kwargs):
self.sent_items = []
self.published_items = []
super().__init__(create_event_emitter=ev_worker_override, *args, **kwargs)
async def _main(self, host):
print ("Faking wesocket connection to {}".format(host))
def get_ws(self):
return self
def _start_new_socket(self):
socket = Socket(len(self.sockets))
socket.set_connected()
socket.ws = self
self.sockets[socket.id] = socket
return socket.id
def _wait_for_socket(self, socketId):
return
async def publish(self, data, is_json=True):
self.published_items += [{
@@ -27,7 +37,7 @@ class StubbedWebsocket(BfxWebsocket):
}]
# convert to string and push through the websocket
data = json.dumps(data) if is_json else data
return await self.on_message(data)
return await self.on_message(0, data)
async def publish_auth_confirmation(self):
return self.publish({"event":"auth","status":"OK","chanId":0,"userId":269499,"auth_id":"58aa0472-b1a9-4690-8ab8-300d68e66aaf","caps":{"orders":{"read":1,"write":1},"account":{"read":1,"write":0},"funding":{"read":1,"write":1},"history":{"read":1,"write":0},"wallets":{"read":1,"write":1},"withdraw":{"read":0,"write":1},"positions":{"read":1,"write":1}}})
@@ -73,11 +83,24 @@ class EventWatcher():
counter += 1
return self.value
class StubClient():
ws = None
res = None
def create_stubbed_client(*args, **kwargs):
client = Client(*args, **kwargs)
client = StubClient()
# no support for rest stubbing yet
client.rest = None
client.ws = StubbedWebsocket(*args, **kwargs)
wsStub = StubbedWebsocket(*args, **kwargs)
# stub client.ws so tests can use publish
client.ws = wsStub
client.ws.API_KEY = "test key"
client.ws.API_SECRET = "secret key"
# stub socket so we can track socket send requests
socket = Socket(0)
socket.set_connected()
socket.ws = wsStub
client.ws.sockets = { 0: socket }
return client
async def ws_publish_auth_accepted(ws):