From eb0b5fd8aca9f119b2b8c8bfd4822149e312dbfc Mon Sep 17 00:00:00 2001 From: Tony Giorgio Date: Sun, 5 Mar 2023 16:32:10 -0600 Subject: [PATCH] Add more queues and delete some prev ones --- src/lib.rs | 16 ++++++++++++++ src/nostr.rs | 12 ++++++---- wrangler.toml | 61 +++++++++++++++++++++++++++++++++++++++++---------- 3 files changed, 73 insertions(+), 16 deletions(-) diff --git a/src/lib.rs b/src/lib.rs index f51d972..b969574 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,4 +1,8 @@ use crate::nostr::get_nip11_response; +use crate::nostr::NOSTR_QUEUE_10; +use crate::nostr::NOSTR_QUEUE_7; +use crate::nostr::NOSTR_QUEUE_8; +use crate::nostr::NOSTR_QUEUE_9; pub(crate) use crate::nostr::{ try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, NOSTR_QUEUE_6, @@ -78,6 +82,10 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"), ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"), ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_7).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_8).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_9).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_10).expect("get queue"), ]; try_queue_event(*event.clone(), nostr_queues).await; console_log!("queued up nostr event: {}", event.id); @@ -187,6 +195,10 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"), ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"), ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_7).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_8).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_9).expect("get queue"), + ctx.env.queue(NOSTR_QUEUE_10).expect("get queue"), ]; try_queue_event(*event.clone(), nostr_queues).await; console_log!("queued up nostr event: {}", event.id); @@ -273,6 +285,10 @@ pub fn queue_number(batch_name: &str) -> Result { NOSTR_QUEUE_4 => Ok(3), NOSTR_QUEUE_5 => Ok(4), NOSTR_QUEUE_6 => Ok(5), + NOSTR_QUEUE_7 => Ok(6), + NOSTR_QUEUE_8 => Ok(7), + NOSTR_QUEUE_9 => Ok(8), + NOSTR_QUEUE_10 => Ok(9), _ => Err("unexpected queue".into()), } } diff --git a/src/nostr.rs b/src/nostr.rs index 7591d12..c3ffdac 100644 --- a/src/nostr.rs +++ b/src/nostr.rs @@ -8,12 +8,16 @@ use std::{time::Duration, vec}; use worker::WebsocketEvent; use worker::{console_log, Cache, Delay, Fetch, Queue, Response, WebSocket}; -pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub"; +pub(crate) const NOSTR_QUEUE: &str = "nostr-events-pub-1"; pub(crate) const NOSTR_QUEUE_2: &str = "nostr-events-pub-2"; pub(crate) const NOSTR_QUEUE_3: &str = "nostr-events-pub-3"; pub(crate) const NOSTR_QUEUE_4: &str = "nostr-events-pub-4"; pub(crate) const NOSTR_QUEUE_5: &str = "nostr-events-pub-5"; -pub(crate) const NOSTR_QUEUE_6: &str = "nostr-events-pub-6"; +pub(crate) const NOSTR_QUEUE_6: &str = "nostr-events-pub-6-a"; +pub(crate) const NOSTR_QUEUE_7: &str = "nostr-events-pub-7"; +pub(crate) const NOSTR_QUEUE_8: &str = "nostr-events-pub-8"; +pub(crate) const NOSTR_QUEUE_9: &str = "nostr-events-pub-9"; +pub(crate) const NOSTR_QUEUE_10: &str = "nostr-events-pub-10"; const RELAY_LIST_URL: &str = "https://api.nostr.watch/v1/online"; const RELAYS: [&str; 8] = [ "wss://nostr.zebedee.cloud", @@ -209,8 +213,8 @@ fn get_sub_vec_range(original: Vec, range: (usize, usize)) -> Vec (usize, usize) { - let start = 48 * part; - let end = start + 47; + let start = 30 * part; + let end = start + 29; (start as usize, end as usize) } diff --git a/wrangler.toml b/wrangler.toml index fd13e28..272a75d 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -16,8 +16,8 @@ kv_namespaces = [ WORKERS_RS_VERSION = "0.0.11" [[queues.producers]] - queue = "nostr-events-pub" - binding = "nostr-events-pub" + queue = "nostr-events-pub-1" + binding = "nostr-events-pub-1" [[queues.producers]] queue = "nostr-events-pub-2" @@ -36,37 +36,74 @@ WORKERS_RS_VERSION = "0.0.11" binding = "nostr-events-pub-5" [[queues.producers]] - queue = "nostr-events-pub-6" - binding = "nostr-events-pub-6" + queue = "nostr-events-pub-6-a" + binding = "nostr-events-pub-6-a" +[[queues.producers]] + queue = "nostr-events-pub-7" + binding = "nostr-events-pub-7" + +[[queues.producers]] + queue = "nostr-events-pub-8" + binding = "nostr-events-pub-8" + +[[queues.producers]] + queue = "nostr-events-pub-9" + binding = "nostr-events-pub-9" + +[[queues.producers]] + queue = "nostr-events-pub-10" + binding = "nostr-events-pub-10" + +# consumers [[queues.consumers]] - queue = "nostr-events-pub" + queue = "nostr-events-pub-1" max_batch_size = 15 - max_batch_timeout = 15 + max_batch_timeout = 5 # this is the best one, run quicker [[queues.consumers]] queue = "nostr-events-pub-2" - max_batch_size = 15 + max_batch_size = 20 max_batch_timeout = 15 [[queues.consumers]] queue = "nostr-events-pub-3" - max_batch_size = 15 + max_batch_size = 20 max_batch_timeout = 15 [[queues.consumers]] queue = "nostr-events-pub-4" - max_batch_size = 15 + max_batch_size = 20 max_batch_timeout = 15 [[queues.consumers]] queue = "nostr-events-pub-5" - max_batch_size = 15 + max_batch_size = 20 max_batch_timeout = 15 [[queues.consumers]] - queue = "nostr-events-pub-6" - max_batch_size = 15 + queue = "nostr-events-pub-6-a" + max_batch_size = 20 + max_batch_timeout = 15 + +[[queues.consumers]] + queue = "nostr-events-pub-7" + max_batch_size = 20 + max_batch_timeout = 15 + +[[queues.consumers]] + queue = "nostr-events-pub-8" + max_batch_size = 20 + max_batch_timeout = 15 + +[[queues.consumers]] + queue = "nostr-events-pub-9" + max_batch_size = 20 + max_batch_timeout = 15 + +[[queues.consumers]] + queue = "nostr-events-pub-10" + max_batch_size = 20 max_batch_timeout = 15 [build]