diff --git a/src/lib.rs b/src/lib.rs
index 479ed94..76d20ed 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -1,4 +1,7 @@
-pub(crate) use crate::nostr::{try_queue_client_msg, NOSTR_QUEUE};
+pub(crate) use crate::nostr::{
+    try_queue_client_msg, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
+    NOSTR_QUEUE_6,
+};
 use ::nostr::{ClientMessage, Event};
 use futures::StreamExt;
 use worker::*;
@@ -37,8 +40,15 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
             match req.text().await {
                 Ok(request_text) => {
                     if let Ok(client_msg) = ClientMessage::from_json(request_text) {
-                        let nostr_queue = ctx.env.queue(NOSTR_QUEUE).expect("get queue");
-                        try_queue_client_msg(client_msg, nostr_queue).await
+                        let nostr_queues = vec![
+                            ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
+                        ];
+                        try_queue_client_msg(client_msg, nostr_queues).await
                     }
                 }
                 Err(e) => {
@@ -64,8 +74,15 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
                         continue;
                     };
                     if let Ok(client_msg) = ClientMessage::from_json(msg.text().unwrap()) {
-                        let nostr_queue = ctx.env.queue(NOSTR_QUEUE).expect("get queue");
-                        try_queue_client_msg(client_msg, nostr_queue).await
+                        let nostr_queues = vec![
+                            ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
+                            ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
+                        ];
+                        try_queue_client_msg(client_msg, nostr_queues).await
                     }
                 }
                 WebsocketEvent::Close(_) => {
@@ -86,9 +103,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
 pub async fn main(message_batch: MessageBatch<Event>, _env: Env, _ctx: Context) -> Result<()> {
     // Deserialize the message batch
     let messages: Vec<Message<Event>> = message_batch.messages()?;
-    let events: Vec<Event> = messages.iter().map(|m| m.body.clone()).collect();
+    let mut events: Vec<Event> = messages.iter().map(|m| m.body.clone()).collect();
+    events.sort();
+    events.dedup();
 
-    match nostr::send_nostr_events(events).await {
+    let part = queue_number(message_batch.queue().as_str())?;
+    match nostr::send_nostr_events(events, part).await {
         Ok(event_ids) => {
             for event_id in event_ids {
                 console_log!("Sent nostr event: {}", event_id)
@@ -102,6 +122,18 @@ pub async fn main(message_batch: MessageBatch<Event>, _env: Env, _ctx: Context)
     Ok(())
 }
 
+pub fn queue_number(batch_name: &str) -> Result<u32> {
+    match batch_name {
+        NOSTR_QUEUE => Ok(0),
+        NOSTR_QUEUE_2 => Ok(1),
+        NOSTR_QUEUE_3 => Ok(2),
+        NOSTR_QUEUE_4 => Ok(3),
+        NOSTR_QUEUE_5 => Ok(4),
+        NOSTR_QUEUE_6 => Ok(5),
+        _ => Err("unexpected queue".into()),
+    }
+}
+
 fn fetch() -> worker::Result<Response> {
     Response::empty()?.with_cors(&cors())
 }
diff --git a/src/nostr.rs b/src/nostr.rs
index fea205f..ac35966 100644
--- a/src/nostr.rs
+++ b/src/nostr.rs
@@ -1,10 +1,15 @@
 use crate::error::Error;
 use futures::pin_mut;
 use nostr::prelude::*;
-use std::time::Duration;
+use std::{time::Duration, vec};
 use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket};
 
 pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub";
+pub(crate) const NOSTR_QUEUE_2: &str = "nostr-events-pub-2";
+pub(crate) const NOSTR_QUEUE_3: &str = "nostr-events-pub-3";
+pub(crate) const NOSTR_QUEUE_4: &str = "nostr-events-pub-4";
+pub(crate) const NOSTR_QUEUE_5: &str = "nostr-events-pub-5";
+pub(crate) const NOSTR_QUEUE_6: &str = "nostr-events-pub-6";
 const RELAY_LIST_URL: &str = "https://api.nostr.watch/v1/online";
 const RELAYS: [&str; 8] = [
     "wss://nostr.zebedee.cloud",
@@ -17,18 +22,19 @@ const RELAYS: [&str; 8] = [
     "wss://nostr.wine",
 ];
 
-pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queue: Queue) {
+pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queues: Vec<Queue>) {
     match client_msg {
         ClientMessage::Event(event) => {
             console_log!("got an event from client: {}", event.id);
-            match queue_nostr_event_with_queue(nostr_queue, *event.clone()).await {
-                Ok(_) => {
-                    console_log!("queued up nostr event: {}", event.id)
-                }
-                Err(Error::WorkerError(e)) => {
-                    console_log!("worker error: {e}");
+            for nostr_queue in nostr_queues.iter() {
+                match queue_nostr_event_with_queue(nostr_queue, *event.clone()).await {
+                    Ok(_) => {}
+                    Err(Error::WorkerError(e)) => {
+                        console_log!("worker error: {e}");
+                    }
                 }
             }
+            console_log!("queued up nostr event: {}", event.id)
         }
         _ => {
             console_log!("ignoring other nostr client message types");
@@ -36,36 +42,45 @@ pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queue: Queue)
     }
 }
 
-pub async fn queue_nostr_event_with_queue(nostr_queue: Queue, event: Event) -> Result<(), Error> {
+pub async fn queue_nostr_event_with_queue(nostr_queue: &Queue, event: Event) -> Result<(), Error> {
     nostr_queue.send(&event).await?;
     Ok(())
 }
 
 async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Result<(), Error> {
-    console_log!("connecting to relay: {relay}");
-    let ws = WebSocket::connect(relay.parse().unwrap()).await?;
+    match WebSocket::connect(relay.parse().unwrap()).await {
+        Ok(ws) => {
+            // It's important that we call this before we send our first message, otherwise we will
+            // not have any event listeners on the socket to receive the echoed message.
+            if let Some(e) = ws.events().err() {
+                console_log!("Error calling ws events from relay {relay}: {e:?}");
+                return Err(e.into());
+            }
 
-    // It's important that we call this before we send our first message, otherwise we will
-    // not have any event listeners on the socket to receive the echoed message.
-    let _event_stream = ws.events()?;
+            if let Some(e) = ws.accept().err() {
+                console_log!("Error accepting ws from relay {relay}: {e:?}");
+                return Err(e.into());
+            }
 
-    ws.accept()?;
-    console_log!("sending event to relay: {relay}");
+            for message in messages {
+                if let Some(e) = ws.send_with_str(message.as_json()).err() {
+                    console_log!("Error sending event to relay {relay}: {e:?}")
+                }
+            }
 
-    for message in messages {
-        if let Some(e) = ws.send_with_str(message.as_json()).err() {
-            console_log!("Error sending event to relay {relay}: {e:?}")
+            if let Some(_e) = ws.close::<String>(None, None).err() {
+                console_log!("Error websocket to relay {relay}")
+            }
         }
-    }
-
-    if let Some(_e) = ws.close::<String>(None, None).err() {
-        console_log!("Error websocket to relay {relay}")
-    }
+        Err(e) => {
+            console_log!("Error connecting to relay {relay}: {e:?}")
+        }
+    };
 
     Ok(())
 }
 
-pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error> {
+pub async fn send_nostr_events(events: Vec<Event>, part: u32) -> Result<Vec<EventId>, Error> {
     let messages: Vec<ClientMessage> = events
         .iter()
         .map(|e| ClientMessage::new_event(e.clone()))
@@ -93,7 +108,7 @@ pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error
 
                 // Cache API respects Cache-Control headers. Setting s-max-age to 10
                 // will limit the response to be in cache for 10 seconds max
-                resp.headers_mut().set("cache-control", "s-maxage=600")?;
+                resp.headers_mut().set("cache-control", "s-maxage=1800")?;
                 cache.put(RELAY_LIST_URL, resp.cloned()?).await?;
                 match resp.json::<Vec<String>>().await {
                     Ok(r) => r,
@@ -115,9 +130,10 @@ pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error
             }
         }
     };
-
+    // find range of elements for this part
+    let sub_relays = get_sub_vec_range(relays, find_range_from_part(part));
     let mut futures = Vec::new();
-    for relay in relays.iter() {
+    for relay in sub_relays.iter() {
         let fut = send_event_to_relay(messages.clone(), relay);
         futures.push(fut);
     }
@@ -125,13 +141,25 @@ pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error
     let sleep = delay(120_000);
     pin_mut!(combined_futures);
     pin_mut!(sleep);
-
-    console_log!("waiting for futures");
     futures::future::select(combined_futures, sleep).await;
-    console_log!("futures done");
     Ok(events.iter().map(|e| e.id).collect())
 }
 
+fn get_sub_vec_range(original: Vec<String>, range: (usize, usize)) -> Vec<String> {
+    let len = original.len();
+    if range.0 >= len {
+        return vec![];
+    }
+    let end = if range.1 >= len { len - 1 } else { range.1 };
+    original[range.0..end].to_vec()
+}
+
+fn find_range_from_part(part: u32) -> (usize, usize) {
+    let start = 48 * part;
+    let end = start + 47;
+    (start as usize, end as usize)
+}
+
 async fn delay(delay: u64) {
     let delay: Delay = Duration::from_millis(delay).into();
     delay.await;
diff --git a/wrangler.toml b/wrangler.toml
index 9226897..c01b706 100644
--- a/wrangler.toml
+++ b/wrangler.toml
@@ -15,10 +15,55 @@ WORKERS_RS_VERSION = "0.0.11"
     queue = "nostr-events-pub"
    binding = "nostr-events-pub"
 
+[[queues.producers]]
+    queue = "nostr-events-pub-2"
+    binding = "nostr-events-pub-2"
+
+[[queues.producers]]
+    queue = "nostr-events-pub-3"
+    binding = "nostr-events-pub-3"
+
+[[queues.producers]]
+    queue = "nostr-events-pub-4"
+    binding = "nostr-events-pub-4"
+
+[[queues.producers]]
+    queue = "nostr-events-pub-5"
+    binding = "nostr-events-pub-5"
+
+[[queues.producers]]
+    queue = "nostr-events-pub-6"
+    binding = "nostr-events-pub-6"
+
 [[queues.consumers]]
     queue = "nostr-events-pub"
-    max_batch_size = 10 # max events until triggered
-    max_batch_timeout = 30 # max seconds until triggered
+    max_batch_size = 10
+    max_batch_timeout = 30
+
+[[queues.consumers]]
+    queue = "nostr-events-pub-2"
+    max_batch_size = 10
+    max_batch_timeout = 30
+
+[[queues.consumers]]
+    queue = "nostr-events-pub-3"
+    max_batch_size = 10
+    max_batch_timeout = 30
+
+[[queues.consumers]]
+    queue = "nostr-events-pub-4"
+    max_batch_size = 10
+    max_batch_timeout = 30
+
+[[queues.consumers]]
+    queue = "nostr-events-pub-5"
+    max_batch_size = 10
+    max_batch_timeout = 30
+
+[[queues.consumers]]
+    queue = "nostr-events-pub-6"
+    max_batch_size = 10
+    max_batch_timeout = 30
 
 [build]
 command = "cargo install --git https://github.com/CathalMullan/workers-rs worker-build && worker-build --release"
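Not part of the diff: a minimal, self-contained sketch of how the relay partitioning added in `src/nostr.rs` behaves, copying `find_range_from_part` and `get_sub_vec_range` as they appear above. The 100-relay list and the `main` harness are invented for illustration only. Each consumer queue's `part` index selects a 48-wide window of the online relay list, clipped to the list length, and the window's upper bound is used as an exclusive slice end.

```rust
// Helpers copied from the src/nostr.rs changes above.
fn find_range_from_part(part: u32) -> (usize, usize) {
    let start = 48 * part;
    let end = start + 47;
    (start as usize, end as usize)
}

fn get_sub_vec_range(original: Vec<String>, range: (usize, usize)) -> Vec<String> {
    let len = original.len();
    if range.0 >= len {
        return vec![];
    }
    let end = if range.1 >= len { len - 1 } else { range.1 };
    original[range.0..end].to_vec()
}

fn main() {
    // Pretend the relay list endpoint returned 100 online relays (hypothetical names).
    let relays: Vec<String> = (0..100).map(|i| format!("wss://relay-{i}.example")).collect();

    // Queue "nostr-events-pub" (part 0) gets the first window: the exclusive
    // slice end means indices 0..=46, i.e. 47 relays.
    let part0 = get_sub_vec_range(relays.clone(), find_range_from_part(0));
    assert_eq!(part0.len(), 47);

    // Queue "nostr-events-pub-3" (part 2) starts at index 96; its window is
    // clamped to the end of the list, yielding indices 96..=98.
    let part2 = get_sub_vec_range(relays.clone(), find_range_from_part(2));
    assert_eq!(part2.len(), 3);

    // Parts whose window starts past the end of the list get nothing to do.
    let part5 = get_sub_vec_range(relays, find_range_from_part(5));
    assert!(part5.is_empty());
}
```

Under these assumptions the six queues together cover roughly the first 288 online relays, with each consumer worker fanning its deduplicated event batch out to only its own slice.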