Event batching for 48 relays at a time

This commit is contained in:
Tony Giorgio
2023-02-18 22:28:18 -06:00
parent 6e5baf326b
commit 7be6aa4f3e
3 changed files with 145 additions and 40 deletions

View File

@@ -1,4 +1,7 @@
pub(crate) use crate::nostr::{try_queue_client_msg, NOSTR_QUEUE}; pub(crate) use crate::nostr::{
try_queue_client_msg, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
NOSTR_QUEUE_6,
};
use ::nostr::{ClientMessage, Event}; use ::nostr::{ClientMessage, Event};
use futures::StreamExt; use futures::StreamExt;
use worker::*; use worker::*;
@@ -37,8 +40,15 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
match req.text().await { match req.text().await {
Ok(request_text) => { Ok(request_text) => {
if let Ok(client_msg) = ClientMessage::from_json(request_text) { if let Ok(client_msg) = ClientMessage::from_json(request_text) {
let nostr_queue = ctx.env.queue(NOSTR_QUEUE).expect("get queue"); let nostr_queues = vec![
try_queue_client_msg(client_msg, nostr_queue).await ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
];
try_queue_client_msg(client_msg, nostr_queues).await
} }
} }
Err(e) => { Err(e) => {
@@ -64,8 +74,15 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
continue; continue;
}; };
if let Ok(client_msg) = ClientMessage::from_json(msg.text().unwrap()) { if let Ok(client_msg) = ClientMessage::from_json(msg.text().unwrap()) {
let nostr_queue = ctx.env.queue(NOSTR_QUEUE).expect("get queue"); let nostr_queues = vec![
try_queue_client_msg(client_msg, nostr_queue).await ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
];
try_queue_client_msg(client_msg, nostr_queues).await
} }
} }
WebsocketEvent::Close(_) => { WebsocketEvent::Close(_) => {
@@ -86,9 +103,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
pub async fn main(message_batch: MessageBatch<Event>, _env: Env, _ctx: Context) -> Result<()> { pub async fn main(message_batch: MessageBatch<Event>, _env: Env, _ctx: Context) -> Result<()> {
// Deserialize the message batch // Deserialize the message batch
let messages: Vec<Message<Event>> = message_batch.messages()?; let messages: Vec<Message<Event>> = message_batch.messages()?;
let events: Vec<Event> = messages.iter().map(|m| m.body.clone()).collect(); let mut events: Vec<Event> = messages.iter().map(|m| m.body.clone()).collect();
events.sort();
events.dedup();
match nostr::send_nostr_events(events).await { let part = queue_number(message_batch.queue().as_str())?;
match nostr::send_nostr_events(events, part).await {
Ok(event_ids) => { Ok(event_ids) => {
for event_id in event_ids { for event_id in event_ids {
console_log!("Sent nostr event: {}", event_id) console_log!("Sent nostr event: {}", event_id)
@@ -102,6 +122,18 @@ pub async fn main(message_batch: MessageBatch<Event>, _env: Env, _ctx: Context)
Ok(()) Ok(())
} }
pub fn queue_number(batch_name: &str) -> Result<u32> {
match batch_name {
NOSTR_QUEUE => Ok(0),
NOSTR_QUEUE_2 => Ok(1),
NOSTR_QUEUE_3 => Ok(2),
NOSTR_QUEUE_4 => Ok(3),
NOSTR_QUEUE_5 => Ok(4),
NOSTR_QUEUE_6 => Ok(5),
_ => Err("unexpected queue".into()),
}
}
fn fetch() -> worker::Result<Response> { fn fetch() -> worker::Result<Response> {
Response::empty()?.with_cors(&cors()) Response::empty()?.with_cors(&cors())
} }

View File

@@ -1,10 +1,15 @@
use crate::error::Error; use crate::error::Error;
use futures::pin_mut; use futures::pin_mut;
use nostr::prelude::*; use nostr::prelude::*;
use std::time::Duration; use std::{time::Duration, vec};
use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket}; use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket};
pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub"; pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub";
pub(crate) const NOSTR_QUEUE_2: &str = "nostr-events-pub-2";
pub(crate) const NOSTR_QUEUE_3: &str = "nostr-events-pub-3";
pub(crate) const NOSTR_QUEUE_4: &str = "nostr-events-pub-4";
pub(crate) const NOSTR_QUEUE_5: &str = "nostr-events-pub-5";
pub(crate) const NOSTR_QUEUE_6: &str = "nostr-events-pub-6";
const RELAY_LIST_URL: &str = "https://api.nostr.watch/v1/online"; const RELAY_LIST_URL: &str = "https://api.nostr.watch/v1/online";
const RELAYS: [&str; 8] = [ const RELAYS: [&str; 8] = [
"wss://nostr.zebedee.cloud", "wss://nostr.zebedee.cloud",
@@ -17,40 +22,45 @@ const RELAYS: [&str; 8] = [
"wss://nostr.wine", "wss://nostr.wine",
]; ];
pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queue: Queue) { pub async fn try_queue_client_msg(client_msg: ClientMessage, nostr_queues: Vec<Queue>) {
match client_msg { match client_msg {
ClientMessage::Event(event) => { ClientMessage::Event(event) => {
console_log!("got an event from client: {}", event.id); console_log!("got an event from client: {}", event.id);
for nostr_queue in nostr_queues.iter() {
match queue_nostr_event_with_queue(nostr_queue, *event.clone()).await { match queue_nostr_event_with_queue(nostr_queue, *event.clone()).await {
Ok(_) => { Ok(_) => {}
console_log!("queued up nostr event: {}", event.id)
}
Err(Error::WorkerError(e)) => { Err(Error::WorkerError(e)) => {
console_log!("worker error: {e}"); console_log!("worker error: {e}");
} }
} }
} }
console_log!("queued up nostr event: {}", event.id)
}
_ => { _ => {
console_log!("ignoring other nostr client message types"); console_log!("ignoring other nostr client message types");
} }
} }
} }
pub async fn queue_nostr_event_with_queue(nostr_queue: Queue, event: Event) -> Result<(), Error> { pub async fn queue_nostr_event_with_queue(nostr_queue: &Queue, event: Event) -> Result<(), Error> {
nostr_queue.send(&event).await?; nostr_queue.send(&event).await?;
Ok(()) Ok(())
} }
async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Result<(), Error> { async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Result<(), Error> {
console_log!("connecting to relay: {relay}"); match WebSocket::connect(relay.parse().unwrap()).await {
let ws = WebSocket::connect(relay.parse().unwrap()).await?; Ok(ws) => {
// It's important that we call this before we send our first message, otherwise we will // It's important that we call this before we send our first message, otherwise we will
// not have any event listeners on the socket to receive the echoed message. // not have any event listeners on the socket to receive the echoed message.
let _event_stream = ws.events()?; if let Some(e) = ws.events().err() {
console_log!("Error calling ws events from relay {relay}: {e:?}");
return Err(e.into());
}
ws.accept()?; if let Some(e) = ws.accept().err() {
console_log!("sending event to relay: {relay}"); console_log!("Error accepting ws from relay {relay}: {e:?}");
return Err(e.into());
}
for message in messages { for message in messages {
if let Some(e) = ws.send_with_str(message.as_json()).err() { if let Some(e) = ws.send_with_str(message.as_json()).err() {
@@ -61,11 +71,16 @@ async fn send_event_to_relay(messages: Vec<ClientMessage>, relay: &str) -> Resul
if let Some(_e) = ws.close::<String>(None, None).err() { if let Some(_e) = ws.close::<String>(None, None).err() {
console_log!("Error websocket to relay {relay}") console_log!("Error websocket to relay {relay}")
} }
}
Err(e) => {
console_log!("Error connecting to relay {relay}: {e:?}")
}
};
Ok(()) Ok(())
} }
pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error> { pub async fn send_nostr_events(events: Vec<Event>, part: u32) -> Result<Vec<EventId>, Error> {
let messages: Vec<ClientMessage> = events let messages: Vec<ClientMessage> = events
.iter() .iter()
.map(|e| ClientMessage::new_event(e.clone())) .map(|e| ClientMessage::new_event(e.clone()))
@@ -93,7 +108,7 @@ pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error
// Cache API respects Cache-Control headers. Setting s-max-age to 10 // Cache API respects Cache-Control headers. Setting s-max-age to 10
// will limit the response to be in cache for 10 seconds max // will limit the response to be in cache for 10 seconds max
resp.headers_mut().set("cache-control", "s-maxage=600")?; resp.headers_mut().set("cache-control", "s-maxage=1800")?;
cache.put(RELAY_LIST_URL, resp.cloned()?).await?; cache.put(RELAY_LIST_URL, resp.cloned()?).await?;
match resp.json::<Vec<String>>().await { match resp.json::<Vec<String>>().await {
Ok(r) => r, Ok(r) => r,
@@ -115,9 +130,10 @@ pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error
} }
} }
}; };
// find range of elements for this part
let sub_relays = get_sub_vec_range(relays, find_range_from_part(part));
let mut futures = Vec::new(); let mut futures = Vec::new();
for relay in relays.iter() { for relay in sub_relays.iter() {
let fut = send_event_to_relay(messages.clone(), relay); let fut = send_event_to_relay(messages.clone(), relay);
futures.push(fut); futures.push(fut);
} }
@@ -125,13 +141,25 @@ pub async fn send_nostr_events(events: Vec<Event>) -> Result<Vec<EventId>, Error
let sleep = delay(120_000); let sleep = delay(120_000);
pin_mut!(combined_futures); pin_mut!(combined_futures);
pin_mut!(sleep); pin_mut!(sleep);
console_log!("waiting for futures");
futures::future::select(combined_futures, sleep).await; futures::future::select(combined_futures, sleep).await;
console_log!("futures done");
Ok(events.iter().map(|e| e.id).collect()) Ok(events.iter().map(|e| e.id).collect())
} }
fn get_sub_vec_range(original: Vec<String>, range: (usize, usize)) -> Vec<String> {
let len = original.len();
if range.0 >= len {
return vec![];
}
let end = if range.1 >= len { len - 1 } else { range.1 };
original[range.0..end].to_vec()
}
fn find_range_from_part(part: u32) -> (usize, usize) {
let start = 48 * part;
let end = start + 47;
(start as usize, end as usize)
}
async fn delay(delay: u64) { async fn delay(delay: u64) {
let delay: Delay = Duration::from_millis(delay).into(); let delay: Delay = Duration::from_millis(delay).into();
delay.await; delay.await;

View File

@@ -15,10 +15,55 @@ WORKERS_RS_VERSION = "0.0.11"
queue = "nostr-events-pub" queue = "nostr-events-pub"
binding = "nostr-events-pub" binding = "nostr-events-pub"
[[queues.producers]]
queue = "nostr-events-pub-2"
binding = "nostr-events-pub-2"
[[queues.producers]]
queue = "nostr-events-pub-3"
binding = "nostr-events-pub-3"
[[queues.producers]]
queue = "nostr-events-pub-4"
binding = "nostr-events-pub-4"
[[queues.producers]]
queue = "nostr-events-pub-5"
binding = "nostr-events-pub-5"
[[queues.producers]]
queue = "nostr-events-pub-6"
binding = "nostr-events-pub-6"
[[queues.consumers]] [[queues.consumers]]
queue = "nostr-events-pub" queue = "nostr-events-pub"
max_batch_size = 10 # max events until triggered max_batch_size = 10
max_batch_timeout = 30 # max seconds until triggered max_batch_timeout = 30
[[queues.consumers]]
queue = "nostr-events-pub-2"
max_batch_size = 10
max_batch_timeout = 30
[[queues.consumers]]
queue = "nostr-events-pub-3"
max_batch_size = 10
max_batch_timeout = 30
[[queues.consumers]]
queue = "nostr-events-pub-4"
max_batch_size = 10
max_batch_timeout = 30
[[queues.consumers]]
queue = "nostr-events-pub-5"
max_batch_size = 10
max_batch_timeout = 30
[[queues.consumers]]
queue = "nostr-events-pub-6"
max_batch_size = 10
max_batch_timeout = 30
[build] [build]
command = "cargo install --git https://github.com/CathalMullan/workers-rs worker-build && worker-build --release" command = "cargo install --git https://github.com/CathalMullan/workers-rs worker-build && worker-build --release"