From 76815b77eed230affc8dbd9c6a90a9663484e864 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Tue, 31 Jan 2023 16:21:21 +0100 Subject: [PATCH 1/9] count events --- nostr/relay.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/nostr/relay.py b/nostr/relay.py index da7ecd6..12be651 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -34,6 +34,10 @@ class Relay: self.reconnect: bool = True self.error_counter: int = 0 self.error_threshold: int = 0 + self.num_received_events: int = 0 + self.num_sent_events: int = 0 + self.num_subscriptions: int = 0 + self.ping: int = 0 self.ssl_options: dict = {} self.lock = Lock() self.ws = WebSocketApp( @@ -63,6 +67,7 @@ class Relay: def publish(self, message: str): if self.connected: + self.num_sent_events += 1 self.ws.send(message) def add_subscription(self, id, filters: Filters): @@ -98,6 +103,7 @@ class Relay: def _on_message(self, class_obj, message: str): if self._is_valid_message(message): + self.num_received_events += 1 self.message_pool.add_message(message, self.url) def _on_error(self, class_obj, error): From 1d6ee33aa4669b786bedb086451f690e54f9df58 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 1 Feb 2023 14:37:22 +0100 Subject: [PATCH 2/9] subscribe global --- main.py | 15 ++++++++++----- nostr/client/client.py | 8 +++++--- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/main.py b/main.py index 4a17436..4b6b75f 100644 --- a/main.py +++ b/main.py @@ -53,11 +53,16 @@ async def post(): await asyncio.sleep(1) to_pubk_hex = ( - input("Enter other pubkey (enter nothing to read your own posts): ") + input( + "Enter other pubkey (enter nothing to read your own posts, enter * for all): " + ) or sender_client.public_key.hex() ) - print(f"Subscribing to posts by {to_pubk_hex}") - to_pubk = PublicKey(bytes.fromhex(to_pubk_hex)) + if to_pubk_hex == "*": + to_pubk = None + else: + print(f"Subscribing to posts by {to_pubk_hex}") + to_pubk = PublicKey(bytes.fromhex(to_pubk_hex)) t = threading.Thread( target=sender_client.get_post, @@ -74,7 +79,7 @@ async def post(): # write a DM and receive DMs -asyncio.run(dm()) +# asyncio.run(dm()) # make a post and subscribe to posts -# asyncio.run(post()) +asyncio.run(post()) diff --git a/nostr/client/client.py b/nostr/client/client.py index 11119ed..e27bc04 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -62,10 +62,12 @@ class NostrClient: # print(message) self.relay_manager.publish_message(message) - def get_post(self, sender_publickey: PublicKey, callback_func=None): - filters = Filters( - [Filter(authors=[sender_publickey.hex()], kinds=[EventKind.TEXT_NOTE])] + def get_post(self, sender_publickey: PublicKey = None, callback_func=None): + filter = Filter( + authors=[sender_publickey.hex()] if sender_publickey else None, + kinds=[EventKind.TEXT_NOTE], ) + filters = Filters([filter]) subscription_id = os.urandom(4).hex() self.relay_manager.add_subscription(subscription_id, filters) From 36768d323659c112e8544255a950fea5715c936f Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:10:20 +0100 Subject: [PATCH 3/9] add filter and ping --- main.py | 20 ++++++++++++++++++++ nostr/client/client.py | 41 +++++++++++------------------------------ nostr/relay.py | 6 ++++++ 3 files changed, 37 insertions(+), 30 deletions(-) diff --git a/main.py b/main.py index 4b6b75f..56885bb 100644 --- a/main.py +++ b/main.py @@ -3,6 +3,8 @@ from nostr.event import Event from nostr.key import PublicKey import asyncio import threading +import time +import datetime async def dm(): @@ -20,11 +22,20 @@ async def dm(): client = NostrClient(privatekey_hex=pk) await asyncio.sleep(1) + filters = { + "since": int( + time.mktime( + (datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple() + ) + ) + } + t = threading.Thread( target=client.get_dm, args=( client.public_key, callback, + filters, ), ) t.start() @@ -64,11 +75,20 @@ async def post(): print(f"Subscribing to posts by {to_pubk_hex}") to_pubk = PublicKey(bytes.fromhex(to_pubk_hex)) + filters = { + "since": int( + time.mktime( + (datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple() + ) + ) + } + t = threading.Thread( target=sender_client.get_post, args=( to_pubk, callback, + filters, ), ) t.start() diff --git a/nostr/client/client.py b/nostr/client/client.py index e27bc04..874c97c 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -23,6 +23,7 @@ class NostrClient: relays = [ "wss://nostr-pub.wellorder.net", "wss://nostr.zebedee.cloud", + "nostr.mom", # "wss://no.str.cr", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() @@ -49,23 +50,20 @@ class NostrClient: pk = bytes.fromhex(privatekey_hex) if privatekey_hex else None self.private_key = PrivateKey(pk) self.public_key = self.private_key.public_key - # print( - # f"Nostr private key: {self.private_key.hex()} ({self.private_key.bech32()})" - # ) - # print(f"Nostr public key: {self.public_key.hex()} ({self.public_key.bech32()})") def post(self, message: str): event = Event(self.public_key.hex(), message, kind=EventKind.TEXT_NOTE) event.sign(self.private_key.hex()) message = json.dumps([ClientMessageType.EVENT, event.to_json_object()]) - # print("Publishing message:") - # print(message) self.relay_manager.publish_message(message) - def get_post(self, sender_publickey: PublicKey = None, callback_func=None): + def get_post( + self, sender_publickey: PublicKey = None, callback_func=None, filter_kwargs={} + ): filter = Filter( authors=[sender_publickey.hex()] if sender_publickey else None, kinds=[EventKind.TEXT_NOTE], + **filter_kwargs, ) filters = Filters([filter]) subscription_id = os.urandom(4).hex() @@ -87,14 +85,9 @@ class NostrClient: time.sleep(0.1) def dm(self, message: str, to_pubkey: PublicKey): - shared_secret = self.private_key.compute_shared_secret(to_pubkey.hex()) - - # print("shared secret: ", shared_secret.hex()) - # print("plain text:", message) aes = cbc.AESCipher(key=shared_secret) iv, enc_text = aes.encrypt(message) - # print("encrypt iv: ", iv) content = f"{base64.b64encode(enc_text).decode('utf-8')}?iv={base64.b64encode(iv).decode('utf-8')}" event = Event( @@ -105,29 +98,23 @@ class NostrClient: ) event.sign(self.private_key.hex()) event_message = json.dumps([ClientMessageType.EVENT, event.to_json_object()]) - # print("DM message:") - # print(event_message) time.sleep(1) self.relay_manager.publish_message(event_message) - def get_dm(self, sender_publickey: PublicKey, callback_func=None): - filters = Filters( - [ - Filter( - kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE], - tags={"#p": [sender_publickey.hex()]}, - ) - ] + def get_dm(self, sender_publickey: PublicKey, callback_func=None, filter_kwargs={}): + filter = Filter( + kinds=[EventKind.ENCRYPTED_DIRECT_MESSAGE], + tags={"#p": [sender_publickey.hex()]}, + **filter_kwargs, ) + filters = Filters([filter]) subscription_id = os.urandom(4).hex() self.relay_manager.add_subscription(subscription_id, filters) request = [ClientMessageType.REQUEST, subscription_id] request.extend(filters.to_json_array()) message = json.dumps(request) - # print("Subscribing to events:") - # print(message) self.relay_manager.publish_message(message) while True: @@ -138,21 +125,15 @@ class NostrClient: shared_secret = self.private_key.compute_shared_secret( event_msg.event.public_key ) - # print("shared secret: ", shared_secret.hex()) - # print("plain text:", message) aes = cbc.AESCipher(key=shared_secret) enc_text_b64, iv_b64 = event_msg.event.content.split("?iv=") iv = base64.decodebytes(iv_b64.encode("utf-8")) enc_text = base64.decodebytes(enc_text_b64.encode("utf-8")) - # print("decrypt iv: ", iv) dec_text = aes.decrypt(iv, enc_text) - # print(f"From {event_msg.event.public_key[:5]}...: {dec_text}") if callback_func: callback_func(event_msg.event, dec_text) except: pass - # else: - # print(f"\nFrom {event_msg.event.public_key[:5]}...: {event_msg.event.content}") break time.sleep(0.1) diff --git a/nostr/relay.py b/nostr/relay.py index 12be651..8265ac6 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -65,6 +65,12 @@ class Relay: time.sleep(1) self.connect(self.ssl_options) + def get_pint(self): + if self.connected: + return int(self.ws.last_ping_tm - self.ws.last_pong_tm) + else: + return 0 + def publish(self, message: str): if self.connected: self.num_sent_events += 1 From c22a3e0c435f83f16b4dd5699402c788eb31ab5e Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:12:17 +0100 Subject: [PATCH 4/9] rename to ping --- nostr/relay.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 8265ac6..10d1da6 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -37,7 +37,6 @@ class Relay: self.num_received_events: int = 0 self.num_sent_events: int = 0 self.num_subscriptions: int = 0 - self.ping: int = 0 self.ssl_options: dict = {} self.lock = Lock() self.ws = WebSocketApp( @@ -65,7 +64,7 @@ class Relay: time.sleep(1) self.connect(self.ssl_options) - def get_pint(self): + def ping(self): if self.connected: return int(self.ws.last_ping_tm - self.ws.last_pong_tm) else: From 0c9dcc7070dc69979d77679e9383c2aa1df09f02 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:14:47 +0100 Subject: [PATCH 5/9] enable ping --- nostr/relay.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/nostr/relay.py b/nostr/relay.py index 10d1da6..7639c1c 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -49,7 +49,7 @@ class Relay: def connect(self, ssl_options: dict = {}): self.ssl_options = ssl_options - self.ws.run_forever(sslopt=self.ssl_options) + self.ws.run_forever(sslopt=self.ssl_options, ping_interval=2) def close(self): self.ws.close() From f9e1a536099021e8d68a2dc9e5c8701fe33bb2b0 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:21:22 +0100 Subject: [PATCH 6/9] ping is a property --- nostr/relay.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nostr/relay.py b/nostr/relay.py index 7639c1c..0e28d49 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -64,6 +64,7 @@ class Relay: time.sleep(1) self.connect(self.ssl_options) + @property def ping(self): if self.connected: return int(self.ws.last_ping_tm - self.ws.last_pong_tm) From fa802d78628f1b8696b56f109218fa5b90394aa9 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 1 Feb 2023 15:48:23 +0100 Subject: [PATCH 7/9] verbose stuff --- main.py | 6 ++---- nostr/client/client.py | 8 +++----- nostr/relay.py | 17 ++++++++++++++++- 3 files changed, 21 insertions(+), 10 deletions(-) diff --git a/main.py b/main.py index 56885bb..050d973 100644 --- a/main.py +++ b/main.py @@ -20,7 +20,6 @@ async def dm(): ) client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) filters = { "since": int( @@ -61,7 +60,6 @@ async def post(): print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") sender_client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) to_pubk_hex = ( input( @@ -99,7 +97,7 @@ async def post(): # write a DM and receive DMs -# asyncio.run(dm()) +asyncio.run(dm()) # make a post and subscribe to posts -asyncio.run(post()) +# asyncio.run(post()) diff --git a/nostr/client/client.py b/nostr/client/client.py index 874c97c..a8836f7 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -21,9 +21,11 @@ from . import cbc class NostrClient: relays = [ + "wss://relay.snort.social", "wss://nostr-pub.wellorder.net", "wss://nostr.zebedee.cloud", - "nostr.mom", + "wss://nostr.mom", + # "wss://wss://lnbits.link/nostrrelay/client" # "wss://no.str.cr", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() @@ -72,8 +74,6 @@ class NostrClient: request = [ClientMessageType.REQUEST, subscription_id] request.extend(filters.to_json_array()) message = json.dumps(request) - # print("Subscribing to events:") - # print(message) self.relay_manager.publish_message(message) while True: @@ -98,8 +98,6 @@ class NostrClient: ) event.sign(self.private_key.hex()) event_message = json.dumps([ClientMessageType.EVENT, event.to_json_object()]) - - time.sleep(1) self.relay_manager.publish_message(event_message) def get_dm(self, sender_publickey: PublicKey, callback_func=None, filter_kwargs={}): diff --git a/nostr/relay.py b/nostr/relay.py index 0e28d49..98012b3 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -45,13 +45,17 @@ class Relay: on_message=self._on_message, on_error=self._on_error, on_close=self._on_close, + on_ping=self._on_ping, + on_pong=self._on_pong, ) def connect(self, ssl_options: dict = {}): self.ssl_options = ssl_options - self.ws.run_forever(sslopt=self.ssl_options, ping_interval=2) + print(self.url, "🟢") + self.ws.run_forever(sslopt=self.ssl_options) def close(self): + print(self.url, "🔴") self.ws.close() def check_reconnect(self): @@ -111,8 +115,11 @@ class Relay: if self._is_valid_message(message): self.num_received_events += 1 self.message_pool.add_message(message, self.url) + else: + print(self.url, "invalid message", message) def _on_error(self, class_obj, error): + print(self.url, "🚫", error) self.connected = False self.error_counter += 1 if self.error_threshold and self.error_counter > self.error_threshold: @@ -120,6 +127,14 @@ class Relay: else: self.check_reconnect() + def _on_ping(self, class_obj, message): + print(self.url, "ping", message) + return + + def _on_pong(self, class_obj, message): + print(self.url, "pong", message) + return + def _is_valid_message(self, message: str) -> bool: message = message.strip("\n") if not message or message[0] != "[" or message[-1] != "]": From 1812b0128f230a7b8be5df48ff7077edc6b64484 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Tue, 7 Feb 2023 23:05:32 +0100 Subject: [PATCH 8/9] message queues --- main.py | 4 ++-- nostr/client/client.py | 8 ++++---- nostr/relay.py | 13 +++++++++++-- nostr/relay_manager.py | 7 +++++++ 4 files changed, 24 insertions(+), 8 deletions(-) diff --git a/main.py b/main.py index 06bab60..c506783 100644 --- a/main.py +++ b/main.py @@ -18,7 +18,7 @@ async def dm(): ) client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) + # await asyncio.sleep(1) t = threading.Thread( target=client.get_dm, @@ -50,7 +50,7 @@ async def post(): print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") sender_client = NostrClient(privatekey_hex=pk) - await asyncio.sleep(1) + # await asyncio.sleep(1) to_pubk_hex = ( input("Enter other pubkey (enter nothing to read your own posts): ") diff --git a/nostr/client/client.py b/nostr/client/client.py index 8cceb42..16e3c64 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -21,10 +21,9 @@ from . import cbc class NostrClient: relays = [ - # "wss://lnbits.link/nostrrelay/client" - "wss://nostr-pub.wellorder.net", - # "wss://nostr.zebedee.cloud", - # "wss://no.str.cr", + "wss://lnbits.link/nostrrelay/client" "wss://nostr-pub.wellorder.net", + "wss://nostr.zebedee.cloud", + "wss://no.str.cr", ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" relay_manager = RelayManager() private_key: PrivateKey @@ -44,6 +43,7 @@ class NostrClient: self.relay_manager.open_connections( {"cert_reqs": ssl.CERT_NONE} ) # NOTE: This disables ssl certificate verification + self.relay_manager.start_message_workers() def close(self): self.relay_manager.close_connections() diff --git a/nostr/relay.py b/nostr/relay.py index cc3d017..e47a17e 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,5 +1,6 @@ import json import time +from queue import Queue from threading import Lock from websocket import WebSocketApp from .event import Event @@ -37,6 +38,7 @@ class Relay: self.ssl_options: dict = {} self.proxy: dict = {} self.lock = Lock() + self.queue = Queue() self.ws = WebSocketApp( url, on_open=self._on_open, @@ -69,8 +71,15 @@ class Relay: self.connect(self.ssl_options, self.proxy) def publish(self, message: str): - if self.connected: - self.ws.send(message) + self.queue.put(message) + + def queue_worker(self): + while True: + if self.connected: + message = self.queue.get() + self.ws.send(message) + else: + time.sleep(0.1) def add_subscription(self, id, filters: Filters): with self.lock: diff --git a/nostr/relay_manager.py b/nostr/relay_manager.py index c2ac29b..44d9e7b 100644 --- a/nostr/relay_manager.py +++ b/nostr/relay_manager.py @@ -43,6 +43,13 @@ class RelayManager: name=f"{relay.url}-thread", ).start() + def start_message_workers(self): + for relay in self.relays.values(): + threading.Thread( + target=relay.queue_worker, + name=f"{relay.url}-queue", + ).start() + def close_connections(self): for relay in self.relays.values(): relay.close() From 5054fea937ebaf03bb007bf1d0b6de8617b66393 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Tue, 7 Feb 2023 23:11:43 +0100 Subject: [PATCH 9/9] fix filter --- main.py | 1 - nostr/client/client.py | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/main.py b/main.py index 2d895d1..b4fbb93 100644 --- a/main.py +++ b/main.py @@ -34,7 +34,6 @@ async def dm(): args=( client.public_key, callback, - filters, ), ) t.start() diff --git a/nostr/client/client.py b/nostr/client/client.py index 9ae36b5..b7454d8 100644 --- a/nostr/client/client.py +++ b/nostr/client/client.py @@ -21,7 +21,7 @@ from . import cbc class NostrClient: relays = [ - "wss://relay.snort.social", + # "wss://relay.snort.social", "wss://nostr-pub.wellorder.net", "wss://nostr.zebedee.cloud", "wss://nostr.mom", @@ -103,7 +103,6 @@ class NostrClient: ) ] ) - filters = Filters([filter]) subscription_id = os.urandom(4).hex() self.relay_manager.add_subscription(subscription_id, filters)