Finish handling filter merging

This commit is contained in:
Tony Giorgio
2023-08-09 15:08:22 -05:00
parent 4886de8de0
commit abd0ed24d7

View File

@@ -186,6 +186,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
console_log!("accepted websocket, about to spawn event stream"); console_log!("accepted websocket, about to spawn event stream");
wasm_bindgen_futures::spawn_local(async move { wasm_bindgen_futures::spawn_local(async move {
let running_thread = Arc::new(AtomicBool::new(false)); let running_thread = Arc::new(AtomicBool::new(false));
let new_subscription_req = Arc::new(AtomicBool::new(false));
let requested_filters = Arc::new(Mutex::new(Filter::new())); let requested_filters = Arc::new(Mutex::new(Filter::new()));
let mut event_stream = server.events().expect("stream error"); let mut event_stream = server.events().expect("stream error");
console_log!("spawned event stream, waiting for first message.."); console_log!("spawned event stream, waiting for first message..");
@@ -309,6 +310,8 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
subscription_id, subscription_id,
filters, filters,
} => { } => {
new_subscription_req.swap(true, Ordering::Relaxed);
console_log!("got a new client request sub: {}, len: {}", subscription_id, filters.len());
// for each filter we handle it every 10 seconds // for each filter we handle it every 10 seconds
// by reading storage and sending any new events // by reading storage and sending any new events
// one caveat is that this will send events multiple // one caveat is that this will send events multiple
@@ -337,7 +340,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
// object. This is a bit of a hack but we only // object. This is a bit of a hack but we only
// check certain sub filters for NWC. // check certain sub filters for NWC.
combine_filters(master_filter, &filter); combine_filters(master_filter, &filter);
drop(master_guard); console_log!("New filter count: {}", master_filter.pubkeys.as_ref().map_or(0, Vec::len));
valid = true; valid = true;
} }
} }
@@ -346,6 +349,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
// spawn_local already going with filters // spawn_local already going with filters
// when other filters are added in, it should // when other filters are added in, it should
// be picked up in the master filter // be picked up in the master filter
let mut sent_event_count = 0;
if !running_thread.load(Ordering::Relaxed) && valid { if !running_thread.load(Ordering::Relaxed) && valid {
// set running thread to true // set running thread to true
running_thread.swap(true, Ordering::Relaxed); running_thread.swap(true, Ordering::Relaxed);
@@ -354,12 +358,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
let sub_id = subscription_id.clone(); let sub_id = subscription_id.clone();
let server_clone = server.clone(); let server_clone = server.clone();
let master_clone = requested_filters.clone(); let master_clone = requested_filters.clone();
let new_subscription_req_clone = new_subscription_req.clone();
wasm_bindgen_futures::spawn_local(async move { wasm_bindgen_futures::spawn_local(async move {
console_log!("Got NWC filter!");
let mut sent_events = vec![]; let mut sent_events = vec![];
loop { loop {
let master = master_clone.lock().await; let master = master_clone.lock().await;
console_log!("Looping through filter handling..."); console_log!("Checking filters: {}", master.pubkeys.as_ref().map_or(0, Vec::len));
match handle_filter( match handle_filter(
&sent_events, &sent_events,
sub_id.clone(), sub_id.clone(),
@@ -371,15 +375,25 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
Ok(new_event_ids) => { Ok(new_event_ids) => {
// add new events to sent events // add new events to sent events
sent_events.extend(new_event_ids); sent_events.extend(new_event_ids);
// send EOSE if necessary
if new_subscription_req_clone.load(Ordering::Relaxed) || sent_event_count != sent_events.len() {
let relay_msg = RelayMessage::new_eose(sub_id.clone());
server_clone
.send_with_str(relay_msg.as_json())
.expect("failed to send response");
sent_event_count = sent_events.len();
new_subscription_req_clone.swap(false, Ordering::Relaxed);
}
} }
Err(e) => console_log!( Err(e) => console_log!(
"error handling filter: {e}" "error handling filter: {e}"
), ),
} }
utils::delay(10_000).await; drop(master);
utils::delay(5_000).await;
} }
}); });
} else { } else if !valid {
// if not a nwc filter, we just send EOSE // if not a nwc filter, we just send EOSE
let relay_msg = RelayMessage::new_eose(subscription_id); let relay_msg = RelayMessage::new_eose(subscription_id);
server server
@@ -521,12 +535,6 @@ pub async fn handle_filter(
} }
} }
console_log!("end of subscription request");
let relay_msg = RelayMessage::new_eose(subscription_id);
server
.send_with_str(relay_msg.as_json())
.expect("failed to send response");
let sent_event_ids: Vec<EventId> = events.into_iter().map(|e| e.id).collect(); let sent_event_ids: Vec<EventId> = events.into_iter().map(|e| e.id).collect();
Ok(sent_event_ids) Ok(sent_event_ids)
} }
@@ -663,54 +671,62 @@ pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Resul
Ok(()) Ok(())
} }
// Helper function to extend a vector without duplicates
fn extend_without_duplicates<T: PartialEq + Clone>(master: &mut Vec<T>, new: &Vec<T>) {
for item in new {
if !master.contains(item) {
master.push(item.clone());
}
}
}
fn combine_filters(master_filter: &mut Filter, new_filter: &Filter) { fn combine_filters(master_filter: &mut Filter, new_filter: &Filter) {
// Check and extend for IDs
if let Some(vec) = &new_filter.ids { if let Some(vec) = &new_filter.ids {
master_filter let master_vec = master_filter.ids.get_or_insert_with(Vec::new);
.ids extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for authors
if let Some(vec) = &new_filter.authors { if let Some(vec) = &new_filter.authors {
master_filter let master_vec = master_filter.authors.get_or_insert_with(Vec::new);
.authors extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for kinds
if let Some(vec) = &new_filter.kinds { if let Some(vec) = &new_filter.kinds {
master_filter let master_vec = master_filter.kinds.get_or_insert_with(Vec::new);
.kinds extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for events
if let Some(vec) = &new_filter.events { if let Some(vec) = &new_filter.events {
master_filter let master_vec = master_filter.events.get_or_insert_with(Vec::new);
.events extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for pubkeys
if let Some(vec) = &new_filter.pubkeys { if let Some(vec) = &new_filter.pubkeys {
master_filter let master_vec = master_filter.pubkeys.get_or_insert_with(Vec::new);
.pubkeys extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for hashtags
if let Some(vec) = &new_filter.hashtags { if let Some(vec) = &new_filter.hashtags {
master_filter let master_vec = master_filter.hashtags.get_or_insert_with(Vec::new);
.hashtags extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for references
if let Some(vec) = &new_filter.references { if let Some(vec) = &new_filter.references {
master_filter let master_vec = master_filter.references.get_or_insert_with(Vec::new);
.references extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
// Check and extend for identifiers
if let Some(vec) = &new_filter.identifiers { if let Some(vec) = &new_filter.identifiers {
master_filter let master_vec = master_filter.identifiers.get_or_insert_with(Vec::new);
.identifiers extend_without_duplicates(master_vec, vec);
.get_or_insert_with(Vec::new)
.extend(vec.clone());
} }
} }