From 479b776fe29c539a001e9f64b300614b9ea27d27 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Wed, 25 Jan 2023 00:35:48 +0100 Subject: [PATCH 1/3] error checking and reconnect --- nostr/relay.py | 80 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 59 insertions(+), 21 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 9ccb77b..44c05c6 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -1,4 +1,5 @@ import json +import time from threading import Lock from websocket import WebSocketApp from .event import Event @@ -7,44 +8,62 @@ from .message_pool import MessagePool from .message_type import RelayMessageType from .subscription import Subscription + class RelayPolicy: - def __init__(self, should_read: bool=True, should_write: bool=True) -> None: + def __init__(self, should_read: bool = True, should_write: bool = True) -> None: self.should_read = should_read self.should_write = should_write def to_json_object(self) -> dict[str, bool]: - return { - "read": self.should_read, - "write": self.should_write - } + return {"read": self.should_read, "write": self.should_write} + class Relay: def __init__( - self, - url: str, - policy: RelayPolicy, - message_pool: MessagePool, - subscriptions: dict[str, Subscription]={}) -> None: + self, + url: str, + policy: RelayPolicy, + message_pool: MessagePool, + subscriptions: dict[str, Subscription] = {}, + ) -> None: self.url = url self.policy = policy self.message_pool = message_pool self.subscriptions = subscriptions + self.connected: bool = False + self.reconnect: bool = True + self.error_counter: int = 0 + self.error_threshold: int = 0 + self.ssl_options: dict = {} self.lock = Lock() self.ws = WebSocketApp( url, on_open=self._on_open, on_message=self._on_message, on_error=self._on_error, - on_close=self._on_close) + on_close=self._on_close, + ) - def connect(self, ssl_options: dict=None): - self.ws.run_forever(sslopt=ssl_options) + def connect(self, ssl_options: dict = {}): + self.ssl_options = ssl_options + self.ws.run_forever(sslopt=self.ssl_options) def close(self): self.ws.close() + def check_reconnect(self): + try: + self.close() + except: + pass + self.connected = False + if self.reconnect: + time.sleep(1) + self.connect(self.ssl_options) + def publish(self, message: str): - self.ws.send(message) + if self.connected: + self.ws.send(message) def add_subscription(self, id, filters: Filters): with self.lock: @@ -63,25 +82,36 @@ class Relay: return { "url": self.url, "policy": self.policy.to_json_object(), - "subscriptions": [subscription.to_json_object() for subscription in self.subscriptions.values()] + "subscriptions": [ + subscription.to_json_object() + for subscription in self.subscriptions.values() + ], } def _on_open(self, class_obj): + self.connected = True pass def _on_close(self, class_obj, status_code, message): + self.connected = False + self.check_reconnect() pass def _on_message(self, class_obj, message: str): if self._is_valid_message(message): self.message_pool.add_message(message, self.url) - + def _on_error(self, class_obj, error): - pass + self.connected = False + self.error_counter += 1 + if self.error_threshold and self.error_counter > self.error_threshold: + pass + else: + self.check_reconnect() def _is_valid_message(self, message: str) -> bool: message = message.strip("\n") - if not message or message[0] != '[' or message[-1] != ']': + if not message or message[0] != "[" or message[-1] != "]": return False message_json = json.loads(message) @@ -91,21 +121,29 @@ class Relay: if message_type == RelayMessageType.EVENT: if not len(message_json) == 3: return False - + subscription_id = message_json[1] with self.lock: if subscription_id not in self.subscriptions: return False e = message_json[2] - event = Event(e['pubkey'], e['content'], e['created_at'], e['kind'], e['tags'], e['id'], e['sig']) + event = Event( + e["pubkey"], + e["content"], + e["created_at"], + e["kind"], + e["tags"], + e["id"], + e["sig"], + ) if not event.verify(): return False with self.lock: subscription = self.subscriptions[subscription_id] - if not subscription.filters.match(event): + if subscription.filters and not subscription.filters.match(event): return False return True From 3794ef2cc5bdf2e9bbd60fcebdd3d7be81db5ae4 Mon Sep 17 00:00:00 2001 From: calle <93376500+callebtc@users.noreply.github.com> Date: Wed, 25 Jan 2023 01:45:21 +0100 Subject: [PATCH 2/3] Do not reconnect on close --- nostr/relay.py | 1 - 1 file changed, 1 deletion(-) diff --git a/nostr/relay.py b/nostr/relay.py index 44c05c6..da7ecd6 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -94,7 +94,6 @@ class Relay: def _on_close(self, class_obj, status_code, message): self.connected = False - self.check_reconnect() pass def _on_message(self, class_obj, message: str): From 88d436799e2606f70c4f04d0788608fe51c68919 Mon Sep 17 00:00:00 2001 From: callebtc <93376500+callebtc@users.noreply.github.com> Date: Thu, 2 Feb 2023 16:35:57 +0100 Subject: [PATCH 3/3] also reuse proxy --- nostr/relay.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/nostr/relay.py b/nostr/relay.py index 9aa9595..25e2bf8 100644 --- a/nostr/relay.py +++ b/nostr/relay.py @@ -35,6 +35,7 @@ class Relay: self.error_counter: int = 0 self.error_threshold: int = 0 self.ssl_options: dict = {} + self.proxy: dict = {} self.lock = Lock() self.ws = WebSocketApp( url, @@ -44,13 +45,14 @@ class Relay: on_close=self._on_close, ) - def connect(self, ssl_options: dict=None, proxy: dict=None): + def connect(self, ssl_options: dict = None, proxy: dict = None): self.ssl_options = ssl_options + self.proxy = proxy self.ws.run_forever( sslopt=ssl_options, - http_proxy_host=None if proxy is None else proxy.get('host'), - http_proxy_port=None if proxy is None else proxy.get('port'), - proxy_type=None if proxy is None else proxy.get('type') + http_proxy_host=None if proxy is None else proxy.get("host"), + http_proxy_port=None if proxy is None else proxy.get("port"), + proxy_type=None if proxy is None else proxy.get("type"), ) def close(self): @@ -64,7 +66,7 @@ class Relay: self.connected = False if self.reconnect: time.sleep(1) - self.connect(self.ssl_options) + self.connect(self.ssl_options, self.proxy) def publish(self, message: str): if self.connected: