add EncryptedDirectMessage class; simplify Event class (#39)

This commit is contained in:
kdmukai
2023-02-04 08:50:48 -06:00
committed by GitHub
parent bda320f6d6
commit 69ff17b163
5 changed files with 289 additions and 38 deletions

View File

@@ -11,6 +11,7 @@ public_key = private_key.public_key
print(f"Private key: {private_key.bech32()}") print(f"Private key: {private_key.bech32()}")
print(f"Public key: {public_key.bech32()}") print(f"Public key: {public_key.bech32()}")
``` ```
**Connect to relays** **Connect to relays**
```python ```python
import json import json
@@ -30,6 +31,7 @@ while relay_manager.message_pool.has_notices():
relay_manager.close_connections() relay_manager.close_connections()
``` ```
**Publish to relays** **Publish to relays**
```python ```python
import json import json
@@ -48,7 +50,7 @@ time.sleep(1.25) # allow the connections to open
private_key = PrivateKey() private_key = PrivateKey()
event = Event(private_key.public_key.hex(), "Hello Nostr") event = Event("Hello Nostr")
private_key.sign_event(event) private_key.sign_event(event)
relay_manager.publish_event(event) relay_manager.publish_event(event)
@@ -56,6 +58,38 @@ time.sleep(1) # allow the messages to send
relay_manager.close_connections() relay_manager.close_connections()
``` ```
**Reply to a note**
```python
from nostr.event import Event
reply = Event(
content="Hey, that's a great point!",
)
# create 'e' tag reference to the note you're replying to
reply.add_event_ref(original_note_id)
# create 'p' tag reference to the pubkey you're replying to
reply.add_pubkey_ref(original_note_author_pubkey)
private_key.sign_event(reply)
relay_manager.publish_event(reply)
```
**Send a DM**
```python
from nostr.event import EncryptedDirectMessage
dm = EncryptedDirectMessage(
recipient_pubkey=recipient_pubkey,
cleartext_content="Secret message!"
)
private_key.sign_event(dm)
relay_manager.publish_event(dm)
```
**Receive events from relays** **Receive events from relays**
```python ```python
import json import json
@@ -112,7 +146,6 @@ delegation = Delegation(
identity_pk.sign_delegation(delegation) identity_pk.sign_delegation(delegation)
event = Event( event = Event(
delegatee_pk.public_key.hex(),
"Hello, NIP-26!", "Hello, NIP-26!",
tags=[delegation.get_tag()], tags=[delegation.get_tag()],
) )

View File

@@ -1,12 +1,15 @@
import time import time
import json import json
from dataclasses import dataclass, field
from enum import IntEnum from enum import IntEnum
from typing import List
from secp256k1 import PrivateKey, PublicKey from secp256k1 import PrivateKey, PublicKey
from hashlib import sha256 from hashlib import sha256
from nostr.message_type import ClientMessageType from nostr.message_type import ClientMessageType
class EventKind(IntEnum): class EventKind(IntEnum):
SET_METADATA = 0 SET_METADATA = 0
TEXT_NOTE = 1 TEXT_NOTE = 1
@@ -16,41 +19,58 @@ class EventKind(IntEnum):
DELETE = 5 DELETE = 5
class Event():
def __init__( @dataclass
self, class Event:
public_key: str, content: str = None
content: str, public_key: str = None
created_at: int = None, created_at: int = None
kind: int=EventKind.TEXT_NOTE, kind: int = EventKind.TEXT_NOTE
tags: "list[list[str]]"=[], tags: List[List[str]] = field(default_factory=list) # Dataclasses require special handling when the default value is a mutable type
id: str=None, signature: str = None
signature: str=None) -> None:
if not isinstance(content, str):
def __post_init__(self):
if self.content is not None and not isinstance(self.content, str):
# DMs initialize content to None but all other kinds should pass in a str
raise TypeError("Argument 'content' must be of type str") raise TypeError("Argument 'content' must be of type str")
self.public_key = public_key if self.created_at is None:
self.content = content self.created_at = int(time.time())
self.created_at = created_at or int(time.time())
self.kind = kind
self.tags = tags
self.signature = signature
self.id = id or Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content)
@staticmethod @staticmethod
def serialize(public_key: str, created_at: int, kind: int, tags: "list[list[str]]", content: str) -> bytes: def serialize(public_key: str, created_at: int, kind: int, tags: List[List[str]], content: str) -> bytes:
data = [0, public_key, created_at, kind, tags, content] data = [0, public_key, created_at, kind, tags, content]
data_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False) data_str = json.dumps(data, separators=(',', ':'), ensure_ascii=False)
return data_str.encode() return data_str.encode()
@staticmethod @staticmethod
def compute_id(public_key: str, created_at: int, kind: int, tags: "list[list[str]]", content: str) -> str: def compute_id(public_key: str, created_at: int, kind: int, tags: List[List[str]], content: str):
return sha256(Event.serialize(public_key, created_at, kind, tags, content)).hexdigest() return sha256(Event.serialize(public_key, created_at, kind, tags, content)).hexdigest()
@property
def id(self) -> str:
# Always recompute the id to reflect the up-to-date state of the Event
return Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content)
def add_pubkey_ref(self, pubkey:str):
""" Adds a reference to a pubkey as a 'p' tag """
self.tags.append(['p', pubkey])
def add_event_ref(self, event_id:str):
""" Adds a reference to an event_id as an 'e' tag """
self.tags.append(['e', event_id])
def verify(self) -> bool: def verify(self) -> bool:
pub_key = PublicKey(bytes.fromhex("02" + self.public_key), True) # add 02 for schnorr (bip340) pub_key = PublicKey(bytes.fromhex("02" + self.public_key), True) # add 02 for schnorr (bip340)
event_id = Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content) return pub_key.schnorr_verify(bytes.fromhex(self.id), bytes.fromhex(self.signature), None, raw=True)
return pub_key.schnorr_verify(bytes.fromhex(event_id), bytes.fromhex(self.signature), None, raw=True)
def to_message(self) -> str: def to_message(self) -> str:
return json.dumps( return json.dumps(
@@ -67,3 +87,37 @@ class Event():
} }
] ]
) )
@dataclass
class EncryptedDirectMessage(Event):
recipient_pubkey: str = None
cleartext_content: str = None
reference_event_id: str = None
def __post_init__(self):
if self.content is not None:
self.cleartext_content = self.content
self.content = None
if self.recipient_pubkey is None:
raise Exception("Must specify a recipient_pubkey.")
self.kind = EventKind.ENCRYPTED_DIRECT_MESSAGE
super().__post_init__()
# Must specify the DM recipient's pubkey in a 'p' tag
self.add_pubkey_ref(self.recipient_pubkey)
# Optionally specify a reference event (DM) this is a reply to
if self.reference_event_id is not None:
self.add_event_ref(self.reference_event_id)
@property
def id(self) -> str:
if self.content is None:
raise Exception("EncryptedDirectMessage `id` is undefined until its message is encrypted and stored in the `content` field")
return super().id

