count events and queue

This commit is contained in:
callebtc
2023-02-07 23:22:18 +01:00
4 changed files with 26 additions and 24 deletions

11
main.py
View File

@@ -20,15 +20,7 @@ async def dm():
) )
client = NostrClient(privatekey_hex=pk) client = NostrClient(privatekey_hex=pk)
# await asyncio.sleep(1)
filters = {
"since": int(
time.mktime(
(datetime.datetime.now() - datetime.timedelta(hours=1)).timetuple()
)
)
}
t = threading.Thread( t = threading.Thread(
target=client.get_dm, target=client.get_dm,
args=( args=(
@@ -59,6 +51,7 @@ async def post():
print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}") print(f"From {event.public_key[:3]}..{event.public_key[-3:]}: {event.content}")
sender_client = NostrClient(privatekey_hex=pk) sender_client = NostrClient(privatekey_hex=pk)
# await asyncio.sleep(1)
to_pubk_hex = ( to_pubk_hex = (
input( input(

View File

@@ -21,12 +21,10 @@ from . import cbc
class NostrClient: class NostrClient:
relays = [ relays = [
# "wss://relay.snort.social", "wss://lnbits.link/nostrrelay/client",
"wss://nostr-pub.wellorder.net", "wss://nostr-pub.wellorder.net",
"wss://nostr.zebedee.cloud", "wss://nostr.zebedee.cloud",
"wss://nostr.mom", "wss://no.str.cr",
# "wss://wss://lnbits.link/nostrrelay/client"
# "wss://no.str.cr",
] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr" ] # ["wss://nostr.oxtr.dev"] # ["wss://relay.nostr.info"] "wss://nostr-pub.wellorder.net" "ws://91.237.88.218:2700", "wss://nostrrr.bublina.eu.org", ""wss://nostr-relay.freeberty.net"", , "wss://nostr.oxtr.dev", "wss://relay.nostr.info", "wss://nostr-pub.wellorder.net" , "wss://relayer.fiatjaf.com", "wss://nodestr.fmt.wiz.biz/", "wss://no.str.cr"
relay_manager = RelayManager() relay_manager = RelayManager()
private_key: PrivateKey private_key: PrivateKey
@@ -46,6 +44,7 @@ class NostrClient:
self.relay_manager.open_connections( self.relay_manager.open_connections(
{"cert_reqs": ssl.CERT_NONE} {"cert_reqs": ssl.CERT_NONE}
) # NOTE: This disables ssl certificate verification ) # NOTE: This disables ssl certificate verification
self.relay_manager.start_message_workers()
def close(self): def close(self):
self.relay_manager.close_connections() self.relay_manager.close_connections()

View File

@@ -1,5 +1,6 @@
import json import json
import time import time
from queue import Queue
from threading import Lock from threading import Lock
from websocket import WebSocketApp from websocket import WebSocketApp
from .event import Event from .event import Event
@@ -40,6 +41,7 @@ class Relay:
self.ssl_options: dict = {} self.ssl_options: dict = {}
self.proxy: dict = {} self.proxy: dict = {}
self.lock = Lock() self.lock = Lock()
self.queue = Queue()
self.ws = WebSocketApp( self.ws = WebSocketApp(
url, url,
on_open=self._on_open, on_open=self._on_open,
@@ -52,17 +54,16 @@ class Relay:
def connect(self, ssl_options: dict = None, proxy: dict = None): def connect(self, ssl_options: dict = None, proxy: dict = None):
self.ssl_options = ssl_options self.ssl_options = ssl_options
print(self.url, "🟢")
self.proxy = proxy self.proxy = proxy
self.ws.run_forever( self.ws.run_forever(
sslopt=ssl_options, sslopt=ssl_options,
http_proxy_host=None if proxy is None else proxy.get("host"), http_proxy_host=None if proxy is None else proxy.get("host"),
http_proxy_port=None if proxy is None else proxy.get("port"), http_proxy_port=None if proxy is None else proxy.get("port"),
proxy_type=None if proxy is None else proxy.get("type"), proxy_type=None if proxy is None else proxy.get("type"),
ping_interval=5,
) )
def close(self): def close(self):
print(self.url, "🔴")
self.ws.close() self.ws.close()
def check_reconnect(self): def check_reconnect(self):
@@ -78,14 +79,21 @@ class Relay:
@property @property
def ping(self): def ping(self):
if self.connected: if self.connected:
return int(self.ws.last_ping_tm - self.ws.last_pong_tm) return int((self.ws.last_pong_tm - self.ws.last_ping_tm) * 1000)
else: else:
return 0 return 0
def publish(self, message: str): def publish(self, message: str):
if self.connected: self.queue.put(message)
self.num_sent_events += 1
self.ws.send(message) def queue_worker(self):
while True:
if self.connected:
message = self.queue.get()
self.num_sent_events += 1
self.ws.send(message)
else:
time.sleep(0.1)
def add_subscription(self, id, filters: Filters): def add_subscription(self, id, filters: Filters):
with self.lock: with self.lock:
@@ -122,11 +130,8 @@ class Relay:
if self._is_valid_message(message): if self._is_valid_message(message):
self.num_received_events += 1 self.num_received_events += 1
self.message_pool.add_message(message, self.url) self.message_pool.add_message(message, self.url)
else:
print(self.url, "invalid message", message)
def _on_error(self, class_obj, error): def _on_error(self, class_obj, error):
print(self.url, "🚫", error)
self.connected = False self.connected = False
self.error_counter += 1 self.error_counter += 1
if self.error_threshold and self.error_counter > self.error_threshold: if self.error_threshold and self.error_counter > self.error_threshold:
@@ -135,11 +140,9 @@ class Relay:
self.check_reconnect() self.check_reconnect()
def _on_ping(self, class_obj, message): def _on_ping(self, class_obj, message):
print(self.url, "ping", message)
return return
def _on_pong(self, class_obj, message): def _on_pong(self, class_obj, message):
print(self.url, "pong", message)
return return
def _is_valid_message(self, message: str) -> bool: def _is_valid_message(self, message: str) -> bool:

View File

@@ -43,6 +43,13 @@ class RelayManager:
name=f"{relay.url}-thread", name=f"{relay.url}-thread",
).start() ).start()
def start_message_workers(self):
for relay in self.relays.values():
threading.Thread(
target=relay.queue_worker,
name=f"{relay.url}-queue",
).start()
def close_connections(self): def close_connections(self):
for relay in self.relays.values(): for relay in self.relays.values():
relay.close() relay.close()