Merge branch 'main' into feat/count_events

This commit is contained in:
callebtc
2023-02-02 16:38:51 +01:00
18 changed files with 899 additions and 88 deletions

6
.gitignore vendored
View File

@@ -1,2 +1,6 @@
venv/
nostr/__pycache__/
__pycache__/
nostr.egg-info/
dist/
nostr/_version.py
.DS_Store

View File

@@ -49,10 +49,9 @@ time.sleep(1.25) # allow the connections to open
private_key = PrivateKey()
event = Event(private_key.public_key.hex(), "Hello Nostr")
event.sign(private_key.hex())
private_key.sign_event(event)
message = json.dumps([ClientMessageType.EVENT, event.to_json_object()])
relay_manager.publish_message(message)
relay_manager.publish_event(event)
time.sleep(1) # allow the messages to send
relay_manager.close_connections()
@@ -90,34 +89,56 @@ while relay_manager.message_pool.has_events():
relay_manager.close_connections()
```
**NIP-26 delegation**
```python
from nostr.delegation import Delegation
from nostr.event import EventKind, Event
from nostr.key import PrivateKey
# Load your "identity" PK that you'd like to keep safely offline
identity_pk = PrivateKey.from_nsec("nsec1...")
# Create a new, disposable PK as the "delegatee" that can be "hot" in a Nostr client
delegatee_pk = PrivateKey()
# the "identity" PK will authorize "delegatee" to sign TEXT_NOTEs on its behalf for the next month
delegation = Delegation(
delegator_pubkey=identity_pk.public_key.hex(),
delegatee_pubkey=delegatee_pk.public_key.hex(),
event_kind=EventKind.TEXT_NOTE,
duration_secs=30*24*60*60
)
identity_pk.sign_delegation(delegation)
event = Event(
delegatee_pk.public_key.hex(),
"Hello, NIP-26!",
tags=[delegation.get_tag()],
)
delegatee_pk.sign_event(event)
# ...normal broadcast steps...
```
The resulting delegation tag can be stored as plaintext and reused as-is by the "delegatee" PK until the delegation token expires. There is no way to revoke a signed delegation, so current best practice is to keep the expiration time relatively short.
Hopefully clients will include an optional field to store the delegation tag. That would allow the "delegatee" PK to seamlessly post messages on the "identity" key's behalf, while the "identity" key stays safely offline in cold storage.
## Installation
1. Clone repository
```bash
git clone https://github.com/jeffthibault/python-nostr.git
pip install nostr
```
2. Install dependencies in repo
```bash
python -m venv venv
pip install -r requirements.txt
```
Note: If the pip install fails, you might need to install ```wheel```. Try the following:
```bash
pip install wheel
pip install -r requirements.txt
```
## Dependencies
- [websocket-client](https://github.com/websocket-client/websocket-client) for websocket operations
- [secp256k1](https://github.com/rustyrussell/secp256k1-py) for key generation, signing, and verifying
- [cryptography](https://github.com/pyca/cryptography) for encrypting and decrypting direct messages
Note: I wrote this with Python 3.9.5.
## Test Suite
See the [Test Suite README](test/README.md)
## Disclaimer
- This library is in very early development and still a WIP.
- This library is in very early development.
- It might have some bugs.
- I need to add tests.
- I will try to publish this as a [PyPI](https://pypi.org/) package at some point.
- I need to add more tests.
Please feel free to add issues, add PRs, or provide any feedback!

32
nostr/delegation.py Normal file
View File

@@ -0,0 +1,32 @@
import time
from dataclasses import dataclass
@dataclass
class Delegation:
delegator_pubkey: str
delegatee_pubkey: str
event_kind: int
duration_secs: int = 30*24*60 # default to 30 days
signature: str = None # set in PrivateKey.sign_delegation
@property
def expires(self) -> int:
return int(time.time()) + self.duration_secs
@property
def conditions(self) -> str:
return f"kind={self.event_kind}&created_at<{self.expires}"
@property
def delegation_token(self) -> str:
return f"nostr:delegation:{self.delegatee_pubkey}:{self.conditions}"
def get_tag(self) -> list[str]:
""" Called by Event """
return [
"delegation",
self.delegator_pubkey,
self.conditions,
self.signature,
]

View File

@@ -4,6 +4,9 @@ from enum import IntEnum
from secp256k1 import PrivateKey, PublicKey
from hashlib import sha256
from nostr.message_type import ClientMessageType
class EventKind(IntEnum):
SET_METADATA = 0
TEXT_NOTE = 1
@@ -12,26 +15,27 @@ class EventKind(IntEnum):
ENCRYPTED_DIRECT_MESSAGE = 4
DELETE = 5
class Event():
def __init__(
self,
public_key: str,
content: str,
created_at: int=int(time.time()),
created_at: int = None,
kind: int=EventKind.TEXT_NOTE,
tags: "list[list[str]]"=[],
id: str=None,
signature: str=None) -> None:
if not isinstance(content, str):
raise TypeError("Argument 'content' must be of type str")
self.id = id if not id is None else Event.compute_id(public_key, created_at, kind, tags, content)
self.public_key = public_key
self.content = content
self.created_at = created_at
self.created_at = created_at or int(time.time())
self.kind = kind
self.tags = tags
self.signature = signature
self.id = id or Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content)
@staticmethod
def serialize(public_key: str, created_at: int, kind: int, tags: "list[list[str]]", content: str) -> bytes:
@@ -43,23 +47,23 @@ class Event():
def compute_id(public_key: str, created_at: int, kind: int, tags: "list[list[str]]", content: str) -> str:
return sha256(Event.serialize(public_key, created_at, kind, tags, content)).hexdigest()
def sign(self, private_key_hex: str) -> None:
sk = PrivateKey(bytes.fromhex(private_key_hex))
sig = sk.schnorr_sign(bytes.fromhex(self.id), None, raw=True)
self.signature = sig.hex()
def verify(self) -> bool:
pub_key = PublicKey(bytes.fromhex("02" + self.public_key), True) # add 02 for schnorr (bip340)
event_id = Event.compute_id(self.public_key, self.created_at, self.kind, self.tags, self.content)
return pub_key.schnorr_verify(bytes.fromhex(event_id), bytes.fromhex(self.signature), None, raw=True)
def to_json_object(self) -> dict:
return {
"id": self.id,
"pubkey": self.public_key,
"created_at": self.created_at,
"kind": self.kind,
"tags": self.tags,
"content": self.content,
"sig": self.signature
}
def to_message(self) -> str:
return json.dumps(
[
ClientMessageType.EVENT,
{
"id": self.id,
"pubkey": self.public_key,
"created_at": self.created_at,
"kind": self.kind,
"tags": self.tags,
"content": self.content,
"sig": self.signature
}
]
)

View File

@@ -1,9 +1,28 @@
from collections import UserList
from .event import Event
from typing import List
from .event import Event, EventKind
class Filter:
"""
NIP-01 filtering.
Explicitly supports "#e" and "#p" tag filters via `event_refs` and `pubkey_refs`.
Arbitrary NIP-12 single-letter tag filters are also supported via `add_arbitrary_tag`.
If a particular single-letter tag gains prominence, explicit support should be
added. For example:
# arbitrary tag
filter.add_arbitrary_tag('t', [hashtags])
# promoted to explicit support
Filter(hashtag_refs=[hashtags])
"""
def __init__(
<<<<<<< HEAD
self,
ids: "list[str]" = None,
kinds: "list[int]" = None,
@@ -14,58 +33,122 @@ class Filter:
limit: int = None,
) -> None:
self.IDs = ids
=======
self,
event_ids: List[str] = None,
kinds: List[EventKind] = None,
authors: List[str] = None,
since: int = None,
until: int = None,
event_refs: List[str] = None, # the "#e" attr; list of event ids referenced in an "e" tag
pubkey_refs: List[str] = None, # The "#p" attr; list of pubkeys referenced in a "p" tag
limit: int = None) -> None:
self.event_ids = event_ids
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
self.kinds = kinds
self.authors = authors
self.since = since
self.until = until
self.tags = tags
self.event_refs = event_refs
self.pubkey_refs = pubkey_refs
self.limit = limit
self.tags = {}
if self.event_refs:
self.add_arbitrary_tag('e', self.event_refs)
if self.pubkey_refs:
self.add_arbitrary_tag('p', self.pubkey_refs)
def add_arbitrary_tag(self, tag: str, values: list):
"""
Filter on any arbitrary tag with explicit handling for NIP-01 and NIP-12
single-letter tags.
"""
# NIP-01 'e' and 'p' tags and any NIP-12 single-letter tags must be prefixed with "#"
tag_key = tag if len(tag) > 1 else f"#{tag}"
self.tags[tag_key] = values
def matches(self, event: Event) -> bool:
if self.IDs != None and event.id not in self.IDs:
if self.event_ids is not None and event.id not in self.event_ids:
return False
if self.kinds != None and event.kind not in self.kinds:
if self.kinds is not None and event.kind not in self.kinds:
return False
if self.authors != None and event.public_key not in self.authors:
if self.authors is not None and event.public_key not in self.authors:
return False
if self.since != None and event.created_at < self.since:
if self.since is not None and event.created_at < self.since:
return False
if self.until != None and event.created_at > self.until:
if self.until is not None and event.created_at > self.until:
return False
if self.tags != None and len(event.tags) == 0:
if (self.event_refs is not None or self.pubkey_refs is not None) and len(event.tags) == 0:
return False
<<<<<<< HEAD
if self.tags != None:
e_tag_identifiers = [e_tag[0] for e_tag in event.tags]
=======
if self.tags:
e_tag_identifiers = set([e_tag[0] for e_tag in event.tags])
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
for f_tag, f_tag_values in self.tags.items():
if f_tag[1:] not in e_tag_identifiers:
# Omit any NIP-01 or NIP-12 "#" chars on single-letter tags
f_tag = f_tag.replace("#", "")
if f_tag not in e_tag_identifiers:
# Event is missing a tag type that we're looking for
return False
# Multiple values within f_tag_values are treated as OR search; an Event
# needs to match only one.
# Note: an Event could have multiple entries of the same tag type
# (e.g. a reply to multiple people) so we have to check all of them.
match_found = False
for e_tag in event.tags:
<<<<<<< HEAD
if e_tag[1] not in f_tag_values:
return False
=======
if e_tag[0] == f_tag and e_tag[1] in f_tag_values:
match_found = True
break
if not match_found:
return False
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
return True
def to_json_object(self) -> dict:
res = {}
<<<<<<< HEAD
if self.IDs != None:
res["ids"] = self.IDs
if self.kinds != None:
=======
if self.event_ids is not None:
res["ids"] = self.event_ids
if self.kinds is not None:
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
res["kinds"] = self.kinds
if self.authors != None:
if self.authors is not None:
res["authors"] = self.authors
if self.since != None:
if self.since is not None:
res["since"] = self.since
if self.until != None:
if self.until is not None:
res["until"] = self.until
if self.tags != None:
for tag, values in self.tags.items():
res[tag] = values
if self.limit != None:
if self.limit is not None:
res["limit"] = self.limit
if self.tags:
res.update(self.tags)
return res
<<<<<<< HEAD
=======
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633
class Filters(UserList):
def __init__(self, initlist: "list[Filter]" = []) -> None:
super().__init__(initlist)
@@ -78,4 +161,8 @@ class Filters(UserList):
return False
def to_json_array(self) -> list:
<<<<<<< HEAD
return [filter.to_json_object() for filter in self.data]
=======
return [filter.to_json_object() for filter in self.data]
>>>>>>> bda320f6d6d5087fe1afecd122831afe025f7633

View File

@@ -4,8 +4,13 @@ import secp256k1
from cffi import FFI
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
from cryptography.hazmat.primitives import padding
from hashlib import sha256
from nostr.delegation import Delegation
from nostr.event import Event
from . import bech32
class PublicKey:
def __init__(self, raw_bytes: bytes) -> None:
self.raw_bytes = raw_bytes
@@ -21,6 +26,14 @@ class PublicKey:
pk = secp256k1.PublicKey(b"\x02" + self.raw_bytes, True)
return pk.schnorr_verify(bytes.fromhex(hash), bytes.fromhex(sig), None, True)
@classmethod
def from_npub(cls, npub: str):
""" Load a PublicKey from its bech32/npub form """
hrp, data, spec = bech32.bech32_decode(npub)
raw_public_key = bech32.convertbits(data, 5, 8)[:-1]
return cls(bytes(raw_public_key))
class PrivateKey:
def __init__(self, raw_secret: bytes=None) -> None:
if not raw_secret is None:
@@ -31,6 +44,13 @@ class PrivateKey:
sk = secp256k1.PrivateKey(self.raw_secret)
self.public_key = PublicKey(sk.pubkey.serialize()[1:])
@classmethod
def from_nsec(cls, nsec: str):
""" Load a PrivateKey from its bech32/nsec form """
hrp, data, spec = bech32.bech32_decode(nsec)
raw_secret = bech32.convertbits(data, 5, 8)[:-1]
return cls(bytes(raw_secret))
def bech32(self) -> str:
converted_bits = bech32.convertbits(self.raw_secret, 8, 5)
return bech32.bech32_encode("nsec", converted_bits, bech32.Encoding.BECH32)
@@ -79,8 +99,31 @@ class PrivateKey:
sig = sk.schnorr_sign(hash, None, raw=True)
return sig.hex()
def sign_event(self, event: Event) -> None:
event.signature = self.sign_message_hash(bytes.fromhex(event.id))
def sign_delegation(self, delegation: Delegation) -> None:
delegation.signature = self.sign_message_hash(sha256(delegation.delegation_token.encode()).digest())
def __eq__(self, other):
return self.raw_secret == other.raw_secret
def mine_vanity_key(prefix: str = None, suffix: str = None) -> PrivateKey:
if prefix is None and suffix is None:
raise ValueError("Expected at least one of 'prefix' or 'suffix' arguments")
while True:
sk = PrivateKey()
if prefix is not None and not sk.public_key.bech32()[5:5+len(prefix)] == prefix:
continue
if suffix is not None and not sk.public_key.bech32()[-len(suffix):] == suffix:
continue
break
return sk
ffi = FFI()
@ffi.callback("int (unsigned char *, const unsigned char *, const unsigned char *, void *)")
def copy_x(output, x32, y32, data):
ffi.memmove(output, x32, 32)
return 1
return 1

54
nostr/pow.py Normal file
View File

@@ -0,0 +1,54 @@
import time
from .event import Event
from .key import PrivateKey
def zero_bits(b: int) -> int:
n = 0
if b == 0:
return 8
while b >> 1:
b = b >> 1
n += 1
return 7 - n
def count_leading_zero_bits(hex_str: str) -> int:
total = 0
for i in range(0, len(hex_str) - 2, 2):
bits = zero_bits(int(hex_str[i:i+2], 16))
total += bits
if bits != 8:
break
return total
def mine_event(content: str, difficulty: int, public_key: str, kind: int, tags: list=[]) -> Event:
all_tags = [["nonce", "1", str(difficulty)]]
all_tags.extend(tags)
created_at = int(time.time())
event_id = Event.compute_id(public_key, created_at, kind, all_tags, content)
num_leading_zero_bits = count_leading_zero_bits(event_id)
attempts = 1
while num_leading_zero_bits < difficulty:
attempts += 1
all_tags[0][1] = str(attempts)
created_at = int(time.time())
event_id = Event.compute_id(public_key, created_at, kind, all_tags, content)
num_leading_zero_bits = count_leading_zero_bits(event_id)
return Event(public_key, content, created_at, kind, all_tags, event_id)
def mine_key(difficulty: int) -> PrivateKey:
sk = PrivateKey()
num_leading_zero_bits = count_leading_zero_bits(sk.public_key.hex())
while num_leading_zero_bits < difficulty:
sk = PrivateKey()
num_leading_zero_bits = count_leading_zero_bits(sk.public_key.hex())
return sk

View File

@@ -49,10 +49,19 @@ class Relay:
on_pong=self._on_pong,
)
def connect(self, ssl_options: dict = {}):
def connect(self, ssl_options: dict=None, proxy: dict=None):
self.ssl_options = ssl_options
<<<<<<< HEAD
print(self.url, "🟢")
self.ws.run_forever(sslopt=self.ssl_options)
=======
self.ws.run_forever(
sslopt=ssl_options,
http_proxy_host=None if proxy is None else proxy.get('host'),
http_proxy_port=None if proxy is None else proxy.get('port'),
proxy_type=None if proxy is None else proxy.get('type')
)
>>>>>>> main
def close(self):
print(self.url, "🔴")

View File

@@ -1,8 +1,19 @@
import json
import threading
from .event import Event
from .filter import Filters
from .message_pool import MessagePool
from .message_type import ClientMessageType
from .relay import Relay, RelayPolicy
class RelayException(Exception):
pass
class RelayManager:
def __init__(self) -> None:
self.relays: dict[str, Relay] = {}
@@ -24,11 +35,11 @@ class RelayManager:
for relay in self.relays.values():
relay.close_subscription(id)
def open_connections(self, ssl_options: dict=None):
def open_connections(self, ssl_options: dict=None, proxy: dict=None):
for relay in self.relays.values():
threading.Thread(
target=relay.connect,
args=(ssl_options,),
args=(ssl_options, proxy),
name=f"{relay.url}-thread"
).start()
@@ -40,4 +51,13 @@ class RelayManager:
for relay in self.relays.values():
if relay.policy.should_write:
relay.publish(message)
def publish_event(self, event: Event):
""" Verifies that the Event is publishable before submitting it to relays """
if event.signature is None:
raise RelayException(f"Could not publish {event.id}: must be signed")
if not event.verify():
raise RelayException(f"Could not publish {event.id}: failed to verify signature {event.signature}")
self.publish_message(event.to_message())

View File

@@ -1,19 +1,36 @@
[tool.poetry]
name = "python-nostr"
version = "0.1.0"
description = ""
authors = ["Your Name <you@example.com>"]
readme = "README.md"
[tool.poetry.dependencies]
python = "^3.7"
pycryptodomex = "^3.16.0"
websocket-client = "1.3.3"
[tool.poetry.group.dev.dependencies]
black = {version = "^22.12.0", allow-prereleases = true}
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
requires = ["setuptools", "setuptools-scm"]
build-backend = "setuptools.build_meta"
[project]
name = "nostr"
authors = [
{ name="Jeff Thibault", email="jdthibault2@gmail.com" },
]
description = "A Python library for making Nostr clients"
urls = { Homepage = "https://github.com/jeffthibault/python-nostr" }
readme = "README.md"
requires-python = ">3.6.0"
dependencies = [
"cffi>=1.15.0",
"cryptography>=37.0.4",
"pycparser>=2.21",
"secp256k1>=0.14.0",
"websocket-client>=1.3.3",
]
license = {file = "LICENSE"}
classifiers=[
'Operating System :: POSIX :: Linux',
'Operating System :: Microsoft :: Windows',
'Operating System :: MacOS :: MacOS X',
]
dynamic=["version"]
[tool.setuptools_scm]
write_to = "nostr/_version.py"
[project.optional-dependencies]
test = [
"pytest >=7.2.0",
"pytest-cov[all]"
]

View File

@@ -1,5 +0,0 @@
cffi==1.15.0
cryptography==37.0.4
pycparser==2.21
secp256k1==0.14.0
websocket-client==1.3.3

3
setup.py Normal file
View File

@@ -0,0 +1,3 @@
from setuptools import setup
setup()

31
test/README.md Normal file
View File

@@ -0,0 +1,31 @@
# Testing python-nostr
## Set up the test environment
Install the test-runner dependencies:
```
pip3 install -r test/requirements.txt
```
Then make the `nostr` python module visible/importable to the tests by installing the local dev dir as an editable module:
```
# from the repo root
pip3 install -e .
```
## Running the test suite
Run the whole test suite:
```
# from the repo root
pytest
```
Run a specific test file:
```
pytest test/test_this_file.py
```
Run a specific test:
```
pytest test/test_this_file.py::test_this_specific_test
```

1
test/requirements.txt Normal file
View File

@@ -0,0 +1 @@
pytest>=7.2.0

14
test/test_event.py Normal file
View File

@@ -0,0 +1,14 @@
from nostr.event import Event
from nostr.key import PrivateKey
import time
def test_event_default_time():
"""
ensure created_at default value reflects the time at Event object instantiation
see: https://github.com/jeffthibault/python-nostr/issues/23
"""
public_key = PrivateKey().public_key.hex()
event1 = Event(public_key=public_key, content='test event')
time.sleep(1.5)
event2 = Event(public_key=public_key, content='test event')
assert event1.created_at < event2.created_at

423
test/test_filter.py Normal file
View File

@@ -0,0 +1,423 @@
from typing import List
import pytest
from nostr.event import Event, EventKind
from nostr.filter import Filter, Filters
from nostr.key import PrivateKey
class TestFilter:
def setup_class(self):
self.pk1 = PrivateKey()
self.pk2 = PrivateKey()
""" pk1 kicks off a thread and interacts with pk2 """
self.pk1_thread = [
# Note posted by pk1
Event(
public_key=self.pk1.public_key.hex(),
content="pk1's first note!"
),
]
self.pk1_thread.append(
# Note posted by pk2 in response to pk1's note
Event(
public_key=self.pk2.public_key.hex(),
content="Nice to see you here, pk1!",
tags=[
['e', self.pk1_thread[0].id], # Replies reference which note they're directly responding to
['p', self.pk1.public_key.hex()], # Replies reference who they're responding to
],
)
)
self.pk1_thread.append(
# Next response note by pk1 continuing thread with pk2
Event(
public_key=self.pk1.public_key.hex(),
content="Thanks! Glad you're here, too, pk2!",
tags=[
['e', self.pk1_thread[0].id], # Threads reference the original note
['e', self.pk1_thread[-1].id], # Replies reference which note they're directly responding to
['p', self.pk2.public_key.hex()], # Replies reference who they're responding to
],
)
)
""" pk2 starts a new thread but no one responds """
self.pk2_thread = [
# Note posted by pk2
Event(
public_key=self.pk2.public_key.hex(),
content="pk2's first note!"
)
]
self.pk2_thread.append(
# pk2's self-reply
Event(
public_key=self.pk2.public_key.hex(),
content="So... I guess no one's following me.",
tags=[
['e', self.pk2_thread[0].id]
]
)
)
""" pk1 DMs pk2 """
self.pk1_pk2_dms = [
# DM sent by pk1 to pk2
Event(
public_key=self.pk1.public_key.hex(),
content="Hey pk2, here's a secret",
tags=[['p', self.pk2.public_key.hex()]],
kind=EventKind.ENCRYPTED_DIRECT_MESSAGE,
),
Event(
public_key=self.pk2.public_key.hex(),
content="Thanks! I'll keep it secure.",
tags=[['p', self.pk1.public_key.hex()]],
kind=EventKind.ENCRYPTED_DIRECT_MESSAGE,
)
]
def test_match_by_event_id(self):
""" should match Events by event_id """
filter = Filter(
event_ids=[self.pk1_thread[0].id],
)
assert filter.matches(self.pk1_thread[0])
# None of the others should match
for event in self.pk1_thread[1:] + self.pk2_thread + self.pk1_pk2_dms[1:]:
assert filter.matches(event) is False
def test_multiple_values_in_same_tag(self):
""" should treat multiple tag values as OR searches """
filter = Filter(
event_ids=[self.pk1_thread[0].id, self.pk1_pk2_dms[0].id, "some_other_event_id"],
)
assert filter.matches(self.pk1_thread[0])
assert filter.matches(self.pk1_pk2_dms[0])
# None of the others should match
for event in self.pk1_thread[1:] + self.pk2_thread + self.pk1_pk2_dms[1:]:
assert filter.matches(event) is False
def test_match_by_kinds(self):
""" should match Events by kind """
filter = Filter(
kinds=[EventKind.TEXT_NOTE],
)
# Both threads should match
for event in self.pk1_thread + self.pk2_thread:
assert filter.matches(event)
# DMs should not match
for event in self.pk1_pk2_dms:
assert filter.matches(event) is False
# Now allow either kind
filter = Filter(
kinds=[EventKind.TEXT_NOTE, EventKind.ENCRYPTED_DIRECT_MESSAGE],
)
# Now everything should match
for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event)
def test_match_by_authors(self):
""" should match Events by author """
filter = Filter(authors=[self.pk1.public_key.hex()])
# Everything sent by pk1 should match
for event in [event for event in (self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms) if event.public_key == self.pk1.public_key.hex()]:
assert filter.matches(event)
# None of pk2's should match
for event in [event for event in (self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms) if event.public_key == self.pk2.public_key.hex()]:
assert filter.matches(event) is False
def test_match_by_event_refs(self):
""" should match Events by event_ref 'e' tags """
filter = Filter(
event_refs=[self.pk1_thread[0].id],
)
# All replies to pk1's initial note should match (even pk1's reply at the end)
assert filter.matches(self.pk1_thread[1])
assert filter.matches(self.pk1_thread[2])
# Everything else should not match
for event in [self.pk1_thread[0]] + self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event) is False
def test_match_by_pubkey_refs(self):
""" should match Events by pubkey_ref 'p' tags """
filter = Filter(
pubkey_refs=[self.pk1_thread[0].public_key],
)
# pk2's reply in pk1's thread should match
assert filter.matches(self.pk1_thread[1])
# pk2's DM reply to pk1 should match
assert filter.matches(self.pk1_pk2_dms[1])
# Everything else should not match
for event in [self.pk1_thread[0], self.pk1_thread[2]] + self.pk2_thread + [self.pk1_pk2_dms[0]]:
assert filter.matches(event) is False
def test_match_by_arbitrary_single_letter_tag(self):
""" should match NIP-12 arbitrary single-letter tags """
filter = Filter()
filter.add_arbitrary_tag('x', ["oranges"])
# None of our Events match
for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event) is False
# A new Event that has the target tag but the wrong value should not match
event = Event(
public_key=self.pk1.public_key.hex(),
content="Additional event to test with",
tags=[
['x', "bananas"]
]
)
assert filter.matches(event) is False
# But a new Event that includes the target should match
event = Event(
public_key=self.pk1.public_key.hex(),
content="Additional event to test with",
tags=[
['x', "oranges"]
]
)
assert filter.matches(event)
# Filter shouldn't care if there are other extraneous values
event.tags.append(['x', "pizza"])
assert filter.matches(event)
event.tags.append(['y', "honey badger"])
assert filter.matches(event)
def test_match_by_arbitrary_multi_letter_tag(self):
""" should match any arbitrary multi-letter tag """
filter = Filter()
filter.add_arbitrary_tag('favorites', ["bitcoin"])
# None of our Events match
for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event) is False
# A new Event that has the target tag but the wrong value should not match
event = Event(
public_key=self.pk1.public_key.hex(),
content="Additional event to test with",
tags=[
['favorites', "shitcoin"]
]
)
assert filter.matches(event) is False
# But a new Event that includes the target should match
event = Event(
public_key=self.pk1.public_key.hex(),
content="Additional event to test with",
tags=[
['favorites', "bitcoin"]
]
)
assert filter.matches(event)
# Filter shouldn't care if there are other extraneous values
event.tags.append(['favorites', "sats"])
assert filter.matches(event)
event.tags.append(['foo', "bar"])
assert filter.matches(event)
def test_match_by_delegation_tag(self):
"""
should match on delegation tag.
Note: this is to demonstrate that it works w/out special handling, but
arguably Delegation filtering should have its own explicit Filter support.
"""
filter = Filter()
# Search just for the delegator's pubkey (only aspect of delegation search that is supported this way)
filter.add_arbitrary_tag(
'delegation', ["8e0d3d3eb2881ec137a11debe736a9086715a8c8beeeda615780064d68bc25dd"]
)
# None of our Events match
for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event) is False
# A new Event that has the target tag but the wrong value should not match
event = Event(
public_key=self.pk1.public_key.hex(),
content="Additional event to test with",
tags=[
[
'delegation',
"some_other_delegators_pubkey",
"kind=1&created_at<1675721813",
"cbc49c65fe04a3181d72fb5a9f1c627e329d5f45d300a2dfed1c3e788b7834dad48a6d27d8e244af39c77381334ede97d4fd15abe80f35fda695fd9bd732aa1e"
]
]
)
assert filter.matches(event) is False
# But a new Event that includes the target should match
event = Event(
public_key=self.pk1.public_key.hex(),
content="Additional event to test with",
tags=[
[
'delegation',
"8e0d3d3eb2881ec137a11debe736a9086715a8c8beeeda615780064d68bc25dd",
"kind=1&created_at<1675721813",
"cbc49c65fe04a3181d72fb5a9f1c627e329d5f45d300a2dfed1c3e788b7834dad48a6d27d8e244af39c77381334ede97d4fd15abe80f35fda695fd9bd732aa1e"
]
]
)
assert filter.matches(event)
# Filter shouldn't care if there are other extraneous values
event.tags.append(['favorites', "sats"])
assert filter.matches(event)
event.tags.append(['foo', "bar"])
assert filter.matches(event)
def test_match_by_authors_and_kinds(self):
""" should match Events by authors AND kinds """
filter = Filter(
authors=[self.pk1.public_key.hex()],
kinds=[EventKind.TEXT_NOTE],
)
# Should match pk1's notes but not pk2's reply
assert filter.matches(self.pk1_thread[0])
assert filter.matches(self.pk1_thread[1]) is False
assert filter.matches(self.pk1_thread[2])
# Should not match anything else
for event in self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event) is False
# Typical search to get all Events sent by a pubkey
filter = Filter(
authors=[self.pk1.public_key.hex()],
kinds=[EventKind.TEXT_NOTE, EventKind.ENCRYPTED_DIRECT_MESSAGE],
)
# Should still match pk1's notes but not pk2's reply
assert filter.matches(self.pk1_thread[0])
assert filter.matches(self.pk1_thread[1]) is False
assert filter.matches(self.pk1_thread[2])
# Should not match any of pk2's solo thread
assert filter.matches(self.pk2_thread[0]) is False
assert filter.matches(self.pk2_thread[1]) is False
# Should match pk1's DM but not pk2's DM reply
assert filter.matches(self.pk1_pk2_dms[0])
assert filter.matches(self.pk1_pk2_dms[1]) is False
def test_match_by_kinds_and_pubkey_refs(self):
""" should match Events by kind AND pubkey_ref 'p' tags """
filter = Filter(
kinds=[EventKind.TEXT_NOTE],
pubkey_refs=[self.pk2.public_key.hex()],
)
# Only pk1's reply to pk2 should match
assert filter.matches(self.pk1_thread[2])
# Should not match anything else
for event in self.pk1_thread[:1] + self.pk2_thread + self.pk1_pk2_dms:
assert filter.matches(event) is False
# Typical search to get all Events sent to a pubkey
filter = Filter(
kinds=[EventKind.TEXT_NOTE, EventKind.ENCRYPTED_DIRECT_MESSAGE],
pubkey_refs=[self.pk2.public_key.hex()],
)
# pk1's reply to pk2 should match
assert filter.matches(self.pk1_thread[2])
# pk2's DM to pk1 should match
assert filter.matches(self.pk1_pk2_dms[0])
# Should not match anything else
for event in self.pk1_thread[:1] + self.pk2_thread + self.pk1_pk2_dms[1:]:
assert filter.matches(event) is False
def test_event_refs_json(self):
""" should insert event_refs as "#e" in json """
filter = Filter(event_refs=["some_event_id"])
assert "#e" in filter.to_json_object().keys()
assert "e" not in filter.to_json_object().keys()
def test_pubkey_refs_json(self):
""" should insert pubkey_refs as "#p" in json """
filter = Filter(pubkey_refs=["some_pubkey"])
assert "#p" in filter.to_json_object().keys()
assert "p" not in filter.to_json_object().keys()
def test_arbitrary_single_letter_json(self):
""" should prefix NIP-12 arbitrary single-letter tags with "#" in json """
filter = Filter()
filter.add_arbitrary_tag('x', ["oranges"])
assert "#x" in filter.to_json_object().keys()
assert "x" not in filter.to_json_object().keys()
def test_arbitrary_multi_letter_json(self):
""" should include arbitrary multi-letter tags as-is in json """
filter = Filter()
filter.add_arbitrary_tag('foo', ["bar"])
assert "foo" in filter.to_json_object().keys()
# Inherit from TestFilter to get all the same test data
class TestFilters(TestFilter):
def test_match_by_authors_or_pubkey_refs(self):
""" Should match on authors or pubkey_refs """
# Typical filters for anything sent by or to a pubkey
filter1 = Filter(
authors=[self.pk1.public_key.hex()],
)
filter2 = Filter(
pubkey_refs=[self.pk1.public_key.hex()],
)
filters = Filters([filter1, filter2])
# Should match the entire pk1 thread and the DM exchange
for event in self.pk1_thread + self.pk1_pk2_dms:
assert filters.match(event)
# Should not match anything in pk2's solo thread
assert filters.match(self.pk2_thread[0]) is False
assert filters.match(self.pk2_thread[1]) is False

23
test/test_key.py Normal file
View File

@@ -0,0 +1,23 @@
from nostr.key import PrivateKey
def test_eq_true():
""" __eq__ should return True when PrivateKeys are equal """
pk1 = PrivateKey()
pk2 = PrivateKey(pk1.raw_secret)
assert pk1 == pk2
def test_eq_false():
""" __eq__ should return False when PrivateKeys are not equal """
pk1 = PrivateKey()
pk2 = PrivateKey()
assert pk1.raw_secret != pk2.raw_secret
assert pk1 != pk2
def test_from_nsec():
""" PrivateKey.from_nsec should yield the source's raw_secret """
pk1 = PrivateKey()
pk2 = PrivateKey.from_nsec(pk1.bech32())
assert pk1.raw_secret == pk2.raw_secret

View File

@@ -0,0 +1,30 @@
import pytest
from nostr.event import Event
from nostr.key import PrivateKey
from nostr.relay_manager import RelayManager, RelayException
def test_only_relay_valid_events():
""" publish_event raise a RelayException if an Event fails verification """
pk = PrivateKey()
event = Event(
public_key=pk.public_key.hex(),
content="Hello, world!",
)
relay_manager = RelayManager()
# Deliberately forget to sign the Event
with pytest.raises(RelayException) as e:
relay_manager.publish_event(event)
assert "must be signed" in str(e)
# Attempt to relay with a nonsense signature
event.signature = '0' * 32
with pytest.raises(RelayException) as e:
relay_manager.publish_event(event)
assert "failed to verify" in str(e)
# Properly signed Event can be relayed
pk.sign_event(event)
relay_manager.publish_event(event)