From 470afcf282953545639e1280adf69240c46a8138 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Tue, 13 Jun 2023 13:58:29 -0500 Subject: [PATCH] Rescan storage for new NWC events every 10 seconds --- src/lib.rs | 194 ++++++++++++++++++++++++++++++++++++--------------- src/nostr.rs | 17 +++-- src/utils.rs | 7 ++ 3 files changed, 151 insertions(+), 67 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index 0abcb39..cd10026 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,9 +7,12 @@ pub(crate) use crate::nostr::{ try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, NOSTR_QUEUE_6, }; -use ::nostr::{ClientMessage, Event, Kind, RelayMessage, Tag, TagKind}; +use ::nostr::{ + ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind, +}; use futures::StreamExt; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use worker::*; mod error; @@ -171,6 +174,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { return Response::from_json(&get_nip11_response())?.with_cors(&cors()); } + let ctx = Arc::new(ctx); // For websocket compatibility let pair = WebSocketPair::new()?; let server = pair.server; @@ -299,69 +303,63 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { subscription_id, filters, } => { - // if the user requests a NWC event, we have those stored, - // we should send them to the user - let mut events = vec![]; + // for each filter we handle it every 10 seconds + // by reading storage and sending any new events + // one caveat is that this will send events multiple + // times if they are in multiple filters for filter in filters { - // get all authors and pubkeys - let mut keys = filter.authors.unwrap_or_default(); - keys.extend( - filter - .pubkeys - .unwrap_or_default() - .into_iter() - .map(|p| p.to_string()), - ); + let valid_nwc = { + // has correct kinds + let kinds = filter.kinds.as_ref(); + ( + kinds + .unwrap_or(&vec![]) + .contains(&Kind::WalletConnectResponse) + || kinds.unwrap_or(&vec![]) + .contains(&Kind::WalletConnectRequest) + ) && + // has authors and pubkeys + !filter.authors.as_ref().unwrap_or(&vec![]).is_empty() && + !filter.pubkeys.as_ref().unwrap_or(&vec![]).is_empty() + }; - if filter - .kinds - .clone() - .unwrap_or_default() - .contains(&Kind::WalletConnectRequest) - { - let found_events = get_nwc_events( - &keys, - Kind::WalletConnectRequest, - &ctx, - ) - .await - .unwrap_or_default(); - events.extend(found_events); + if valid_nwc { + let ctx_clone = ctx.clone(); + let sub_id = subscription_id.clone(); + let server_clone = server.clone(); + wasm_bindgen_futures::spawn_local(async move { + console_log!("Got NWC filter!"); + let mut sent_events = vec![]; + loop { + match handle_filter( + &sent_events, + sub_id.clone(), + filter.clone(), + &server_clone, + &ctx_clone, + ) + .await + { + Ok(new_event_ids) => { + // add new events to sent events + sent_events.extend(new_event_ids); + } + Err(e) => console_log!( + "error handling filter: {e}" + ), + } + + utils::delay(10_000).await; + } + }); } - - if filter - .kinds - .unwrap_or_default() - .contains(&Kind::WalletConnectResponse) - { - let found_events = get_nwc_events( - &keys, - Kind::WalletConnectResponse, - &ctx, - ) - .await - .unwrap_or_default(); - events.extend(found_events); - } - } - - // send all found events to the user - for event in events { - console_log!("sending event to client: {}", &event.id); - let relay_msg = RelayMessage::new_event( - subscription_id.clone(), - event, - ); + } { + // if not a nwc filter, we just send EOSE + let relay_msg = RelayMessage::new_eose(subscription_id); server - .send_with_str(&relay_msg.as_json()) + .send_with_str(relay_msg.as_json()) .expect("failed to send response"); } - - console_log!("end of subscription request"); - let relay_msg = RelayMessage::new_eose(subscription_id); - server - .send_with_str(&relay_msg.as_json()) - .expect("failed to send response"); } _ => { console_log!("ignoring other nostr client message types"); @@ -427,6 +425,86 @@ pub fn queue_number(batch_name: &str) -> Result { } } +/// if the user requests a NWC event, we have those stored, +/// we should send them to the user +pub async fn handle_filter( + sent_events: &[EventId], + subscription_id: SubscriptionId, + filter: Filter, + server: &WebSocket, + ctx: &RouteContext<()>, +) -> Result> { + let mut events = vec![]; + // get all authors and pubkeys + let mut keys = filter.authors.unwrap_or_default(); + keys.extend( + filter + .pubkeys + .unwrap_or_default() + .into_iter() + .map(|p| p.to_string()), + ); + + if filter + .kinds + .clone() + .unwrap_or_default() + .contains(&Kind::WalletConnectRequest) + { + let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, ctx) + .await + .unwrap_or_default(); + + // filter out events that have already been sent + found_events.retain(|e| !sent_events.contains(&e.id)); + + events.extend(found_events); + } + + if filter + .kinds + .unwrap_or_default() + .contains(&Kind::WalletConnectResponse) + { + let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, ctx) + .await + .unwrap_or_default(); + + // filter out events that have already been sent + found_events.retain(|e| !sent_events.contains(&e.id)); + + events.extend(found_events); + } + + if events.is_empty() { + return Ok(vec![]); + } + + if !events.is_empty() { + // sort and dedup events + events.sort_by(|a, b| a.created_at.cmp(&b.created_at)); + events.dedup(); + + // send all found events to the user + for event in events.clone() { + console_log!("sending event to client: {}", &event.id); + let relay_msg = RelayMessage::new_event(subscription_id.clone(), event); + server + .send_with_str(&relay_msg.as_json()) + .expect("failed to send response"); + } + } + + console_log!("end of subscription request"); + let relay_msg = RelayMessage::new_eose(subscription_id); + server + .send_with_str(relay_msg.as_json()) + .expect("failed to send response"); + + let sent_event_ids: Vec = events.into_iter().map(|e| e.id).collect(); + Ok(sent_event_ids) +} + pub async fn get_nwc_events( keys: &[String], kind: Kind, diff --git a/src/nostr.rs b/src/nostr.rs index 7534a51..3202257 100644 --- a/src/nostr.rs +++ b/src/nostr.rs @@ -1,12 +1,13 @@ use crate::error::Error; +use crate::utils::delay; use futures::future::Either; use futures::pin_mut; use futures::StreamExt; use nostr::prelude::*; use std::string::ToString; -use std::{time::Duration, vec}; +use std::vec; use worker::WebsocketEvent; -use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket}; +use worker::{console_log, Cache, Fetch, Queue, Response, WebSocket}; pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub-1-b"; pub(crate) const NOSTR_QUEUE_2: &str = "nostr-events-pub-2-b"; @@ -78,7 +79,10 @@ async fn send_event_to_relay(messages: Vec, relay: &str) -> Resul let ws_opt = match either { Either::Left((res, _)) => res, - Either::Right(_) => Err(worker::Error::RustError("Connection timeout".to_string())), + Either::Right(_) => { + console_log!("time delay hit, stopping..."); + Err(worker::Error::RustError("Connection timeout".to_string())) + } }; match ws_opt { @@ -121,6 +125,7 @@ async fn send_event_to_relay(messages: Vec, relay: &str) -> Resul } } Either::Right(_) => { + console_log!("time delay hit, stopping..."); // Sleep triggered before we got a websocket response } } @@ -217,9 +222,3 @@ fn find_range_from_part(part: u32) -> (usize, usize) { let end = start + 29; (start as usize, end as usize) } - -async fn delay(delay: u64) { - let delay: Delay = Duration::from_millis(delay).into(); - delay.await; - console_log!("time delay hit, stopping..."); -} diff --git a/src/utils.rs b/src/utils.rs index aab768f..8afb26b 100644 --- a/src/utils.rs +++ b/src/utils.rs @@ -1,4 +1,6 @@ use cfg_if::cfg_if; +use std::time::Duration; +use worker::Delay; cfg_if! { // https://github.com/rustwasm/console_error_panic_hook#readme @@ -10,3 +12,8 @@ cfg_if! { pub fn set_panic_hook() {} } } + +pub async fn delay(delay: u64) { + let delay: Delay = Duration::from_millis(delay).into(); + delay.await; +}