create message pool class

This commit is contained in:
jeffthibault
2022-08-01 19:37:27 -04:00
parent b46ac26aed
commit d475f0c483

66
nostr/message_pool.py Normal file
View File

@@ -0,0 +1,66 @@
import json
from queue import Queue
from threading import Lock
from .message_type import RelayMessageType
from .event import Event
class EventMessage:
def __init__(self, event: Event, url: str) -> None:
self.event = event
self.url = url
class NoticeMessage:
def __init__(self, content: str, url: str) -> None:
self.content = content
self.url = url
class EndOfStoredEventsMessage:
def __init__(self, subscription_id: str, url: str) -> None:
self.subscription_id = subscription_id
self.url = url
class MessagePool:
def __init__(self) -> None:
self.events: Queue[EventMessage] = Queue()
self.notices: Queue[NoticeMessage] = Queue()
self.eose_notices: Queue[EndOfStoredEventsMessage] = Queue()
self._unique_events: set = set()
self.lock: Lock = Lock()
def add_message(self, message: str, url: str):
self._process_message(message, url)
def get_event(self):
return self.events.get()
def get_notice(self):
return self.notices.get()
def get_eose_notice(self):
return self.eose_notices.get()
def has_events(self):
return self.events.qsize() > 0
def has_notices(self):
return self.notices.qsize() > 0
def has_eose_notices(self):
return self.eose_notices.qsize() > 0
def _process_message(self, message: str, url: str):
message_json = json.loads(message)
message_type = message_json[0]
if message_type == RelayMessageType.EVENT:
e = message_json[2]
event = Event(e['pubkey'], e['content'], e['created_at'], e['kind'], e['tags'], e['id'], e['sig'])
with self.lock:
if not event.id in self._unique_events:
self.events.put(EventMessage(event, url))
self._unique_events.add(event.id)
elif message_type == RelayMessageType.NOTICE:
self.notices.put(NoticeMessage(message_json[1], url))
elif message_type == RelayMessageType.END_OF_STORED_EVENTS:
self.eose_notices.put(EndOfStoredEventsMessage(message_json[1], url))