From 1812b0128f230a7b8be5df48ff7077edc6b64484 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Tue, 7 Feb 2023 23:05:32 +0100 Subject: [PATCH] message queues --- main.py | 4 ++-- nostr/client/client.py | 8 ++++---- nostr/relay.py | 13 +++++++++++-- nostr/relay_manager.py | 7 +++++++ 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/main.py b/main.py index 06bab60..c506783 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,7 @@ async def dm(): ) client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) + # await asyncio.sleep(1) t = threading.Thread( target=client.get_dm, @@ -50,7 +50,7 @@ async def post(): print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") sender_client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) + # await asyncio.sleep(1) to_pubk_hex = ( input("Enter other pubkey (enter nothing to read your own posts): ") diff --git a/nostr/client/client.py b/nostr/client/client.py index 8cceb42..16e3c64 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -21,10 +21,9 @@ from . import cbc class NostrClient: relays = [ - # "wss://lnbits.link/nostrrelay/client" - "wss://nostr-pub.wellorder.net", - # "wss://nostr.zebedee.cloud", - # "wss://no.str.cr", + "wss://lnbits.link/nostrrelay/client" "wss://nostr-pub.wellorder.net", + "wss://nostr.zebedee.cloud", + "wss://no.str.cr", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() private_key: PrivateKey @@ -44,6 +43,7 @@ class NostrClient: self.relay_manager.open_connections( {"cert_reqs": ssl.CERT_NONE} ) # NOTE: This disables ssl certificate verification + self.relay_manager.start_message_workers() def close(self): self.relay_manager.close_connections() diff --git a/nostr/relay.py b/nostr/relay.py index cc3d017..e47a17e 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,5 +1,6 @@ import json import time +from queue import Queue from threading import Lock from websocket import WebSocketApp from .event import Event @@ -37,6 +38,7 @@ class Relay: self.ssl_options: dict = {} self.proxy: dict = {} self.lock = Lock() + self.queue = Queue() self.ws = WebSocketApp( url, on_open=self._on_open, @@ -69,8 +71,15 @@ class Relay: self.connect(self.ssl_options, self.proxy) def publish(self, message: str): - if self.connected: - self.ws.send(message) + self.queue.put(message) + + def queue_worker(self): + while True: + if self.connected: + message = self.queue.get() + self.ws.send(message) + else: + time.sleep(0.1) def add_subscription(self, id, filters: Filters): with self.lock: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index c2ac29b..44d9e7b 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -43,6 +43,13 @@ class RelayManager: name=f"{relay.url}-thread", ).start() + def start_message_workers(self): + for relay in self.relays.values(): + threading.Thread( + target=relay.queue_worker, + name=f"{relay.url}-queue", + ).start() + def close_connections(self): for relay in self.relays.values(): relay.close()