mirror of
https://github.com/aljazceru/python-nostr.git
synced 2025-12-17 22:34:27 +01:00
119 lines
3.4 KiB
Python
119 lines
3.4 KiB
Python
from nostr.client.client import NostrClient
|
|
from nostr.event import Event
|
|
from nostr.key import PublicKey
|
|
import asyncio
|
|
import threading
|
|
import time
|
|
import datetime
|
|
|
|
|
|
def print_status(client):
|
|
print("")
|
|
for relay in client.relay_manager.relays.values():
|
|
connected_text = "🟢" if relay.connected else "🔴"
|
|
status_text = f"{connected_text} ⬆️ {relay.num_sent_events} ⬇️ {relay.num_received_events} ⚠️ {relay.error_counter} ⏱️ {relay.ping} ms - {relay.url.split('//')[1]}"
|
|
print(status_text)
|
|
|
|
|
|
async def dm():
|
|
print("This is an example NIP-04 DM flow")
|
|
pk = input("Enter your privatekey to post from (enter nothing for a random one): ")
|
|
|
|
def callback(event: Event, decrypted_content):
|
|
"""
|
|
Callback to trigger when a DM is received.
|
|
"""
|
|
print(
|
|
f"\nFrom {event.public_key[:3]}..{event.public_key[-3:]}: {decrypted_content}"
|
|
)
|
|
|
|
client = NostrClient(privatekey_hex=pk)
|
|
if not pk:
|
|
print(f"Your private key: {client.private_key.bech32()}")
|
|
|
|
print(f"Your public key: {client.public_key.bech32()}")
|
|
|
|
t = threading.Thread(
|
|
target=client.get_dm, args=(client.public_key, callback), daemon=True
|
|
)
|
|
t.start()
|
|
|
|
pubkey_to_str = (
|
|
input("Enter other pubkey to DM to (enter nothing to DM yourself): ")
|
|
or client.public_key.hex()
|
|
)
|
|
if pubkey_to_str.startswith("npub"):
|
|
pubkey_to = PublicKey().from_npub(pubkey_to_str)
|
|
else:
|
|
pubkey_to = PublicKey(bytes.fromhex(pubkey_to_str))
|
|
print(f"Sending DMs to {pubkey_to.bech32()}")
|
|
while True:
|
|
print_status(client)
|
|
await asyncio.sleep(1)
|
|
msg = input("\nEnter message: ")
|
|
client.dm(msg, pubkey_to)
|
|
|
|
|
|
async def post():
|
|
print("This posts and reads a nostr note")
|
|
pk = input("Enter your privatekey to post from (enter nothing for a random one): ")
|
|
|
|
def callback(event: Event):
|
|
"""
|
|
Callback to trigger when post appers.
|
|
"""
|
|
print(
|
|
f"\nFrom {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}"
|
|
)
|
|
|
|
sender_client = NostrClient(privatekey_hex=pk)
|
|
# await asyncio.sleep(1)
|
|
|
|
pubkey_to_str = (
|
|
input(
|
|
"Enter other pubkey (enter nothing to read your own posts, enter * for all): "
|
|
)
|
|
or sender_client.public_key.hex()
|
|
)
|
|
if pubkey_to_str == "*":
|
|
pubkey_to = None
|
|
elif pubkey_to_str.startswith("npub"):
|
|
pubkey_to = PublicKey().from_npub(pubkey_to_str)
|
|
else:
|
|
pubkey_to = PublicKey(bytes.fromhex(pubkey_to_str))
|
|
|
|
print(f"Subscribing to posts by {pubkey_to.bech32() if pubkey_to else 'everyone'}")
|
|
|
|
filters = {
|
|
"since": int(
|
|
time.mktime(
|
|
(datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple()
|
|
)
|
|
)
|
|
}
|
|
|
|
t = threading.Thread(
|
|
target=sender_client.get_post,
|
|
args=(
|
|
pubkey_to,
|
|
callback,
|
|
filters,
|
|
),
|
|
daemon=True,
|
|
)
|
|
t.start()
|
|
|
|
while True:
|
|
print_status(sender_client)
|
|
await asyncio.sleep(1)
|
|
msg = input("\nEnter post: ")
|
|
sender_client.post(msg)
|
|
|
|
|
|
if input("Enter '1' for DM, '2' for Posts (Default: 1): ") == "2":
|
|
# make a post and subscribe to posts
|
|
asyncio.run(post())
|
|
else:
|
|
# write a DM and receive DMs
|
|
asyncio.run(dm())
|