View File

@@ -7,7 +7,7 @@ from cryptography.hazmat.primitives import padding
from hashlib import sha256 from hashlib import sha256
from nostr.delegation import Delegation from nostr.delegation import Delegation
from nostr.event import Event from nostr.event import EncryptedDirectMessage, Event, EventKind
from . import bech32 from . import bech32
@@ -77,6 +77,9 @@ class PrivateKey:
encrypted_message = encryptor.update(padded_data) + encryptor.finalize() encrypted_message = encryptor.update(padded_data) + encryptor.finalize()
return f"{base64.b64encode(encrypted_message).decode()}?iv={base64.b64encode(iv).decode()}" return f"{base64.b64encode(encrypted_message).decode()}?iv={base64.b64encode(iv).decode()}"
def encrypt_dm(self, dm: EncryptedDirectMessage) -> None:
dm.content = self.encrypt_message(message=dm.cleartext_content, public_key_hex=dm.recipient_pubkey)
def decrypt_message(self, encoded_message: str, public_key_hex: str) -> str: def decrypt_message(self, encoded_message: str, public_key_hex: str) -> str:
encoded_data = encoded_message.split('?iv=') encoded_data = encoded_message.split('?iv=')
@@ -100,6 +103,10 @@ class PrivateKey:
return sig.hex() return sig.hex()
def sign_event(self, event: Event) -> None: def sign_event(self, event: Event) -> None:
if event.kind == EventKind.ENCRYPTED_DIRECT_MESSAGE and event.content is None:
self.encrypt_dm(event)
if event.public_key is None:
event.public_key = self.public_key.hex()
event.signature = self.sign_message_hash(bytes.fromhex(event.id)) event.signature = self.sign_message_hash(bytes.fromhex(event.id))
def sign_delegation(self, delegation: Delegation) -> None: def sign_delegation(self, delegation: Delegation) -> None:
@@ -108,6 +115,7 @@ class PrivateKey:
def __eq__(self, other): def __eq__(self, other):
return self.raw_secret == other.raw_secret return self.raw_secret == other.raw_secret
def mine_vanity_key(prefix: str = None, suffix: str = None) -> PrivateKey: def mine_vanity_key(prefix: str = None, suffix: str = None) -> PrivateKey:
if prefix is None and suffix is None: if prefix is None and suffix is None:
raise ValueError("Expected at least one of 'prefix' or 'suffix' arguments") raise ValueError("Expected at least one of 'prefix' or 'suffix' arguments")
@@ -122,6 +130,7 @@ def mine_vanity_key(prefix: str = None, suffix: str = None) -> PrivateKey:
return sk return sk
ffi = FFI() ffi = FFI()
@ffi.callback("int (unsigned char *, const unsigned char *, const unsigned char *, void *)") @ffi.callback("int (unsigned char *, const unsigned char *, const unsigned char *, void *)")
def copy_x(output, x32, y32, data): def copy_x(output, x32, y32, data):

