Merge pull request #19 from MutinyWallet/nwc-relay

Store Nostr Wallet Connect Events
This commit is contained in:
Tony Giorgio
2023-05-26 11:15:00 -05:00
committed by GitHub
4 changed files with 204 additions and 5 deletions

View File

@@ -13,7 +13,7 @@ default = ["console_error_panic_hook"]
cfg-if = "0.1.2"
worker = { version = "0.0.13", features = ["queue"] }
futures = "0.3.26"
nostr = { version = "0.19.5", default-features = false, features = ["nip11"] }
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
serde = { version = "^1.0", features = ["derive"] }
# The `console_error_panic_hook` crate provides better debugging of panics by

View File

@@ -41,9 +41,17 @@ Right now some of these are hardcoded for us since they have to map from the `wr
This doesn't rebroadcast events that have already been broadcasted before. So we have a KV for that.
We also have a KV for storing the NWC requests and responses to get around ephemeral events.
```
wrangler kv:namespace create PUBLISHED_NOTES
wrangler kv:namespace create PUBLISHED_NOTES --preview
wrangler kv:namespace create NWC_REQUESTS
wrangler kv:namespace create NWC_REQUESTS --preview
wrangler kv:namespace create NWC_RESPONSES
wrangler kv:namespace create NWC_RESPONSES --preview
```
#### Queues

View File

@@ -7,7 +7,7 @@ pub(crate) use crate::nostr::{
try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
NOSTR_QUEUE_6,
};
use ::nostr::{ClientMessage, Event, RelayMessage};
use ::nostr::{ClientMessage, Event, Kind, RelayMessage, Tag, TagKind};
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use worker::*;
@@ -93,6 +93,17 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
return relay_response(relay_msg);
};
// if the event is a nostr wallet connect event, we
// should save it and not send to other relays.
if let Some(relay_msg) =
handle_nwc_event(*event.clone(), &ctx).await?
{
if let Err(e) = delete_nwc_request(*event, &ctx).await {
console_log!("failed to delete nwc request: {e}");
}
return relay_response(relay_msg);
};
// broadcast it to all queues
let nostr_queues = vec![
ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
@@ -224,6 +235,24 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
continue;
};
// if the event is a nostr wallet connect event, we
// should save it and not send to other relays.
if let Some(response) =
handle_nwc_event(*event.clone(), &ctx)
.await
.expect("failed to handle nwc event")
{
server
.send_with_str(&response.as_json())
.expect("failed to send response");
if let Err(e) = delete_nwc_request(*event, &ctx).await {
console_log!("failed to delete nwc request: {e}");
}
continue;
};
// broadcast it to all queues
let nostr_queues = vec![
ctx.env.queue(NOSTR_QUEUE).expect("get queue"),
@@ -267,9 +296,68 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
.expect("failed to send response");
}
ClientMessage::Req {
subscription_id, ..
subscription_id,
filters,
} => {
console_log!("ignoring nostr subscription request");
// if the user requests a NWC event, we have those stored,
// we should send them to the user
let mut events = vec![];
for filter in filters {
// get all authors and pubkeys
let mut keys = filter.authors.unwrap_or_default();
keys.extend(
filter
.pubkeys
.unwrap_or_default()
.into_iter()
.map(|p| p.to_string()),
);
if filter
.kinds
.clone()
.unwrap_or_default()
.contains(&Kind::WalletConnectRequest)
{
let found_events = get_nwc_events(
&keys,
Kind::WalletConnectRequest,
&ctx,
)
.await
.unwrap_or_default();
events.extend(found_events);
}
if filter
.kinds
.unwrap_or_default()
.contains(&Kind::WalletConnectResponse)
{
let found_events = get_nwc_events(
&keys,
Kind::WalletConnectResponse,
&ctx,
)
.await
.unwrap_or_default();
events.extend(found_events);
}
}
// send all found events to the user
for event in events {
console_log!("sending event to client: {}", &event.id);
let relay_msg = RelayMessage::new_event(
subscription_id.clone(),
event,
);
server
.send_with_str(&relay_msg.as_json())
.expect("failed to send response");
}
console_log!("end of subscription request");
let relay_msg = RelayMessage::new_eose(subscription_id);
server
.send_with_str(&relay_msg.as_json())
@@ -339,6 +427,107 @@ pub fn queue_number(batch_name: &str) -> Result<u32> {
}
}
pub async fn get_nwc_events(
keys: &[String],
kind: Kind,
ctx: &RouteContext<()>,
) -> Result<Vec<Event>> {
let kv_store = match kind {
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?,
_ => return Ok(vec![]), // skip other event types, todo, we may want to store info events as well
};
let mut events = vec![];
for key in keys {
let nwc_events = kv_store
.get(key)
.json::<Vec<Event>>()
.await?
.unwrap_or_default();
for nwc_event in nwc_events {
events.push(nwc_event);
}
}
Ok(events)
}
pub async fn handle_nwc_event(
event: Event,
ctx: &RouteContext<()>,
) -> Result<Option<RelayMessage>> {
let kv_store = match event.kind {
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?,
_ => return Ok(None), // skip other event types, todo, we may want to store info events as well
};
console_log!("got a wallet connect event: {}", event.id);
let key = &event.pubkey.to_string();
let new_nwc_responses = match kv_store.get(key).json::<Vec<Event>>().await {
Ok(Some(mut current)) => {
current.push(event.clone());
current
}
Ok(None) => vec![event.clone()],
Err(e) => {
console_log!("error getting nwc events from KV: {e}");
let relay_msg =
RelayMessage::new_ok(event.id, false, "error: could not save published note");
return Ok(Some(relay_msg));
}
};
// save new vector of events
if let Err(e) = kv_store.put(key, new_nwc_responses)?.execute().await {
console_log!("error saving nwc: {e}");
let relay_msg =
RelayMessage::new_ok(event.id, false, "error: could not save published note");
return Ok(Some(relay_msg));
}
console_log!("saved nwc event: {}", event.id);
let relay_msg = RelayMessage::new_ok(event.id, true, "");
Ok(Some(relay_msg))
}
/// When a NWC request has been fulfilled, delete the request from KV
pub async fn delete_nwc_request(event: Event, ctx: &RouteContext<()>) -> Result<()> {
let kv_store = match event.kind {
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
_ => return Ok(()), // skip other event types
};
let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned();
let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned();
if let Some(Tag::PubKey(pubkey, ..)) = p_tag {
if let Some(Tag::Event(event_id, ..)) = e_tag {
let key = &pubkey.to_string();
match kv_store.get(key).json::<Vec<Event>>().await {
Ok(Some(current)) => {
let new_events: Vec<Event> =
current.into_iter().filter(|e| e.id != event_id).collect();
// save new vector of events
kv_store.put(key, new_events)?.execute().await?;
console_log!("deleted nwc event: {}", event_id);
}
Ok(None) => return Ok(()),
Err(e) => {
console_log!("error getting nwc events from KV: {e}");
return Ok(());
}
};
};
};
Ok(())
}
fn relay_response(msg: RelayMessage) -> worker::Result<Response> {
Response::from_json(&msg)?.with_cors(&cors())
}

View File

@@ -12,7 +12,9 @@ routes = [
# create the queues with `wrangler kv:namespace create PUBLISHED_NOTES` and the same command with the `--preview` flag.
# put your queue IDs below
kv_namespaces = [
{ binding = "PUBLISHED_NOTES", id = "afa24a392a5a41f6b1655507dfd9b97a", preview_id = "0b334aece8d74c3ab90e3e99db569ce8" }
{ binding = "PUBLISHED_NOTES", id = "afa24a392a5a41f6b1655507dfd9b97a", preview_id = "0b334aece8d74c3ab90e3e99db569ce8" },
{ binding = "NWC_REQUESTS", id = "e5bd788ddc16410bb108df0f2ae89e62", preview_id = "63d99f551a464ff78bbbbee7113cb658" },
{ binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" },
]
[vars]