diff --git a/main.py b/main.py index b4fbb93..aee2128 100644 --- a/main.py +++ b/main.py @@ -20,15 +20,7 @@ async def dm(): ) client = NostrClient(privatekey_hex=pk) - - filters = { - "since": int( - time.mktime( - (datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple() - ) - ) - } - + # await asyncio.sleep(1) t = threading.Thread( target=client.get_dm, args=( @@ -59,6 +51,7 @@ async def post(): print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") sender_client = NostrClient(privatekey_hex=pk) + # await asyncio.sleep(1) to_pubk_hex = ( input( diff --git a/nostr/client/client.py b/nostr/client/client.py index b7454d8..86501b9 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -21,12 +21,10 @@ from . import cbc class NostrClient: relays = [ - # "wss://relay.snort.social", + "wss://lnbits.link/nostrrelay/client", "wss://nostr-pub.wellorder.net", "wss://nostr.zebedee.cloud", - "wss://nostr.mom", - # "wss://wss://lnbits.link/nostrrelay/client" - # "wss://no.str.cr", + "wss://no.str.cr", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() private_key: PrivateKey @@ -46,6 +44,7 @@ class NostrClient: self.relay_manager.open_connections( {"cert_reqs": ssl.CERT_NONE} ) # NOTE: This disables ssl certificate verification + self.relay_manager.start_message_workers() def close(self): self.relay_manager.close_connections() diff --git a/nostr/relay.py b/nostr/relay.py index dff2a99..bccbcad 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,5 +1,6 @@ import json import time +from queue import Queue from threading import Lock from websocket import WebSocketApp from .event import Event @@ -40,6 +41,7 @@ class Relay: self.ssl_options: dict = {} self.proxy: dict = {} self.lock = Lock() + self.queue = Queue() self.ws = WebSocketApp( url, on_open=self._on_open, @@ -52,17 +54,16 @@ class Relay: def connect(self, ssl_options: dict = None, proxy: dict = None): self.ssl_options = ssl_options - print(self.url, "🟢") self.proxy = proxy self.ws.run_forever( sslopt=ssl_options, http_proxy_host=None if proxy is None else proxy.get("host"), http_proxy_port=None if proxy is None else proxy.get("port"), proxy_type=None if proxy is None else proxy.get("type"), + ping_interval=5, ) def close(self): - print(self.url, "🔴") self.ws.close() def check_reconnect(self): @@ -78,14 +79,21 @@ class Relay: @property def ping(self): if self.connected: - return int(self.ws.last_ping_tm - self.ws.last_pong_tm) + return int((self.ws.last_pong_tm - self.ws.last_ping_tm) * 1000) else: return 0 def publish(self, message: str): - if self.connected: - self.num_sent_events += 1 - self.ws.send(message) + self.queue.put(message) + + def queue_worker(self): + while True: + if self.connected: + message = self.queue.get() + self.num_sent_events += 1 + self.ws.send(message) + else: + time.sleep(0.1) def add_subscription(self, id, filters: Filters): with self.lock: @@ -122,11 +130,8 @@ class Relay: if self._is_valid_message(message): self.num_received_events += 1 self.message_pool.add_message(message, self.url) - else: - print(self.url, "invalid message", message) def _on_error(self, class_obj, error): - print(self.url, "🚫", error) self.connected = False self.error_counter += 1 if self.error_threshold and self.error_counter > self.error_threshold: @@ -135,11 +140,9 @@ class Relay: self.check_reconnect() def _on_ping(self, class_obj, message): - print(self.url, "ping", message) return def _on_pong(self, class_obj, message): - print(self.url, "pong", message) return def _is_valid_message(self, message: str) -> bool: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index c2ac29b..44d9e7b 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -43,6 +43,13 @@ class RelayManager: name=f"{relay.url}-thread", ).start() + def start_message_workers(self): + for relay in self.relays.values(): + threading.Thread( + target=relay.queue_worker, + name=f"{relay.url}-queue", + ).start() + def close_connections(self): for relay in self.relays.values(): relay.close()