From d54c0d23912caeed2456700aec4fe07226c14837 Mon Sep 17 00:00:00 2001 From: JacobPlaster Date: Thu, 2 Jan 2020 15:52:14 +0000 Subject: [PATCH] websocket: add write lock when sending --- bfxapi/models/subscription.py | 4 ++-- bfxapi/websockets/bfx_websocket.py | 6 +++--- bfxapi/websockets/generic_websocket.py | 28 ++++++++++++++------------ 3 files changed, 20 insertions(+), 18 deletions(-) diff --git a/bfxapi/models/subscription.py b/bfxapi/models/subscription.py index 3db29de..449b16e 100644 --- a/bfxapi/models/subscription.py +++ b/bfxapi/models/subscription.py @@ -54,13 +54,13 @@ class Subscription: if not self.is_subscribed(): raise Exception("Subscription is not subscribed to websocket") payload = {'event': 'unsubscribe', 'chanId': self.chan_id} - await self.socket.ws.send(json.dumps(payload)) + await self.socket.send(json.dumps(payload)) async def subscribe(self): """ Send a subscription request to the bitfinex socket """ - await self.socket.ws.send(json.dumps(self._get_send_payload())) + await self.socket.send(json.dumps(self._get_send_payload())) def confirm_unsubscribe(self): """ diff --git a/bfxapi/websockets/bfx_websocket.py b/bfxapi/websockets/bfx_websocket.py index a352261..bce9a0c 100644 --- a/bfxapi/websockets/bfx_websocket.py +++ b/bfxapi/websockets/bfx_websocket.py @@ -445,7 +445,7 @@ class BfxWebsocket(GenericWebsocket): jdata['dms'] = 4 if len(self.channel_filter) > 0: jdata['filter'] = self.channel_filter - await socket.ws.send(json.dumps(jdata)) + await socket.send(json.dumps(jdata)) async def on_open(self, socket_id): self.logger.info("Websocket opened.") @@ -470,7 +470,7 @@ class BfxWebsocket(GenericWebsocket): raise ValueError("authenticated socket connection not found") if not socket.isConnected: raise ValueError("authenticated socket not connected") - await socket.ws.send(json.dumps(payload)) + await socket.send(json.dumps(payload)) def get_orderbook(self, symbol): return self.orderBooks.get(symbol, None) @@ -508,7 +508,7 @@ class BfxWebsocket(GenericWebsocket): # enable on all sockets for socket in self.sockets.values(): if socket.isConnected: - await socket.ws.send(json.dumps(payload)) + await socket.send(json.dumps(payload)) async def subscribe_order_book(self, symbol): """ diff --git a/bfxapi/websockets/generic_websocket.py b/bfxapi/websockets/generic_websocket.py index 42a7e17..2d2863c 100644 --- a/bfxapi/websockets/generic_websocket.py +++ b/bfxapi/websockets/generic_websocket.py @@ -7,7 +7,7 @@ import websockets import socket import json import time -from threading import Thread +from threading import Thread, Lock from pyee import EventEmitter from ..utils.custom_logger import CustomLogger @@ -34,6 +34,7 @@ class Socket(): self.isConnected = False self.isAuthenticated = False self.id = sId + self.lock = Lock() def set_connected(self): self.isConnected = True @@ -50,6 +51,10 @@ class Socket(): def set_websocket(self, ws): self.ws = ws + async def send(self, data): + with self.lock: + await self.ws.send(data) + def _start_event_worker(): async def event_sleep_process(): """ @@ -120,16 +125,6 @@ class GenericWebsocket: return time.sleep(0.01) - async def _connect(self, socket): - async with websockets.connect(self.host) as websocket: - self.sockets[socket.id].set_websocket(websocket) - self.sockets[socket.id].set_connected() - self.logger.info("Websocket connected to {}".format(self.host)) - while True: - await asyncio.sleep(0) - message = await websocket.recv() - await self.on_message(socket.id, message) - def get_socket(self, socketId): return self.sockets[socketId] @@ -146,8 +141,15 @@ class GenericWebsocket: self.sockets[sId] = s while retries < self.max_retries and self.attempt_retry: try: - await self._connect(s) - retries = 0 + async with websockets.connect(self.host) as websocket: + self.sockets[sId].set_websocket(websocket) + self.sockets[sId].set_connected() + self.logger.info("Websocket connected to {}".format(self.host)) + retries = 0 + while True: + await asyncio.sleep(0) + message = await websocket.recv() + await self.on_message(sId, message) except (ConnectionClosed, socket.error) as e: self.sockets[sId].set_disconnected() if self.sockets[sId].isAuthenticated: