This commit is contained in:
vic
2023-10-29 15:04:38 -04:00
parent 5c09103a8a
commit 9b290d616c
10 changed files with 74 additions and 69 deletions

View File

@@ -2,7 +2,9 @@ import json
import time
from queue import Queue
from threading import Lock
from websocket import WebSocketApp, WebSocketConnectionClosedException
from websocket import WebSocketApp
from .event import Event
from .filter import Filters
from .message_pool import MessagePool
@@ -33,8 +35,9 @@ class Relay:
self.subscriptions = subscriptions
self.connected: bool = False
self.reconnect: bool = True
self.shutdown: bool = False
self.error_counter: int = 0
self.error_threshold: int = 0
self.error_threshold: int = 100
self.num_received_events: int = 0
self.num_sent_events: int = 0
self.num_subscriptions: int = 0
@@ -42,8 +45,10 @@ class Relay:
self.proxy: dict = {}
self.lock = Lock()
self.queue = Queue()
def connect(self, ssl_options: dict = None, proxy: dict = None):
self.ws = WebSocketApp(
url,
self.url,
on_open=self._on_open,
on_message=self._on_message,
on_error=self._on_error,
@@ -51,8 +56,6 @@ class Relay:
on_ping=self._on_ping,
on_pong=self._on_pong,
)
def connect(self, ssl_options: dict = None, proxy: dict = None):
self.ssl_options = ssl_options
self.proxy = proxy
if not self.connected:
@@ -66,6 +69,7 @@ class Relay:
def close(self):
self.ws.close()
self.shutdown = True
def check_reconnect(self):
try:
@@ -74,7 +78,7 @@ class Relay:
pass
self.connected = False
if self.reconnect:
time.sleep(1)
time.sleep(self.error_counter**2)
self.connect(self.ssl_options, self.proxy)
@property
@@ -85,15 +89,16 @@ class Relay:
def publish(self, message: str):
self.queue.put(message)
def queue_worker(self):
def queue_worker(self, shutdown):
while True:
if self.connected:
message = self.queue.get()
self.num_sent_events += 1
try:
message = self.queue.get(timeout=1)
self.ws.send(message)
except WebSocketConnectionClosedException as wscce:
self._on_error(None, wscce)
self.num_sent_events += 1
except:
if shutdown():
break
else:
time.sleep(0.1)
@@ -126,6 +131,10 @@ class Relay:
def _on_close(self, class_obj, status_code, message):
self.connected = False
if self.error_threshold and self.error_counter > self.error_threshold:
pass
else:
self.check_reconnect()
pass
def _on_message(self, class_obj, message: str):
@@ -136,10 +145,6 @@ class Relay:
def _on_error(self, class_obj, error):
self.connected = False
self.error_counter += 1
if self.error_threshold and self.error_counter > self.error_threshold:
pass
else:
self.check_reconnect()
def _on_ping(self, class_obj, message):
return