View File

@@ -1,14 +1,93 @@
from nostr.event import Event import pytest
from nostr.key import PrivateKey
import time import time
from nostr.event import Event, EncryptedDirectMessage
from nostr.key import PrivateKey
def test_event_default_time():
"""
ensure created_at default value reflects the time at Event object instantiation class TestEvent:
see: https://github.com/jeffthibault/python-nostr/issues/23 def test_event_default_time(self):
""" """
public_key = PrivateKey().public_key.hex() ensure created_at default value reflects the time at Event object instantiation
event1 = Event(public_key=public_key, content='test event') see: https://github.com/jeffthibault/python-nostr/issues/23
time.sleep(1.5) """
event2 = Event(public_key=public_key, content='test event') event1 = Event(content='test event')
assert event1.created_at < event2.created_at time.sleep(1.5)
event2 = Event(content='test event')
assert event1.created_at < event2.created_at
def test_content_only_instantiation(self):
""" should be able to create an Event by only specifying content without kwarg """
event = Event("Hello, world!")
assert event.content is not None
def test_event_id_recomputes(self):
""" should recompute the Event.id to reflect the current Event attrs """
event = Event(content="some event")
# id should be computed on the fly
event_id = event.id
event.created_at += 10
# Recomputed id should now be different
assert event.id != event_id
def test_add_event_ref(self):
""" should add an 'e' tag for each event_ref added """
some_event_id = "some_event_id"
event = Event(content="Adding an 'e' tag")
event.add_event_ref(some_event_id)
assert ['e', some_event_id] in event.tags
def test_add_pubkey_ref(self):
""" should add a 'p' tag for each pubkey_ref added """
some_pubkey = "some_pubkey"
event = Event(content="Adding a 'p' tag")
event.add_pubkey_ref(some_pubkey)
assert ['p', some_pubkey] in event.tags
class TestEncryptedDirectMessage:
def setup_class(self):
self.sender_pk = PrivateKey()
self.sender_pubkey = self.sender_pk.public_key.hex()
self.recipient_pk = PrivateKey()
self.recipient_pubkey = self.recipient_pk.public_key.hex()
def test_content_field_moved_to_cleartext_content(self):
""" Should transfer `content` field data to `cleartext_content` """
dm = EncryptedDirectMessage(content="My message!", recipient_pubkey=self.recipient_pubkey)
assert dm.content is None
assert dm.cleartext_content is not None
def test_nokwarg_content_allowed(self):
""" Should allow creating a new DM w/no `content` nor `cleartext_content` kwarg """
dm = EncryptedDirectMessage("My message!", recipient_pubkey=self.recipient_pubkey)
assert dm.cleartext_content is not None
def test_recipient_p_tag(self):
""" Should generate recipient 'p' tag """
dm = EncryptedDirectMessage(cleartext_content="Secret message!", recipient_pubkey=self.recipient_pubkey)
assert ['p', self.recipient_pubkey] in dm.tags
def test_unencrypted_dm_has_undefined_id(self):
""" Should raise Exception if `id` is requested before DM is encrypted """
dm = EncryptedDirectMessage(cleartext_content="My message!", recipient_pubkey=self.recipient_pubkey)
with pytest.raises(Exception) as e:
dm.id
assert "undefined" in str(e)
# But once we encrypt it, we can request its id
self.sender_pk.encrypt_dm(dm)
assert dm.id is not None

