message queues

This commit is contained in:
callebtc
2023-02-07 23:05:32 +01:00
parent 8e39666fe2
commit 1812b0128f
4 changed files with 24 additions and 8 deletions

View File

@@ -1,5 +1,6 @@
import json
import time
from queue import Queue
from threading import Lock
from websocket import WebSocketApp
from .event import Event
@@ -37,6 +38,7 @@ class Relay:
self.ssl_options: dict = {}
self.proxy: dict = {}
self.lock = Lock()
self.queue = Queue()
self.ws = WebSocketApp(
url,
on_open=self._on_open,
@@ -69,8 +71,15 @@ class Relay:
self.connect(self.ssl_options, self.proxy)
def publish(self, message: str):
if self.connected:
self.ws.send(message)
self.queue.put(message)
def queue_worker(self):
while True:
if self.connected:
message = self.queue.get()
self.ws.send(message)
else:
time.sleep(0.1)
def add_subscription(self, id, filters: Filters):
with self.lock: