From 9874c1e0076df6779bc6a9128f725754c3f86d78 Mon Sep 17 00:00:00 2001 From: benthecarman Date: Wed, 24 May 2023 17:38:56 -0500 Subject: [PATCH] Store Nostr Wallet Connect Events This makes blastr store nostr wallet connect events so we can use them without having to worry about them being epheremeral. --- Cargo.toml | 2 +- README.md | 8 +++ src/lib.rs | 193 +++++++++++++++++++++++++++++++++++++++++++++++++- wrangler.toml | 4 +- 4 files changed, 203 insertions(+), 4 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3693480..6fac34e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -13,7 +13,7 @@ default = ["console_error_panic_hook"] cfg-if = "0.1.2" worker = { version = "0.0.13", features = ["queue"] } futures = "0.3.26" -nostr = { version = "0.19.5", default-features = false, features = ["nip11"] } +nostr = { version = "0.22.0", default-features = false, features = ["nip11"] } serde = { version = "^1.0", features = ["derive"] } # The `console_error_panic_hook` crate provides better debugging of panics by diff --git a/README.md b/README.md index 9741aee..0755a5b 100644 --- a/README.md +++ b/README.md @@ -41,9 +41,17 @@ Right now some of these are hardcoded for us since they have to map from the `wr This doesn't rebroadcast events that have already been broadcasted before. So we have a KV for that. +We also have a KV for storing the NWC requests and responses to get around ephemeral events. + ``` wrangler kv:namespace create PUBLISHED_NOTES wrangler kv:namespace create PUBLISHED_NOTES --preview + +wrangler kv:namespace create NWC_REQUESTS +wrangler kv:namespace create NWC_REQUESTS --preview + +wrangler kv:namespace create NWC_RESPONSES +wrangler kv:namespace create NWC_RESPONSES --preview ``` #### Queues diff --git a/src/lib.rs b/src/lib.rs index eff4a5c..e6d09c6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -7,7 +7,7 @@ pub(crate) use crate::nostr::{ try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5, NOSTR_QUEUE_6, }; -use ::nostr::{ClientMessage, Event, RelayMessage}; +use ::nostr::{ClientMessage, Event, Kind, RelayMessage, Tag, TagKind}; use futures::StreamExt; use serde::{Deserialize, Serialize}; use worker::*; @@ -93,6 +93,17 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { return relay_response(relay_msg); }; + // if the event is a nostr wallet connect event, we + // should save it and not send to other relays. + if let Some(relay_msg) = + handle_nwc_event(*event.clone(), &ctx).await? + { + if let Err(e) = delete_nwc_request(*event, &ctx).await { + console_log!("failed to delete nwc request: {e}"); + } + return relay_response(relay_msg); + }; + // broadcast it to all queues let nostr_queues = vec![ ctx.env.queue(NOSTR_QUEUE).expect("get queue"), @@ -224,6 +235,24 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { continue; }; + // if the event is a nostr wallet connect event, we + // should save it and not send to other relays. + if let Some(response) = + handle_nwc_event(*event.clone(), &ctx) + .await + .expect("failed to handle nwc event") + { + server + .send_with_str(&response.as_json()) + .expect("failed to send response"); + + if let Err(e) = delete_nwc_request(*event, &ctx).await { + console_log!("failed to delete nwc request: {e}"); + } + + continue; + }; + // broadcast it to all queues let nostr_queues = vec![ ctx.env.queue(NOSTR_QUEUE).expect("get queue"), @@ -267,8 +296,67 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result { .expect("failed to send response"); } ClientMessage::Req { - subscription_id, .. + subscription_id, + filters, } => { + // if the user requests a NWC event, we have those stored, + // we should send them to the user + let mut events = vec![]; + for filter in filters { + // get all authors and pubkeys + let mut keys = filter.authors.unwrap_or_default(); + keys.extend( + filter + .pubkeys + .unwrap_or_default() + .into_iter() + .map(|p| p.to_string()), + ); + + if filter + .kinds + .clone() + .unwrap_or_default() + .contains(&Kind::WalletConnectRequest) + { + let found_events = get_nwc_events( + &keys, + Kind::WalletConnectRequest, + &ctx, + ) + .await + .unwrap_or_default(); + events.extend(found_events); + } + + if filter + .kinds + .unwrap_or_default() + .contains(&Kind::WalletConnectResponse) + { + let found_events = get_nwc_events( + &keys, + Kind::WalletConnectResponse, + &ctx, + ) + .await + .unwrap_or_default(); + events.extend(found_events); + } + } + + // send all found evens to the user + for event in events { + console_log!("sending event to client: {}", &event.id); + let relay_msg = RelayMessage::new_event( + subscription_id.clone(), + event, + ); + server + .send_with_str(&relay_msg.as_json()) + .expect("failed to send response"); + } + console_log!("ignoring nostr subscription request"); let relay_msg = RelayMessage::new_eose(subscription_id); server @@ -339,6 +427,107 @@ pub fn queue_number(batch_name: &str) -> Result { } } +pub async fn get_nwc_events( + keys: &[String], + kind: Kind, + ctx: &RouteContext<()>, +) -> Result> { + let kv_store = match kind { + Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, + Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?, + _ => return Ok(vec![]), // skip other event types, todo, we may want to store info events as well + }; + + let mut events = vec![]; + for key in keys { + let nwc_events = kv_store + .get(key) + .json::>() + .await? + .unwrap_or_default(); + for nwc_event in nwc_events { + events.push(nwc_event); + } + } + + Ok(events) +} + +pub async fn handle_nwc_event( + event: Event, + ctx: &RouteContext<()>, +) -> Result> { + let kv_store = match event.kind { + Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, + Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?, + _ => return Ok(None), // skip other event types, todo, we may want to store info events as well + }; + + console_log!("got a wallet connect event: {}", event.id); + + let key = &event.pubkey.to_string(); + + let new_nwc_responses = match kv_store.get(key).json::>().await { + Ok(Some(mut current)) => { + current.push(event.clone()); + current + } + Ok(None) => vec![event.clone()], + Err(e) => { + console_log!("error getting nwc events from KV: {e}"); + let relay_msg = + RelayMessage::new_ok(event.id, false, "error: could not save published note"); + return Ok(Some(relay_msg)); + } + }; + + // save new vector of events + if let Err(e) = kv_store.put(key, new_nwc_responses)?.execute().await { + console_log!("error saving nwc: {e}"); + let relay_msg = + RelayMessage::new_ok(event.id, false, "error: could not save published note"); + return Ok(Some(relay_msg)); + } + console_log!("saved nwc event: {}", event.id); + + let relay_msg = RelayMessage::new_ok(event.id, true, ""); + Ok(Some(relay_msg)) +} + +/// When a NWC request has been fulfilled, delete the request from KV +pub async fn delete_nwc_request(event: Event, ctx: &RouteContext<()>) -> Result<()> { + let kv_store = match event.kind { + Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?, + _ => return Ok(()), // skip other event types + }; + + let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned(); + let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned(); + + if let Some(Tag::PubKey(pubkey, ..)) = p_tag { + if let Some(Tag::Event(event_id, ..)) = e_tag { + let key = &pubkey.to_string(); + match kv_store.get(key).json::>().await { + Ok(Some(current)) => { + let new_events: Vec = + current.into_iter().filter(|e| e.id != event_id).collect(); + + // save new vector of events + kv_store.put(key, new_events)?.execute().await?; + console_log!("deleted nwc event: {}", event_id); + } + Ok(None) => return Ok(()), + Err(e) => { + console_log!("error getting nwc events from KV: {e}"); + return Ok(()); + } + }; + }; + }; + + Ok(()) +} + fn relay_response(msg: RelayMessage) -> worker::Result { Response::from_json(&msg)?.with_cors(&cors()) } diff --git a/wrangler.toml b/wrangler.toml index 1ef894a..83b945b 100644 --- a/wrangler.toml +++ b/wrangler.toml @@ -12,7 +12,9 @@ routes = [ # create the queues with `wrangler kv:namespace create PUBLISHED_NOTES` and the same command with the `--preview` flag. # put your queue IDs below kv_namespaces = [ - { binding = "PUBLISHED_NOTES", id = "afa24a392a5a41f6b1655507dfd9b97a", preview_id = "0b334aece8d74c3ab90e3e99db569ce8" } + { binding = "PUBLISHED_NOTES", id = "afa24a392a5a41f6b1655507dfd9b97a", preview_id = "0b334aece8d74c3ab90e3e99db569ce8" }, + { binding = "NWC_REQUESTS", id = "e5bd788ddc16410bb108df0f2ae89e62", preview_id = "63d99f551a464ff78bbbbee7113cb658" }, + { binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" }, ] [vars]