diff --git a/src/lib.rs b/src/lib.rs index f791dfb..6832376 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -186,6 +186,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { console_log!("accepted websocket, about to spawn event stream"); wasm_bindgen_futures::spawn_local(async move { let running_thread = Arc::new(AtomicBool::new(false)); + let new_subscription_req = Arc::new(AtomicBool::new(false)); let requested_filters = Arc::new(Mutex::new(Filter::new())); let mut event_stream = server.events().expect("stream error"); console_log!("spawned event stream, waiting for first message.."); @@ -309,6 +310,8 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { subscription_id, filters, } => { + new_subscription_req.swap(true, Ordering::Relaxed); + console_log!("got a new client request sub: {}, len: {}", subscription_id, filters.len()); // for each filter we handle it every 10 seconds // by reading storage and sending any new events // one caveat is that this will send events multiple @@ -337,7 +340,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { // object. This is a bit of a hack but we only // check certain sub filters for NWC. combine_filters(master_filter, &filter); - drop(master_guard); + console_log!("New filter count: {}", master_filter.pubkeys.as_ref().map_or(0, Vec::len)); valid = true; } } @@ -346,6 +349,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { // spawn_local already going with filters // when other filters are added in, it should // be picked up in the master filter + let mut sent_event_count = 0; if !running_thread.load(Ordering::Relaxed) && valid { // set running thread to true running_thread.swap(true, Ordering::Relaxed); @@ -354,12 +358,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { let sub_id = subscription_id.clone(); let server_clone = server.clone(); let master_clone = requested_filters.clone(); + let new_subscription_req_clone = new_subscription_req.clone(); wasm_bindgen_futures::spawn_local(async move { - console_log!("Got NWC filter!"); let mut sent_events = vec![]; loop { let master = master_clone.lock().await; - console_log!("Looping through filter handling..."); + console_log!("Checking filters: {}", master.pubkeys.as_ref().map_or(0, Vec::len)); match handle_filter( &sent_events, sub_id.clone(), @@ -371,15 +375,25 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { Ok(new_event_ids) => { // add new events to sent events sent_events.extend(new_event_ids); + // send EOSE if necessary + if new_subscription_req_clone.load(Ordering::Relaxed) || sent_event_count != sent_events.len() { + let relay_msg = RelayMessage::new_eose(sub_id.clone()); + server_clone + .send_with_str(relay_msg.as_json()) + .expect("failed to send response"); + sent_event_count = sent_events.len(); + new_subscription_req_clone.swap(false, Ordering::Relaxed); + } } Err(e) => console_log!( "error handling filter: {e}" ), } - utils::delay(10_000).await; + drop(master); + utils::delay(5_000).await; } }); - } else { + } else if !valid { // if not a nwc filter, we just send EOSE let relay_msg = RelayMessage::new_eose(subscription_id); server @@ -521,12 +535,6 @@ pub async fn handle_filter( } } - console_log!("end of subscription request"); - let relay_msg = RelayMessage::new_eose(subscription_id); - server - .send_with_str(relay_msg.as_json()) - .expect("failed to send response"); - let sent_event_ids: Vec = events.into_iter().map(|e| e.id).collect(); Ok(sent_event_ids) } @@ -663,54 +671,62 @@ pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Resul Ok(()) } +// Helper function to extend a vector without duplicates +fn extend_without_duplicates(master: &mut Vec, new: &Vec) { + for item in new { + if !master.contains(item) { + master.push(item.clone()); + } + } +} + fn combine_filters(master_filter: &mut Filter, new_filter: &Filter) { + // Check and extend for IDs if let Some(vec) = &new_filter.ids { - master_filter - .ids - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.ids.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for authors if let Some(vec) = &new_filter.authors { - master_filter - .authors - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.authors.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for kinds if let Some(vec) = &new_filter.kinds { - master_filter - .kinds - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.kinds.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for events if let Some(vec) = &new_filter.events { - master_filter - .events - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.events.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for pubkeys if let Some(vec) = &new_filter.pubkeys { - master_filter - .pubkeys - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.pubkeys.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for hashtags if let Some(vec) = &new_filter.hashtags { - master_filter - .hashtags - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.hashtags.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for references if let Some(vec) = &new_filter.references { - master_filter - .references - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.references.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } + + // Check and extend for identifiers if let Some(vec) = &new_filter.identifiers { - master_filter - .identifiers - .get_or_insert_with(Vec::new) - .extend(vec.clone()); + let master_vec = master_filter.identifiers.get_or_insert_with(Vec::new); + extend_without_duplicates(master_vec, vec); } }