diff --git a/main.py b/main.py index 06bab60..aee2128 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,8 @@ from nostr.event import Event from nostr.key import PublicKey import asyncio import threading +import time +import datetime async def dm(): @@ -18,8 +20,7 @@ async def dm(): ) client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) - + # await asyncio.sleep(1) t = threading.Thread( target=client.get_dm, args=( @@ -50,20 +51,34 @@ async def post(): print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") sender_client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) + # await asyncio.sleep(1) to_pubk_hex = ( - input("Enter other pubkey (enter nothing to read your own posts): ") + input( + "Enter other pubkey (enter nothing to read your own posts, enter * for all): " + ) or sender_client.public_key.hex() ) - print(f"Subscribing to posts by {to_pubk_hex}") - to_pubk = PublicKey(bytes.fromhex(to_pubk_hex)) + if to_pubk_hex == "*": + to_pubk = None + else: + print(f"Subscribing to posts by {to_pubk_hex}") + to_pubk = PublicKey(bytes.fromhex(to_pubk_hex)) + + filters = { + "since": int( + time.mktime( + (datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple() + ) + ) + } t = threading.Thread( target=sender_client.get_post, args=( to_pubk, callback, + filters, ), ) t.start() diff --git a/nostr/client/client.py b/nostr/client/client.py index 8cceb42..86501b9 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -21,10 +21,10 @@ from . import cbc class NostrClient: relays = [ - # "wss://lnbits.link/nostrrelay/client" + "wss://lnbits.link/nostrrelay/client", "wss://nostr-pub.wellorder.net", - # "wss://nostr.zebedee.cloud", - # "wss://no.str.cr", + "wss://nostr.zebedee.cloud", + "wss://no.str.cr", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() private_key: PrivateKey @@ -44,6 +44,7 @@ class NostrClient: self.relay_manager.open_connections( {"cert_reqs": ssl.CERT_NONE} ) # NOTE: This disables ssl certificate verification + self.relay_manager.start_message_workers() def close(self): self.relay_manager.close_connections() @@ -52,10 +53,6 @@ class NostrClient: pk = bytes.fromhex(privatekey_hex) if privatekey_hex else None self.private_key = PrivateKey(pk) self.public_key = self.private_key.public_key - # print( - # f"Nostr private key: {self.private_key.hex()} ({self.private_key.bech32()})" - # ) - # print(f"Nostr public key: {self.public_key.hex()} ({self.public_key.bech32()})") def post(self, message: str): event = Event(message, self.public_key.hex(), kind=EventKind.TEXT_NOTE) @@ -65,18 +62,21 @@ class NostrClient: # print(event_json) self.relay_manager.publish_message(event_json) - def get_post(self, sender_publickey: PublicKey, callback_func=None): - filters = Filters( - [Filter(authors=[sender_publickey.hex()], kinds=[EventKind.TEXT_NOTE])] + def get_post( + self, sender_publickey: PublicKey = None, callback_func=None, filter_kwargs={} + ): + filter = Filter( + authors=[sender_publickey.hex()] if sender_publickey else None, + kinds=[EventKind.TEXT_NOTE], + **filter_kwargs, ) + filters = Filters([filter]) subscription_id = os.urandom(4).hex() self.relay_manager.add_subscription(subscription_id, filters) request = [ClientMessageType.REQUEST, subscription_id] request.extend(filters.to_json_array()) message = json.dumps(request) - # print("Subscribing to events:") - # print(message) self.relay_manager.publish_message(message) while True: @@ -108,8 +108,6 @@ class NostrClient: request = [ClientMessageType.REQUEST, subscription_id] request.extend(filters.to_json_array()) message = json.dumps(request) - # print("Subscribing to events:") - # print(message) self.relay_manager.publish_message(message) while True: @@ -120,21 +118,15 @@ class NostrClient: shared_secret = self.private_key.compute_shared_secret( event_msg.event.public_key ) - # print("shared secret: ", shared_secret.hex()) - # print("plain text:", message) aes = cbc.AESCipher(key=shared_secret) enc_text_b64, iv_b64 = event_msg.event.content.split("?iv=") iv = base64.decodebytes(iv_b64.encode("utf-8")) enc_text = base64.decodebytes(enc_text_b64.encode("utf-8")) - # print("decrypt iv: ", iv) dec_text = aes.decrypt(iv, enc_text) - # print(f"From {event_msg.event.public_key[:5]}...: {dec_text}") if callback_func: callback_func(event_msg.event, dec_text) except: pass - # else: - # print(f"\nFrom {event_msg.event.public_key[:5]}...: {event_msg.event.content}") break time.sleep(0.1) diff --git a/nostr/relay.py b/nostr/relay.py index cc3d017..bccbcad 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,5 +1,6 @@ import json import time +from queue import Queue from threading import Lock from websocket import WebSocketApp from .event import Event @@ -34,15 +35,21 @@ class Relay: self.reconnect: bool = True self.error_counter: int = 0 self.error_threshold: int = 0 + self.num_received_events: int = 0 + self.num_sent_events: int = 0 + self.num_subscriptions: int = 0 self.ssl_options: dict = {} self.proxy: dict = {} self.lock = Lock() + self.queue = Queue() self.ws = WebSocketApp( url, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, on_close=self._on_close, + on_ping=self._on_ping, + on_pong=self._on_pong, ) def connect(self, ssl_options: dict = None, proxy: dict = None): @@ -53,6 +60,7 @@ class Relay: http_proxy_host=None if proxy is None else proxy.get("host"), http_proxy_port=None if proxy is None else proxy.get("port"), proxy_type=None if proxy is None else proxy.get("type"), + ping_interval=5, ) def close(self): @@ -68,9 +76,24 @@ class Relay: time.sleep(1) self.connect(self.ssl_options, self.proxy) - def publish(self, message: str): + @property + def ping(self): if self.connected: - self.ws.send(message) + return int((self.ws.last_pong_tm - self.ws.last_ping_tm) * 1000) + else: + return 0 + + def publish(self, message: str): + self.queue.put(message) + + def queue_worker(self): + while True: + if self.connected: + message = self.queue.get() + self.num_sent_events += 1 + self.ws.send(message) + else: + time.sleep(0.1) def add_subscription(self, id, filters: Filters): with self.lock: @@ -105,6 +128,7 @@ class Relay: def _on_message(self, class_obj, message: str): if self._is_valid_message(message): + self.num_received_events += 1 self.message_pool.add_message(message, self.url) def _on_error(self, class_obj, error): @@ -115,6 +139,12 @@ class Relay: else: self.check_reconnect() + def _on_ping(self, class_obj, message): + return + + def _on_pong(self, class_obj, message): + return + def _is_valid_message(self, message: str) -> bool: message = message.strip("\n") if not message or message[0] != "[" or message[-1] != "]": diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index c2ac29b..44d9e7b 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -43,6 +43,13 @@ class RelayManager: name=f"{relay.url}-thread", ).start() + def start_message_workers(self): + for relay in self.relays.values(): + threading.Thread( + target=relay.queue_worker, + name=f"{relay.url}-queue", + ).start() + def close_connections(self): for relay in self.relays.values(): relay.close()