mirror of
https://github.com/aljazceru/blastr.git
synced 2025-12-17 05:54:26 +01:00
Add consecutive filters
Co-authored-by: Tony Giorgio <tonygiorgio@protonmail.com>
This commit is contained in:
committed by
benthecarman
parent
9b6c079636
commit
2e118ae01d
1
.gitignore
vendored
1
.gitignore
vendored
@@ -20,3 +20,4 @@ build/
|
|||||||
/target
|
/target
|
||||||
/dist
|
/dist
|
||||||
.dev.vars
|
.dev.vars
|
||||||
|
.wrangler
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ default = ["console_error_panic_hook"]
|
|||||||
cfg-if = "0.1.2"
|
cfg-if = "0.1.2"
|
||||||
worker = { version = "0.0.13", features = ["queue"] }
|
worker = { version = "0.0.13", features = ["queue"] }
|
||||||
futures = "0.3.26"
|
futures = "0.3.26"
|
||||||
|
futures-util = { version = "0.3", default-features = false }
|
||||||
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
|
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
|
||||||
serde = { version = "^1.0", features = ["derive"] }
|
serde = { version = "^1.0", features = ["derive"] }
|
||||||
|
|
||||||
|
|||||||
88
src/lib.rs
88
src/lib.rs
@@ -11,7 +11,10 @@ use ::nostr::{
|
|||||||
ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind,
|
ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind,
|
||||||
};
|
};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
|
use futures_util::lock::Mutex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::ops::DerefMut;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use worker::*;
|
use worker::*;
|
||||||
|
|
||||||
@@ -181,6 +184,8 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
server.accept()?;
|
server.accept()?;
|
||||||
console_log!("accepted websocket, about to spawn event stream");
|
console_log!("accepted websocket, about to spawn event stream");
|
||||||
wasm_bindgen_futures::spawn_local(async move {
|
wasm_bindgen_futures::spawn_local(async move {
|
||||||
|
let running_thread = Arc::new(AtomicBool::new(false));
|
||||||
|
let requested_filters = Arc::new(Mutex::new(Filter::new()));
|
||||||
let mut event_stream = server.events().expect("stream error");
|
let mut event_stream = server.events().expect("stream error");
|
||||||
console_log!("spawned event stream, waiting for first message..");
|
console_log!("spawned event stream, waiting for first message..");
|
||||||
while let Some(event) = event_stream.next().await {
|
while let Some(event) = event_stream.next().await {
|
||||||
@@ -307,6 +312,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
// by reading storage and sending any new events
|
// by reading storage and sending any new events
|
||||||
// one caveat is that this will send events multiple
|
// one caveat is that this will send events multiple
|
||||||
// times if they are in multiple filters
|
// times if they are in multiple filters
|
||||||
|
let mut valid = false;
|
||||||
for filter in filters {
|
for filter in filters {
|
||||||
let valid_nwc = {
|
let valid_nwc = {
|
||||||
// has correct kinds
|
// has correct kinds
|
||||||
@@ -324,21 +330,42 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
};
|
};
|
||||||
|
|
||||||
if valid_nwc {
|
if valid_nwc {
|
||||||
|
let mut master_guard = requested_filters.lock().await;
|
||||||
|
let master_filter = master_guard.deref_mut();
|
||||||
|
// now add the new filters to the main filter
|
||||||
|
// object. This is a bit of a hack but we only
|
||||||
|
// check certain sub filters for NWC.
|
||||||
|
combine_filters(master_filter, &filter);
|
||||||
|
drop(master_guard);
|
||||||
|
valid = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// only spin up a new one if there's not a
|
||||||
|
// spawn_local already going with filters
|
||||||
|
// when other filters are added in, it should
|
||||||
|
// be picked up in the master filter
|
||||||
|
if !running_thread.load(Ordering::Relaxed) && valid {
|
||||||
|
// set running thread to true
|
||||||
|
running_thread.swap(true, Ordering::Relaxed);
|
||||||
|
|
||||||
let ctx_clone = ctx.clone();
|
let ctx_clone = ctx.clone();
|
||||||
let sub_id = subscription_id.clone();
|
let sub_id = subscription_id.clone();
|
||||||
let server_clone = server.clone();
|
let server_clone = server.clone();
|
||||||
|
let master_clone = requested_filters.clone();
|
||||||
wasm_bindgen_futures::spawn_local(async move {
|
wasm_bindgen_futures::spawn_local(async move {
|
||||||
console_log!("Got NWC filter!");
|
console_log!("Got NWC filter!");
|
||||||
let mut sent_events = vec![];
|
let mut sent_events = vec![];
|
||||||
loop {
|
loop {
|
||||||
|
let master = master_clone.lock().await;
|
||||||
|
console_log!("Looping through filter handling...");
|
||||||
match handle_filter(
|
match handle_filter(
|
||||||
&sent_events,
|
&sent_events,
|
||||||
sub_id.clone(),
|
sub_id.clone(),
|
||||||
filter.clone(),
|
master.clone(),
|
||||||
&server_clone,
|
&server_clone,
|
||||||
&ctx_clone,
|
&ctx_clone,
|
||||||
)
|
).await
|
||||||
.await
|
|
||||||
{
|
{
|
||||||
Ok(new_event_ids) => {
|
Ok(new_event_ids) => {
|
||||||
// add new events to sent events
|
// add new events to sent events
|
||||||
@@ -348,12 +375,10 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
"error handling filter: {e}"
|
"error handling filter: {e}"
|
||||||
),
|
),
|
||||||
}
|
}
|
||||||
|
|
||||||
utils::delay(10_000).await;
|
utils::delay(10_000).await;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
} else {
|
||||||
} {
|
|
||||||
// if not a nwc filter, we just send EOSE
|
// if not a nwc filter, we just send EOSE
|
||||||
let relay_msg = RelayMessage::new_eose(subscription_id);
|
let relay_msg = RelayMessage::new_eose(subscription_id);
|
||||||
server
|
server
|
||||||
@@ -637,6 +662,57 @@ pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Resul
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn combine_filters(master_filter: &mut Filter, new_filter: &Filter) {
|
||||||
|
if let Some(vec) = &new_filter.ids {
|
||||||
|
master_filter
|
||||||
|
.ids
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.authors {
|
||||||
|
master_filter
|
||||||
|
.authors
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.kinds {
|
||||||
|
master_filter
|
||||||
|
.kinds
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.events {
|
||||||
|
master_filter
|
||||||
|
.events
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.pubkeys {
|
||||||
|
master_filter
|
||||||
|
.pubkeys
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.hashtags {
|
||||||
|
master_filter
|
||||||
|
.hashtags
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.references {
|
||||||
|
master_filter
|
||||||
|
.references
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
if let Some(vec) = &new_filter.identifiers {
|
||||||
|
master_filter
|
||||||
|
.identifiers
|
||||||
|
.get_or_insert_with(Vec::new)
|
||||||
|
.extend(vec.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn relay_response(msg: RelayMessage) -> worker::Result<Response> {
|
fn relay_response(msg: RelayMessage) -> worker::Result<Response> {
|
||||||
Response::from_json(&msg)?.with_cors(&cors())
|
Response::from_json(&msg)?.with_cors(&cors())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user