websocket: add write lock when sending

This commit is contained in:
JacobPlaster
2020-01-02 15:52:14 +00:00
committed by Jacob Plaster
parent 7db6b5d73f
commit d54c0d2391
3 changed files with 20 additions and 18 deletions

View File

@@ -54,13 +54,13 @@ class Subscription:
if not self.is_subscribed(): if not self.is_subscribed():
raise Exception("Subscription is not subscribed to websocket") raise Exception("Subscription is not subscribed to websocket")
payload = {'event': 'unsubscribe', 'chanId': self.chan_id} payload = {'event': 'unsubscribe', 'chanId': self.chan_id}
await self.socket.ws.send(json.dumps(payload)) await self.socket.send(json.dumps(payload))
async def subscribe(self): async def subscribe(self):
""" """
Send a subscription request to the bitfinex socket Send a subscription request to the bitfinex socket
""" """
await self.socket.ws.send(json.dumps(self._get_send_payload())) await self.socket.send(json.dumps(self._get_send_payload()))
def confirm_unsubscribe(self): def confirm_unsubscribe(self):
""" """

View File

@@ -445,7 +445,7 @@ class BfxWebsocket(GenericWebsocket):
jdata['dms'] = 4 jdata['dms'] = 4
if len(self.channel_filter) > 0: if len(self.channel_filter) > 0:
jdata['filter'] = self.channel_filter jdata['filter'] = self.channel_filter
await socket.ws.send(json.dumps(jdata)) await socket.send(json.dumps(jdata))
async def on_open(self, socket_id): async def on_open(self, socket_id):
self.logger.info("Websocket opened.") self.logger.info("Websocket opened.")
@@ -470,7 +470,7 @@ class BfxWebsocket(GenericWebsocket):
raise ValueError("authenticated socket connection not found") raise ValueError("authenticated socket connection not found")
if not socket.isConnected: if not socket.isConnected:
raise ValueError("authenticated socket not connected") raise ValueError("authenticated socket not connected")
await socket.ws.send(json.dumps(payload)) await socket.send(json.dumps(payload))
def get_orderbook(self, symbol): def get_orderbook(self, symbol):
return self.orderBooks.get(symbol, None) return self.orderBooks.get(symbol, None)
@@ -508,7 +508,7 @@ class BfxWebsocket(GenericWebsocket):
# enable on all sockets # enable on all sockets
for socket in self.sockets.values(): for socket in self.sockets.values():
if socket.isConnected: if socket.isConnected:
await socket.ws.send(json.dumps(payload)) await socket.send(json.dumps(payload))
async def subscribe_order_book(self, symbol): async def subscribe_order_book(self, symbol):
""" """

View File

@@ -7,7 +7,7 @@ import websockets
import socket import socket
import json import json
import time import time
from threading import Thread from threading import Thread, Lock
from pyee import EventEmitter from pyee import EventEmitter
from ..utils.custom_logger import CustomLogger from ..utils.custom_logger import CustomLogger
@@ -34,6 +34,7 @@ class Socket():
self.isConnected = False self.isConnected = False
self.isAuthenticated = False self.isAuthenticated = False
self.id = sId self.id = sId
self.lock = Lock()
def set_connected(self): def set_connected(self):
self.isConnected = True self.isConnected = True
@@ -50,6 +51,10 @@ class Socket():
def set_websocket(self, ws): def set_websocket(self, ws):
self.ws = ws self.ws = ws
async def send(self, data):
with self.lock:
await self.ws.send(data)
def _start_event_worker(): def _start_event_worker():
async def event_sleep_process(): async def event_sleep_process():
""" """
@@ -120,16 +125,6 @@ class GenericWebsocket:
return return
time.sleep(0.01) time.sleep(0.01)
async def _connect(self, socket):
async with websockets.connect(self.host) as websocket:
self.sockets[socket.id].set_websocket(websocket)
self.sockets[socket.id].set_connected()
self.logger.info("Websocket connected to {}".format(self.host))
while True:
await asyncio.sleep(0)
message = await websocket.recv()
await self.on_message(socket.id, message)
def get_socket(self, socketId): def get_socket(self, socketId):
return self.sockets[socketId] return self.sockets[socketId]
@@ -146,8 +141,15 @@ class GenericWebsocket:
self.sockets[sId] = s self.sockets[sId] = s
while retries < self.max_retries and self.attempt_retry: while retries < self.max_retries and self.attempt_retry:
try: try:
await self._connect(s) async with websockets.connect(self.host) as websocket:
self.sockets[sId].set_websocket(websocket)
self.sockets[sId].set_connected()
self.logger.info("Websocket connected to {}".format(self.host))
retries = 0 retries = 0
while True:
await asyncio.sleep(0)
message = await websocket.recv()
await self.on_message(sId, message)
except (ConnectionClosed, socket.error) as e: except (ConnectionClosed, socket.error) as e:
self.sockets[sId].set_disconnected() self.sockets[sId].set_disconnected()
if self.sockets[sId].isAuthenticated: if self.sockets[sId].isAuthenticated: