diff --git a/.gitignore b/.gitignore index 09ebb8d..161ec36 100644 --- a/.gitignore +++ b/.gitignore @@ -2,4 +2,5 @@ venv/ __pycache__/ nostr.egg-info/ dist/ -nostr/_version.py \ No newline at end of file +nostr/_version.py +.DS_Store diff --git a/nostr/event.py b/nostr/event.py index 0d36f4a..ce77050 100644 --- a/nostr/event.py +++ b/nostr/event.py @@ -28,7 +28,7 @@ class Event(): signature: str=None) -> None: if not isinstance(content, str): raise TypeError("Argument 'content' must be of type str") - + self.public_key = public_key self.content = content self.created_at = created_at or int(time.time()) diff --git a/nostr/filter.py b/nostr/filter.py index 1233249..f4cb0a5 100644 --- a/nostr/filter.py +++ b/nostr/filter.py @@ -1,68 +1,121 @@ from collections import UserList -from .event import Event +from typing import List + +from .event import Event, EventKind + + class Filter: + """ + NIP-01 filtering. + + Explicitly supports "#e" and "#p" tag filters via `event_refs` and `pubkey_refs`. + + Arbitrary NIP-12 single-letter tag filters are also supported via `add_arbitrary_tag`. + If a particular single-letter tag gains prominence, explicit support should be + added. For example: + # arbitrary tag + filter.add_arbitrary_tag('t', [hashtags]) + + # promoted to explicit support + Filter(hashtag_refs=[hashtags]) + """ def __init__( self, - ids: "list[str]"=None, - kinds: "list[int]"=None, - authors: "list[str]"=None, - since: int=None, - until: int=None, - tags: "dict[str, list[str]]"=None, - limit: int=None) -> None: - self.IDs = ids + event_ids: List[str] = None, + kinds: List[EventKind] = None, + authors: List[str] = None, + since: int = None, + until: int = None, + event_refs: List[str] = None, # the "#e" attr; list of event ids referenced in an "e" tag + pubkey_refs: List[str] = None, # The "#p" attr; list of pubkeys referenced in a "p" tag + limit: int = None) -> None: + self.event_ids = event_ids self.kinds = kinds self.authors = authors self.since = since self.until = until - self.tags = tags + self.event_refs = event_refs + self.pubkey_refs = pubkey_refs self.limit = limit + self.tags = {} + if self.event_refs: + self.add_arbitrary_tag('e', self.event_refs) + if self.pubkey_refs: + self.add_arbitrary_tag('p', self.pubkey_refs) + + + def add_arbitrary_tag(self, tag: str, values: list): + """ + Filter on any arbitrary tag with explicit handling for NIP-01 and NIP-12 + single-letter tags. + """ + # NIP-01 'e' and 'p' tags and any NIP-12 single-letter tags must be prefixed with "#" + tag_key = tag if len(tag) > 1 else f"#{tag}" + self.tags[tag_key] = values + + def matches(self, event: Event) -> bool: - if self.IDs != None and event.id not in self.IDs: + if self.event_ids is not None and event.id not in self.event_ids: return False - if self.kinds != None and event.kind not in self.kinds: + if self.kinds is not None and event.kind not in self.kinds: return False - if self.authors != None and event.public_key not in self.authors: + if self.authors is not None and event.public_key not in self.authors: return False - if self.since != None and event.created_at < self.since: + if self.since is not None and event.created_at < self.since: return False - if self.until != None and event.created_at > self.until: + if self.until is not None and event.created_at > self.until: return False - if self.tags != None and len(event.tags) == 0: + if (self.event_refs is not None or self.pubkey_refs is not None) and len(event.tags) == 0: return False - if self.tags != None: - e_tag_identifiers = [e_tag[0] for e_tag in event.tags] + + if self.tags: + e_tag_identifiers = set([e_tag[0] for e_tag in event.tags]) for f_tag, f_tag_values in self.tags.items(): - if f_tag[1:] not in e_tag_identifiers: + # Omit any NIP-01 or NIP-12 "#" chars on single-letter tags + f_tag = f_tag.replace("#", "") + + if f_tag not in e_tag_identifiers: + # Event is missing a tag type that we're looking for return False + + # Multiple values within f_tag_values are treated as OR search; an Event + # needs to match only one. + # Note: an Event could have multiple entries of the same tag type + # (e.g. a reply to multiple people) so we have to check all of them. + match_found = False for e_tag in event.tags: - if e_tag[1] not in f_tag_values: - return False - + if e_tag[0] == f_tag and e_tag[1] in f_tag_values: + match_found = True + break + if not match_found: + return False + return True + def to_json_object(self) -> dict: res = {} - if self.IDs != None: - res["ids"] = self.IDs - if self.kinds != None: + if self.event_ids is not None: + res["ids"] = self.event_ids + if self.kinds is not None: res["kinds"] = self.kinds - if self.authors != None: + if self.authors is not None: res["authors"] = self.authors - if self.since != None: + if self.since is not None: res["since"] = self.since - if self.until != None: + if self.until is not None: res["until"] = self.until - if self.tags != None: - for tag, values in self.tags.items(): - res[tag] = values - if self.limit != None: + if self.limit is not None: res["limit"] = self.limit + if self.tags: + res.update(self.tags) return res - + + + class Filters(UserList): def __init__(self, initlist: "list[Filter]"=[]) -> None: super().__init__(initlist) @@ -75,5 +128,4 @@ class Filters(UserList): return False def to_json_array(self) -> list: - return [filter.to_json_object() for filter in self.data] - \ No newline at end of file + return [filter.to_json_object() for filter in self.data] \ No newline at end of file diff --git a/test/test_filter.py b/test/test_filter.py new file mode 100644 index 0000000..d1ddcb4 --- /dev/null +++ b/test/test_filter.py @@ -0,0 +1,423 @@ +from typing import List +import pytest +from nostr.event import Event, EventKind +from nostr.filter import Filter, Filters +from nostr.key import PrivateKey + + + +class TestFilter: + def setup_class(self): + self.pk1 = PrivateKey() + self.pk2 = PrivateKey() + + """ pk1 kicks off a thread and interacts with pk2 """ + self.pk1_thread = [ + # Note posted by pk1 + Event( + public_key=self.pk1.public_key.hex(), + content="pk1's first note!" + ), + ] + self.pk1_thread.append( + # Note posted by pk2 in response to pk1's note + Event( + public_key=self.pk2.public_key.hex(), + content="Nice to see you here, pk1!", + tags=[ + ['e', self.pk1_thread[0].id], # Replies reference which note they're directly responding to + ['p', self.pk1.public_key.hex()], # Replies reference who they're responding to + ], + ) + ) + self.pk1_thread.append( + # Next response note by pk1 continuing thread with pk2 + Event( + public_key=self.pk1.public_key.hex(), + content="Thanks! Glad you're here, too, pk2!", + tags=[ + ['e', self.pk1_thread[0].id], # Threads reference the original note + ['e', self.pk1_thread[-1].id], # Replies reference which note they're directly responding to + ['p', self.pk2.public_key.hex()], # Replies reference who they're responding to + ], + ) + ) + + """ pk2 starts a new thread but no one responds """ + self.pk2_thread = [ + # Note posted by pk2 + Event( + public_key=self.pk2.public_key.hex(), + content="pk2's first note!" + ) + ] + self.pk2_thread.append( + # pk2's self-reply + Event( + public_key=self.pk2.public_key.hex(), + content="So... I guess no one's following me.", + tags=[ + ['e', self.pk2_thread[0].id] + ] + ) + ) + + """ pk1 DMs pk2 """ + self.pk1_pk2_dms = [ + # DM sent by pk1 to pk2 + Event( + public_key=self.pk1.public_key.hex(), + content="Hey pk2, here's a secret", + tags=[['p', self.pk2.public_key.hex()]], + kind=EventKind.ENCRYPTED_DIRECT_MESSAGE, + ), + Event( + public_key=self.pk2.public_key.hex(), + content="Thanks! I'll keep it secure.", + tags=[['p', self.pk1.public_key.hex()]], + kind=EventKind.ENCRYPTED_DIRECT_MESSAGE, + ) + ] + + + def test_match_by_event_id(self): + """ should match Events by event_id """ + filter = Filter( + event_ids=[self.pk1_thread[0].id], + ) + assert filter.matches(self.pk1_thread[0]) + + # None of the others should match + for event in self.pk1_thread[1:] + self.pk2_thread + self.pk1_pk2_dms[1:]: + assert filter.matches(event) is False + + + def test_multiple_values_in_same_tag(self): + """ should treat multiple tag values as OR searches """ + filter = Filter( + event_ids=[self.pk1_thread[0].id, self.pk1_pk2_dms[0].id, "some_other_event_id"], + ) + assert filter.matches(self.pk1_thread[0]) + assert filter.matches(self.pk1_pk2_dms[0]) + + # None of the others should match + for event in self.pk1_thread[1:] + self.pk2_thread + self.pk1_pk2_dms[1:]: + assert filter.matches(event) is False + + + def test_match_by_kinds(self): + """ should match Events by kind """ + filter = Filter( + kinds=[EventKind.TEXT_NOTE], + ) + + # Both threads should match + for event in self.pk1_thread + self.pk2_thread: + assert filter.matches(event) + + # DMs should not match + for event in self.pk1_pk2_dms: + assert filter.matches(event) is False + + # Now allow either kind + filter = Filter( + kinds=[EventKind.TEXT_NOTE, EventKind.ENCRYPTED_DIRECT_MESSAGE], + ) + + # Now everything should match + for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) + + + def test_match_by_authors(self): + """ should match Events by author """ + filter = Filter(authors=[self.pk1.public_key.hex()]) + + # Everything sent by pk1 should match + for event in [event for event in (self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms) if event.public_key == self.pk1.public_key.hex()]: + assert filter.matches(event) + + # None of pk2's should match + for event in [event for event in (self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms) if event.public_key == self.pk2.public_key.hex()]: + assert filter.matches(event) is False + + + def test_match_by_event_refs(self): + """ should match Events by event_ref 'e' tags """ + filter = Filter( + event_refs=[self.pk1_thread[0].id], + ) + + # All replies to pk1's initial note should match (even pk1's reply at the end) + assert filter.matches(self.pk1_thread[1]) + assert filter.matches(self.pk1_thread[2]) + + # Everything else should not match + for event in [self.pk1_thread[0]] + self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) is False + + + def test_match_by_pubkey_refs(self): + """ should match Events by pubkey_ref 'p' tags """ + filter = Filter( + pubkey_refs=[self.pk1_thread[0].public_key], + ) + + # pk2's reply in pk1's thread should match + assert filter.matches(self.pk1_thread[1]) + + # pk2's DM reply to pk1 should match + assert filter.matches(self.pk1_pk2_dms[1]) + + # Everything else should not match + for event in [self.pk1_thread[0], self.pk1_thread[2]] + self.pk2_thread + [self.pk1_pk2_dms[0]]: + assert filter.matches(event) is False + + + def test_match_by_arbitrary_single_letter_tag(self): + """ should match NIP-12 arbitrary single-letter tags """ + filter = Filter() + filter.add_arbitrary_tag('x', ["oranges"]) + + # None of our Events match + for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) is False + + # A new Event that has the target tag but the wrong value should not match + event = Event( + public_key=self.pk1.public_key.hex(), + content="Additional event to test with", + tags=[ + ['x', "bananas"] + ] + ) + assert filter.matches(event) is False + + # But a new Event that includes the target should match + event = Event( + public_key=self.pk1.public_key.hex(), + content="Additional event to test with", + tags=[ + ['x', "oranges"] + ] + ) + assert filter.matches(event) + + # Filter shouldn't care if there are other extraneous values + event.tags.append(['x', "pizza"]) + assert filter.matches(event) + + event.tags.append(['y', "honey badger"]) + assert filter.matches(event) + + + def test_match_by_arbitrary_multi_letter_tag(self): + """ should match any arbitrary multi-letter tag """ + filter = Filter() + filter.add_arbitrary_tag('favorites', ["bitcoin"]) + + # None of our Events match + for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) is False + + # A new Event that has the target tag but the wrong value should not match + event = Event( + public_key=self.pk1.public_key.hex(), + content="Additional event to test with", + tags=[ + ['favorites', "shitcoin"] + ] + ) + assert filter.matches(event) is False + + # But a new Event that includes the target should match + event = Event( + public_key=self.pk1.public_key.hex(), + content="Additional event to test with", + tags=[ + ['favorites', "bitcoin"] + ] + ) + assert filter.matches(event) + + # Filter shouldn't care if there are other extraneous values + event.tags.append(['favorites', "sats"]) + assert filter.matches(event) + + event.tags.append(['foo', "bar"]) + assert filter.matches(event) + + + def test_match_by_delegation_tag(self): + """ + should match on delegation tag. + Note: this is to demonstrate that it works w/out special handling, but + arguably Delegation filtering should have its own explicit Filter support. + """ + filter = Filter() + + # Search just for the delegator's pubkey (only aspect of delegation search that is supported this way) + filter.add_arbitrary_tag( + 'delegation', ["8e0d3d3eb2881ec137a11debe736a9086715a8c8beeeda615780064d68bc25dd"] + ) + + # None of our Events match + for event in self.pk1_thread + self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) is False + + # A new Event that has the target tag but the wrong value should not match + event = Event( + public_key=self.pk1.public_key.hex(), + content="Additional event to test with", + tags=[ + [ + 'delegation', + "some_other_delegators_pubkey", + "kind=1&created_at<1675721813", + "cbc49c65fe04a3181d72fb5a9f1c627e329d5f45d300a2dfed1c3e788b7834dad48a6d27d8e244af39c77381334ede97d4fd15abe80f35fda695fd9bd732aa1e" + ] + ] + ) + assert filter.matches(event) is False + + # But a new Event that includes the target should match + event = Event( + public_key=self.pk1.public_key.hex(), + content="Additional event to test with", + tags=[ + [ + 'delegation', + "8e0d3d3eb2881ec137a11debe736a9086715a8c8beeeda615780064d68bc25dd", + "kind=1&created_at<1675721813", + "cbc49c65fe04a3181d72fb5a9f1c627e329d5f45d300a2dfed1c3e788b7834dad48a6d27d8e244af39c77381334ede97d4fd15abe80f35fda695fd9bd732aa1e" + ] + ] + ) + assert filter.matches(event) + + # Filter shouldn't care if there are other extraneous values + event.tags.append(['favorites', "sats"]) + assert filter.matches(event) + + event.tags.append(['foo', "bar"]) + assert filter.matches(event) + + + def test_match_by_authors_and_kinds(self): + """ should match Events by authors AND kinds """ + filter = Filter( + authors=[self.pk1.public_key.hex()], + kinds=[EventKind.TEXT_NOTE], + ) + + # Should match pk1's notes but not pk2's reply + assert filter.matches(self.pk1_thread[0]) + assert filter.matches(self.pk1_thread[1]) is False + assert filter.matches(self.pk1_thread[2]) + + # Should not match anything else + for event in self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) is False + + # Typical search to get all Events sent by a pubkey + filter = Filter( + authors=[self.pk1.public_key.hex()], + kinds=[EventKind.TEXT_NOTE, EventKind.ENCRYPTED_DIRECT_MESSAGE], + ) + + # Should still match pk1's notes but not pk2's reply + assert filter.matches(self.pk1_thread[0]) + assert filter.matches(self.pk1_thread[1]) is False + assert filter.matches(self.pk1_thread[2]) + + # Should not match any of pk2's solo thread + assert filter.matches(self.pk2_thread[0]) is False + assert filter.matches(self.pk2_thread[1]) is False + + # Should match pk1's DM but not pk2's DM reply + assert filter.matches(self.pk1_pk2_dms[0]) + assert filter.matches(self.pk1_pk2_dms[1]) is False + + + def test_match_by_kinds_and_pubkey_refs(self): + """ should match Events by kind AND pubkey_ref 'p' tags """ + filter = Filter( + kinds=[EventKind.TEXT_NOTE], + pubkey_refs=[self.pk2.public_key.hex()], + ) + + # Only pk1's reply to pk2 should match + assert filter.matches(self.pk1_thread[2]) + + # Should not match anything else + for event in self.pk1_thread[:1] + self.pk2_thread + self.pk1_pk2_dms: + assert filter.matches(event) is False + + # Typical search to get all Events sent to a pubkey + filter = Filter( + kinds=[EventKind.TEXT_NOTE, EventKind.ENCRYPTED_DIRECT_MESSAGE], + pubkey_refs=[self.pk2.public_key.hex()], + ) + + # pk1's reply to pk2 should match + assert filter.matches(self.pk1_thread[2]) + + # pk2's DM to pk1 should match + assert filter.matches(self.pk1_pk2_dms[0]) + + # Should not match anything else + for event in self.pk1_thread[:1] + self.pk2_thread + self.pk1_pk2_dms[1:]: + assert filter.matches(event) is False + + + def test_event_refs_json(self): + """ should insert event_refs as "#e" in json """ + filter = Filter(event_refs=["some_event_id"]) + assert "#e" in filter.to_json_object().keys() + assert "e" not in filter.to_json_object().keys() + + + def test_pubkey_refs_json(self): + """ should insert pubkey_refs as "#p" in json """ + filter = Filter(pubkey_refs=["some_pubkey"]) + assert "#p" in filter.to_json_object().keys() + assert "p" not in filter.to_json_object().keys() + + + def test_arbitrary_single_letter_json(self): + """ should prefix NIP-12 arbitrary single-letter tags with "#" in json """ + filter = Filter() + filter.add_arbitrary_tag('x', ["oranges"]) + assert "#x" in filter.to_json_object().keys() + assert "x" not in filter.to_json_object().keys() + + + def test_arbitrary_multi_letter_json(self): + """ should include arbitrary multi-letter tags as-is in json """ + filter = Filter() + filter.add_arbitrary_tag('foo', ["bar"]) + assert "foo" in filter.to_json_object().keys() + + + +# Inherit from TestFilter to get all the same test data +class TestFilters(TestFilter): + + def test_match_by_authors_or_pubkey_refs(self): + """ Should match on authors or pubkey_refs """ + # Typical filters for anything sent by or to a pubkey + filter1 = Filter( + authors=[self.pk1.public_key.hex()], + ) + filter2 = Filter( + pubkey_refs=[self.pk1.public_key.hex()], + ) + filters = Filters([filter1, filter2]) + + # Should match the entire pk1 thread and the DM exchange + for event in self.pk1_thread + self.pk1_pk2_dms: + assert filters.match(event) + + # Should not match anything in pk2's solo thread + assert filters.match(self.pk2_thread[0]) is False + assert filters.match(self.pk2_thread[1]) is False