diff --git a/Cargo.toml b/Cargo.toml index 89ff01a..3f069a3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,6 +15,7 @@ cfg-if = "0.1.2" worker = { git = "https://github.com/cloudflare/workers-rs", rev = "b4b9cd1f15feac412b6f9e9e9209458cd3b98430", features = ["queue"] } futures = "0.3.26" nostr = { git = "https://github.com/rust-nostr/nostr", rev = "c333544b1e58b89acb786695d5ca08759ebf5e69" } +serde = { version = "^1.0", features = ["derive"] } # The `console_error_panic_hook` crate provides better debugging of panics by # logging them with `console.error`. This is great for development, but requires diff --git a/src/lib.rs b/src/lib.rs index 76d20ed..503f593 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,10 @@ pub(crate) use crate::nostr::{ - try_queue_client_msg, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, + try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, NOSTR_QUEUE_6, }; use ::nostr::{ClientMessage, Event}; use futures::StreamExt; +use serde::{Deserialize, Serialize}; use worker::*; mod error; @@ -18,6 +19,11 @@ fn log_request(req: &Request) { ); } +#[derive(Serialize, Deserialize, Clone)] +pub struct PublishedNote { + date: String, +} + /// Main function for the Cloudflare Worker that triggers off of a HTTP req #[event(fetch)] pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { @@ -40,15 +46,60 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { match req.text().await { Ok(request_text) => { if let Ok(client_msg) = ClientMessage::from_json(request_text) { - let nostr_queues = vec![ - ctx.env.queue(NOSTR_QUEUE).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"), - ]; - try_queue_client_msg(client_msg, nostr_queues).await + match client_msg { + ClientMessage::Event(event) => { + console_log!("got an event from client: {}", event.id); + + // check if we've already published it before + let published_notes = ctx.kv("PUBLISHED_NOTES")?; + match published_notes + .get(event.id.to_string().as_str()) + .json::() + .await? + { + Some(_) => { + console_log!("event already published: {}", event.id); + return fetch(); + } + None => {} + }; + + // broadcast it to all queues + let nostr_queues = vec![ + ctx.env.queue(NOSTR_QUEUE).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"), + ]; + try_queue_event(*event.clone(), nostr_queues).await; + console_log!("queued up nostr event: {}", event.id); + match published_notes + .put( + event.id.to_string().as_str(), + PublishedNote { + date: Date::now().to_string(), + }, + )? + .execute() + .await + { + Ok(_) => { + console_log!("saved published note: {}", event.id); + } + Err(e) => { + console_log!( + "could not save published note: {} - {e:?}", + event.id + ); + } + } + } + _ => { + console_log!("ignoring other nostr client message types"); + } + } } } Err(e) => { @@ -74,15 +125,66 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { continue; }; if let Ok(client_msg) = ClientMessage::from_json(msg.text().unwrap()) { - let nostr_queues = vec![ - ctx.env.queue(NOSTR_QUEUE).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"), - ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"), - ]; - try_queue_client_msg(client_msg, nostr_queues).await + match client_msg { + ClientMessage::Event(event) => { + console_log!("got an event from client: {}", event.id); + + // check if we've already published it before + let published_notes = + ctx.kv("PUBLISHED_NOTES").expect("get kv"); + match published_notes + .get(event.id.to_string().as_str()) + .json::() + .await + .expect("lookup value") + { + Some(_) => { + console_log!( + "event already published: {}", + event.id + ); + continue; + } + None => {} + }; + + // broadcast it to all queues + let nostr_queues = vec![ + ctx.env.queue(NOSTR_QUEUE).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"), + ]; + try_queue_event(*event.clone(), nostr_queues).await; + console_log!("queued up nostr event: {}", event.id); + match published_notes + .put( + event.id.to_string().as_str(), + PublishedNote { + date: Date::now().to_string(), + }, + ) + .expect("saved note") + .execute() + .await + { + Ok(_) => { + console_log!("saved published note: {}", event.id); + } + Err(e) => { + console_log!( + "could not save published note: {} - {e:?}", + event.id + ); + } + } + } + _ => { + console_log!("ignoring other nostr client message types"); + } + } } } WebsocketEvent::Close(_) => { diff --git a/src/nostr.rs b/src/nostr.rs index c25e1d0..397129e 100644 --- a/src/nostr.rs +++ b/src/nostr.rs @@ -22,22 +22,13 @@ const RELAYS: [&str; 8] = [ "wss://nostr.wine", ]; -pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queues: Vec) { - match client_msg { - ClientMessage::Event(event) => { - console_log!("got an event from client: {}", event.id); - for nostr_queue in nostr_queues.iter() { - match queue_nostr_event_with_queue(nostr_queue, *event.clone()).await { - Ok(_) => {} - Err(Error::WorkerError(e)) => { - console_log!("worker error: {e}"); - } - } +pub async fn try_queue_event(event: Event, nostr_queues: Vec) { + for nostr_queue in nostr_queues.iter() { + match queue_nostr_event_with_queue(nostr_queue, event.clone()).await { + Ok(_) => {} + Err(Error::WorkerError(e)) => { + console_log!("worker error: {e}"); } - console_log!("queued up nostr event: {}", event.id) - } - _ => { - console_log!("ignoring other nostr client message types"); } } } diff --git a/wrangler.toml b/wrangler.toml index c01b706..58028af 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -8,6 +8,10 @@ routes = [ { pattern = "nostr.mutinywallet.com/event", zone_id = "2b9268714ce8d1c4431e8046d4ba55d3" } ] +kv_namespaces = [ + { binding = "PUBLISHED_NOTES", id = "afa24a392a5a41f6b1655507dfd9b97a", preview_id = "0b334aece8d74c3ab90e3e99db569ce8" } +] + [vars] WORKERS_RS_VERSION = "0.0.11"