View File

@@ -1,3 +1,4 @@
from nostr.event import Event, EncryptedDirectMessage
from nostr.key import PrivateKey from nostr.key import PrivateKey
@@ -21,3 +22,78 @@ def test_from_nsec():
pk1 = PrivateKey() pk1 = PrivateKey()
pk2 = PrivateKey.from_nsec(pk1.bech32()) pk2 = PrivateKey.from_nsec(pk1.bech32())
assert pk1.raw_secret == pk2.raw_secret assert pk1.raw_secret == pk2.raw_secret
class TestEvent:
def setup_class(self):
self.sender_pk = PrivateKey()
self.sender_pubkey = self.sender_pk.public_key.hex()
def test_sign_event_is_valid(self):
""" sign should create a signature that can be verified against Event.id """
event = Event(content="Hello, world!")
self.sender_pk.sign_event(event)
assert event.verify()
def test_sign_event_adds_pubkey(self):
""" sign should add the sender's pubkey if not already specified """
event = Event(content="Hello, world!")
# The event's public_key hasn't been specified yet
assert event.public_key is None
self.sender_pk.sign_event(event)
# PrivateKey.sign() should have populated public_key
assert event.public_key == self.sender_pubkey
class TestEncryptedDirectMessage:
def setup_class(self):
self.sender_pk = PrivateKey()
self.sender_pubkey = self.sender_pk.public_key.hex()
self.recipient_pk = PrivateKey()
self.recipient_pubkey = self.recipient_pk.public_key.hex()
def test_encrypt_dm(self):
""" Should encrypt a DM and populate its `content` field with ciphertext that either party can decrypt """
message = "My secret message!"
dm = EncryptedDirectMessage(
recipient_pubkey=self.recipient_pubkey,
cleartext_content=message,
)
# DM's content field should be initially blank
assert dm.content is None
self.sender_pk.encrypt_dm(dm)
# After encrypting, the content field should now be populated
assert dm.content is not None
# Sender should be able to decrypt
decrypted_message = self.sender_pk.decrypt_message(encoded_message=dm.content, public_key_hex=self.recipient_pubkey)
assert decrypted_message == message
# Recipient should be able to decrypt by referencing the sender's pubkey
decrypted_message = self.recipient_pk.decrypt_message(encoded_message=dm.content, public_key_hex=self.sender_pubkey)
assert decrypted_message == message
def test_sign_encrypts_dm(self):
""" `sign` should encrypt a DM that hasn't been encrypted yet """
dm = EncryptedDirectMessage(
recipient_pubkey=self.recipient_pubkey,
cleartext_content="Some DM message",
)
assert dm.content is None
self.sender_pk.sign_event(dm)
assert dm.content is not None