Don't broadcast events more than once

This commit is contained in:
Tony Giorgio
2023-02-18 23:10:45 -06:00
parent 0fca0de944
commit de5d42d62f
4 changed files with 132 additions and 34 deletions

View File

@@ -1,9 +1,10 @@
pub(crate) use crate::nostr::{
try_queue_client_msg, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
NOSTR_QUEUE_6,
};
use ::nostr::{ClientMessage, Event};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use worker::*;
mod error;
@@ -18,6 +19,11 @@ fn log_request(req: &Request) {
);
}
#[derive(Serialize, Deserialize, Clone)]
pub struct PublishedNote {
date: String,
}
/// Main function for the Cloudflare Worker that triggers off of a HTTP req
#[event(fetch)]
pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
@@ -40,15 +46,60 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
match req.text().await {
Ok(request_text) => {
if let Ok(client_msg) = ClientMessage::from_json(request_text) {
let nostr_queues = vec![
ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
];
try_queue_client_msg(client_msg, nostr_queues).await
match client_msg {
ClientMessage::Event(event) => {
console_log!("got an event from client: {}", event.id);
// check if we've already published it before
let published_notes = ctx.kv("PUBLISHED_NOTES")?;
match published_notes
.get(event.id.to_string().as_str())
.json::<PublishedNote>()
.await?
{
Some(_) => {
console_log!("event already published: {}", event.id);
return fetch();
}
None => {}
};
// broadcast it to all queues
let nostr_queues = vec![
ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
];
try_queue_event(*event.clone(), nostr_queues).await;
console_log!("queued up nostr event: {}", event.id);
match published_notes
.put(
event.id.to_string().as_str(),
PublishedNote {
date: Date::now().to_string(),
},
)?
.execute()
.await
{
Ok(_) => {
console_log!("saved published note: {}", event.id);
}
Err(e) => {
console_log!(
"could not save published note: {} - {e:?}",
event.id
);
}
}
}
_ => {
console_log!("ignoring other nostr client message types");
}
}
}
}
Err(e) => {
@@ -74,15 +125,66 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
continue;
};
if let Ok(client_msg) = ClientMessage::from_json(msg.text().unwrap()) {
let nostr_queues = vec![
ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
];
try_queue_client_msg(client_msg, nostr_queues).await
match client_msg {
ClientMessage::Event(event) => {
console_log!("got an event from client: {}", event.id);
// check if we've already published it before
let published_notes =
ctx.kv("PUBLISHED_NOTES").expect("get kv");
match published_notes
.get(event.id.to_string().as_str())
.json::<PublishedNote>()
.await
.expect("lookup value")
{
Some(_) => {
console_log!(
"event already published: {}",
event.id
);
continue;
}
None => {}
};
// broadcast it to all queues
let nostr_queues = vec![
ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_2).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_3).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_4).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_5).expect("get queue"),
ctx.env.queue(NOSTR_QUEUE_6).expect("get queue"),
];
try_queue_event(*event.clone(), nostr_queues).await;
console_log!("queued up nostr event: {}", event.id);
match published_notes
.put(
event.id.to_string().as_str(),
PublishedNote {
date: Date::now().to_string(),
},
)
.expect("saved note")
.execute()
.await
{
Ok(_) => {
console_log!("saved published note: {}", event.id);
}
Err(e) => {
console_log!(
"could not save published note: {} - {e:?}",
event.id
);
}
}
}
_ => {
console_log!("ignoring other nostr client message types");
}
}
}
}
WebsocketEvent::Close(_) => {