From 2e118ae01d0fb3c7c3b82213ea7e731daa11bf32 Mon Sep 17 00:00:00 2001 From: Tony Giorgio Date: Wed, 12 Jul 2023 15:11:41 -0500 Subject: [PATCH] Add consecutive filters Co-authored-by: Tony Giorgio --- .gitignore | 1 + Cargo.toml | 3 +- src/lib.rs | 130 ++++++++++++++++++++++++++++++++++++++++++----------- 3 files changed, 106 insertions(+), 28 deletions(-) diff --git a/.gitignore b/.gitignore index e5bfd1e..798ccf7 100644 --- a/.gitignore +++ b/.gitignore @@ -20,3 +20,4 @@ build/ /target /dist .dev.vars +.wrangler diff --git a/Cargo.toml b/Cargo.toml index 6fac34e..9e46f5b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -11,8 +11,9 @@ default = ["console_error_panic_hook"] [dependencies] cfg-if = "0.1.2" -worker = { version = "0.0.13", features = ["queue"] } +worker = { version = "0.0.13", features = ["queue"] } futures = "0.3.26" +futures-util = { version = "0.3", default-features = false } nostr = { version = "0.22.0", default-features = false, features = ["nip11"] } serde = { version = "^1.0", features = ["derive"] } diff --git a/src/lib.rs b/src/lib.rs index cd10026..68a0772 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -11,7 +11,10 @@ use ::nostr::{ ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind, }; use futures::StreamExt; +use futures_util::lock::Mutex; use serde::{Deserialize, Serialize}; +use std::ops::DerefMut; +use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; use worker::*; @@ -181,6 +184,8 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { server.accept()?; console_log!("accepted websocket, about to spawn event stream"); wasm_bindgen_futures::spawn_local(async move { + let running_thread = Arc::new(AtomicBool::new(false)); + let requested_filters = Arc::new(Mutex::new(Filter::new())); let mut event_stream = server.events().expect("stream error"); console_log!("spawned event stream, waiting for first message.."); while let Some(event) = event_stream.next().await { @@ -307,6 +312,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { // by reading storage and sending any new events // one caveat is that this will send events multiple // times if they are in multiple filters + let mut valid = false; for filter in filters { let valid_nwc = { // has correct kinds @@ -324,36 +330,55 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { }; if valid_nwc { - let ctx_clone = ctx.clone(); - let sub_id = subscription_id.clone(); - let server_clone = server.clone(); - wasm_bindgen_futures::spawn_local(async move { - console_log!("Got NWC filter!"); - let mut sent_events = vec![]; - loop { - match handle_filter( - &sent_events, - sub_id.clone(), - filter.clone(), - &server_clone, - &ctx_clone, - ) - .await - { - Ok(new_event_ids) => { - // add new events to sent events - sent_events.extend(new_event_ids); - } - Err(e) => console_log!( + let mut master_guard = requested_filters.lock().await; + let master_filter = master_guard.deref_mut(); + // now add the new filters to the main filter + // object. This is a bit of a hack but we only + // check certain sub filters for NWC. + combine_filters(master_filter, &filter); + drop(master_guard); + valid = true; + } + } + + // only spin up a new one if there's not a + // spawn_local already going with filters + // when other filters are added in, it should + // be picked up in the master filter + if !running_thread.load(Ordering::Relaxed) && valid { + // set running thread to true + running_thread.swap(true, Ordering::Relaxed); + + let ctx_clone = ctx.clone(); + let sub_id = subscription_id.clone(); + let server_clone = server.clone(); + let master_clone = requested_filters.clone(); + wasm_bindgen_futures::spawn_local(async move { + console_log!("Got NWC filter!"); + let mut sent_events = vec![]; + loop { + let master = master_clone.lock().await; + console_log!("Looping through filter handling..."); + match handle_filter( + &sent_events, + sub_id.clone(), + master.clone(), + &server_clone, + &ctx_clone, + ).await + { + Ok(new_event_ids) => { + // add new events to sent events + sent_events.extend(new_event_ids); + } + Err(e) => console_log!( "error handling filter: {e}" ), - } - - utils::delay(10_000).await; } - }); - } - } { + utils::delay(10_000).await; + } + }); + } else { // if not a nwc filter, we just send EOSE let relay_msg = RelayMessage::new_eose(subscription_id); server @@ -637,6 +662,57 @@ pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Resul Ok(()) } +fn combine_filters(master_filter: &mut Filter, new_filter: &Filter) { + if let Some(vec) = &new_filter.ids { + master_filter + .ids + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.authors { + master_filter + .authors + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.kinds { + master_filter + .kinds + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.events { + master_filter + .events + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.pubkeys { + master_filter + .pubkeys + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.hashtags { + master_filter + .hashtags + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.references { + master_filter + .references + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } + if let Some(vec) = &new_filter.identifiers { + master_filter + .identifiers + .get_or_insert_with(Vec::new) + .extend(vec.clone()); + } +} + fn relay_response(msg: RelayMessage) -> worker::Result { Response::from_json(&msg)?.with_cors(&cors()) }