mirror of
https://github.com/aljazceru/blastr.git
synced 2025-12-17 05:54:26 +01:00
Implement nostr event db
This commit is contained in:
@@ -12,11 +12,12 @@ default = ["console_error_panic_hook"]
|
|||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
cfg-if = "0.1.2"
|
cfg-if = "0.1.2"
|
||||||
worker = { version = "0.0.18", features = ["queue"] }
|
worker = { version = "0.0.18", features = ["queue", "d1"] }
|
||||||
futures = "0.3.26"
|
futures = "0.3.26"
|
||||||
futures-util = { version = "0.3", default-features = false }
|
futures-util = { version = "0.3", default-features = false }
|
||||||
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
|
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
|
||||||
serde = { version = "^1.0", features = ["derive"] }
|
serde = { version = "^1.0", features = ["derive"] }
|
||||||
|
serde_json = "1.0.67"
|
||||||
|
|
||||||
# The `console_error_panic_hook` crate provides better debugging of panics by
|
# The `console_error_panic_hook` crate provides better debugging of panics by
|
||||||
# logging them with `console.error`. This is great for development, but requires
|
# logging them with `console.error`. This is great for development, but requires
|
||||||
|
|||||||
15
migrations/0000_events.sql
Normal file
15
migrations/0000_events.sql
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
-- Migration number: 0000 2023-08-10T16:43:44.275Z
|
||||||
|
DROP TABLE IF EXISTS event;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS event (
|
||||||
|
id BLOB PRIMARY KEY,
|
||||||
|
created_at INTEGER NOT NULL,
|
||||||
|
pubkey BLOB NOT NULL,
|
||||||
|
kind INTEGER NOT NULL,
|
||||||
|
content TEXT NOT NULL,
|
||||||
|
sig BLOB NOT NULL,
|
||||||
|
deleted INTEGER NOT NULL DEFAULT 0
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS pubkey_index ON event(pubkey);
|
||||||
|
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);
|
||||||
15
migrations/0001_tag.sql
Normal file
15
migrations/0001_tag.sql
Normal file
@@ -0,0 +1,15 @@
|
|||||||
|
-- Migration number: 0001 2023-08-11T20:22:11.351Z
|
||||||
|
DROP TABLE IF EXISTS tag;
|
||||||
|
|
||||||
|
CREATE TABLE IF NOT EXISTS tag (
|
||||||
|
id INTEGER PRIMARY KEY,
|
||||||
|
event_id BLOB NOT NULL,
|
||||||
|
name TEXT NOT NULL,
|
||||||
|
value TEXT,
|
||||||
|
FOREIGN KEY(event_id) REFERENCES event(id) ON DELETE CASCADE
|
||||||
|
UNIQUE(event_id, name, value)
|
||||||
|
);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS tag_event_index ON tag(event_id);
|
||||||
|
CREATE INDEX IF NOT EXISTS tag_name_index ON tag(name);
|
||||||
|
CREATE INDEX IF NOT EXISTS tag_value_index ON tag(value);
|
||||||
262
src/db.rs
Normal file
262
src/db.rs
Normal file
@@ -0,0 +1,262 @@
|
|||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use ::nostr::{Event, Kind, RelayMessage};
|
||||||
|
use nostr::{EventId, Tag, TagKind, Timestamp};
|
||||||
|
use serde::Deserialize;
|
||||||
|
use serde_json::Value;
|
||||||
|
use wasm_bindgen::prelude::JsValue;
|
||||||
|
use worker::D1Database;
|
||||||
|
use worker::*;
|
||||||
|
|
||||||
|
#[derive(Debug, Deserialize)]
|
||||||
|
struct EventRow {
|
||||||
|
id: EventId,
|
||||||
|
pubkey: nostr::secp256k1::XOnlyPublicKey,
|
||||||
|
created_at: Timestamp,
|
||||||
|
kind: Kind,
|
||||||
|
tags: Option<Vec<Tag>>,
|
||||||
|
content: String,
|
||||||
|
sig: nostr::secp256k1::schnorr::Signature,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Deserialize)]
|
||||||
|
struct TagRow {
|
||||||
|
event_id: EventId,
|
||||||
|
name: String,
|
||||||
|
value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_nwc_events(keys: &[String], kind: Kind, db: &D1Database) -> Result<Vec<Event>> {
|
||||||
|
// Determine the event kind
|
||||||
|
match kind {
|
||||||
|
Kind::WalletConnectResponse => (),
|
||||||
|
Kind::WalletConnectRequest => (),
|
||||||
|
_ => return Ok(vec![]), // skip other event types
|
||||||
|
};
|
||||||
|
|
||||||
|
console_log!("querying for ({keys:?}) and {}", kind.as_u32());
|
||||||
|
|
||||||
|
// Query for the events first, without the tags
|
||||||
|
let placeholders: String = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
|
||||||
|
let query_str = format!(
|
||||||
|
r#"
|
||||||
|
SELECT * FROM event
|
||||||
|
WHERE pubkey IN ({}) AND kind = ? AND deleted = 0
|
||||||
|
"#,
|
||||||
|
placeholders
|
||||||
|
);
|
||||||
|
let mut stmt = db.prepare(&query_str);
|
||||||
|
let mut bindings = Vec::with_capacity(keys.len() + 1); // +1 for the kind afterwards
|
||||||
|
for key in keys.iter() {
|
||||||
|
bindings.push(JsValue::from_str(key));
|
||||||
|
}
|
||||||
|
bindings.push(JsValue::from_f64(kind.as_u32() as f64));
|
||||||
|
stmt = stmt.bind(&bindings)?;
|
||||||
|
|
||||||
|
let result = stmt.all().await.map_err(|e| {
|
||||||
|
console_log!("Failed to fetch nwc events: {}", e);
|
||||||
|
format!("Failed to fetch nwc events: {}", e)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let mut events: Vec<Event> = result
|
||||||
|
.results::<Value>()?
|
||||||
|
.iter()
|
||||||
|
.map(|row| {
|
||||||
|
let e: EventRow = serde_json::from_value(row.clone()).map_err(|e| {
|
||||||
|
console_log!("failed to parse event: {}", e);
|
||||||
|
worker::Error::from(format!(
|
||||||
|
"Failed to deserialize event from row ({}): {}",
|
||||||
|
row, e
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
Ok(Event {
|
||||||
|
id: e.id,
|
||||||
|
pubkey: e.pubkey,
|
||||||
|
created_at: e.created_at,
|
||||||
|
kind: e.kind,
|
||||||
|
tags: e.tags.unwrap_or_default(),
|
||||||
|
content: e.content,
|
||||||
|
sig: e.sig,
|
||||||
|
})
|
||||||
|
})
|
||||||
|
.collect::<Result<Vec<Event>>>()?;
|
||||||
|
|
||||||
|
// Now get all the tags for all the events found
|
||||||
|
let event_ids: Vec<EventId> = events.iter().map(|e| e.id).collect();
|
||||||
|
let placeholders: String = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
|
||||||
|
let tag_query_str = format!(
|
||||||
|
r#"
|
||||||
|
SELECT event_id, name, value FROM tag
|
||||||
|
WHERE event_id IN ({})
|
||||||
|
ORDER BY id ASC
|
||||||
|
"#,
|
||||||
|
placeholders
|
||||||
|
);
|
||||||
|
let mut tag_stmt = db.prepare(&tag_query_str);
|
||||||
|
let bindings: Vec<JsValue> = event_ids
|
||||||
|
.iter()
|
||||||
|
.map(|id| JsValue::from_str(&id.to_string()))
|
||||||
|
.collect();
|
||||||
|
tag_stmt = tag_stmt.bind(&bindings)?;
|
||||||
|
|
||||||
|
let tag_result = tag_stmt
|
||||||
|
.all()
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to fetch tags: {}", e))?;
|
||||||
|
|
||||||
|
let tags: Vec<TagRow> = tag_result.results::<TagRow>()?;
|
||||||
|
let mut tags_map: HashMap<EventId, Vec<Tag>> = HashMap::new();
|
||||||
|
|
||||||
|
for tag_row in tags {
|
||||||
|
if let Ok(tag) = Tag::parse(vec![tag_row.name, tag_row.value]) {
|
||||||
|
tags_map
|
||||||
|
.entry(tag_row.event_id)
|
||||||
|
.or_insert_with(Vec::new)
|
||||||
|
.push(tag);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for event in &mut events {
|
||||||
|
if let Some(tags) = tags_map.remove(&event.id) {
|
||||||
|
event.tags.extend(tags);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tag ordering could screw up signature, though it shouldn't matter
|
||||||
|
// for NWC messages because those should only have one tag.
|
||||||
|
// Also we insert tags in order and do an ORDER BY id so it should be fine.
|
||||||
|
events.retain(|event| match event.verify() {
|
||||||
|
Ok(_) => true,
|
||||||
|
Err(e) => {
|
||||||
|
console_log!("Verification failed for event with id {}: {}", event.id, e);
|
||||||
|
false
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
Ok(events)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn handle_nwc_event(event: Event, db: &D1Database) -> Result<Option<RelayMessage>> {
|
||||||
|
// Create the main event insertion query.
|
||||||
|
let event_insert_query = worker::query!(
|
||||||
|
db,
|
||||||
|
r#"
|
||||||
|
INSERT OR IGNORE INTO event (id, created_at, pubkey, kind, content, sig)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?)
|
||||||
|
"#,
|
||||||
|
&event.id,
|
||||||
|
&event.created_at,
|
||||||
|
&event.pubkey,
|
||||||
|
&event.kind,
|
||||||
|
&event.content,
|
||||||
|
&event.sig
|
||||||
|
)?;
|
||||||
|
|
||||||
|
// Create a vector of tag insertion queries.
|
||||||
|
let mut tag_insert_queries: Vec<_> = event
|
||||||
|
.tags
|
||||||
|
.iter()
|
||||||
|
.map(|tag| {
|
||||||
|
worker::query!(
|
||||||
|
db,
|
||||||
|
r#"
|
||||||
|
INSERT OR IGNORE INTO tag (event_id, name, value)
|
||||||
|
VALUES (?, ?, ?)
|
||||||
|
"#,
|
||||||
|
&event.id,
|
||||||
|
&tag.kind().to_string(),
|
||||||
|
&tag.as_vec().get(1)
|
||||||
|
)
|
||||||
|
.expect("should compile query")
|
||||||
|
})
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
// Combine the main event and tag insertion queries.
|
||||||
|
let mut batch_queries = vec![event_insert_query];
|
||||||
|
batch_queries.append(&mut tag_insert_queries);
|
||||||
|
|
||||||
|
// Run the batch queries.
|
||||||
|
let mut results = db.batch(batch_queries).await?.into_iter();
|
||||||
|
|
||||||
|
// Check the result of the main event insertion.
|
||||||
|
if let Some(error_msg) = results.next().and_then(|res| res.error()) {
|
||||||
|
console_log!("error saving nwc event to event table: {}", error_msg);
|
||||||
|
let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg);
|
||||||
|
return Ok(Some(relay_msg));
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check the results for the tag insertions.
|
||||||
|
for tag_insert_result in results {
|
||||||
|
if let Some(error_msg) = tag_insert_result.error() {
|
||||||
|
console_log!("error saving tag to tag table: {}", error_msg);
|
||||||
|
let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg);
|
||||||
|
return Ok(Some(relay_msg));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let relay_msg = RelayMessage::new_ok(event.id, true, "");
|
||||||
|
Ok(Some(relay_msg))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When a NWC request has been fulfilled, soft delete the request from the database
|
||||||
|
pub async fn delete_nwc_request(event: Event, db: &D1Database) -> Result<()> {
|
||||||
|
// Filter only relevant events
|
||||||
|
match event.kind {
|
||||||
|
Kind::WalletConnectResponse => (),
|
||||||
|
_ => return Ok(()), // skip other event types
|
||||||
|
};
|
||||||
|
|
||||||
|
let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned();
|
||||||
|
let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned();
|
||||||
|
|
||||||
|
if let Some(Tag::PubKey(pubkey, ..)) = p_tag {
|
||||||
|
if let Some(Tag::Event(event_id, ..)) = e_tag {
|
||||||
|
// Soft delete the event based on pubkey and event_id
|
||||||
|
match worker::query!(
|
||||||
|
db,
|
||||||
|
"UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?",
|
||||||
|
&pubkey.to_string(),
|
||||||
|
&event_id
|
||||||
|
)?
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => (),
|
||||||
|
Err(e) => {
|
||||||
|
console_log!("error soft deleting nwc event from database: {e}");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// When a NWC response has been fulfilled, soft delete the response from the database
|
||||||
|
pub async fn delete_nwc_response(event: &Event, db: &D1Database) -> Result<()> {
|
||||||
|
// Filter only relevant events
|
||||||
|
match event.kind {
|
||||||
|
Kind::WalletConnectResponse => (),
|
||||||
|
_ => return Ok(()), // skip other event types
|
||||||
|
};
|
||||||
|
|
||||||
|
// Soft delete the event based on pubkey and id
|
||||||
|
match worker::query!(
|
||||||
|
db,
|
||||||
|
"UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?",
|
||||||
|
&event.pubkey.to_string(),
|
||||||
|
&event.id
|
||||||
|
)?
|
||||||
|
.run()
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
Ok(_) => console_log!("soft deleted nwc response event: {}", event.id),
|
||||||
|
Err(e) => {
|
||||||
|
console_log!("error soft deleting nwc event from database: {e}");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
165
src/lib.rs
165
src/lib.rs
@@ -1,15 +1,13 @@
|
|||||||
use crate::nostr::get_nip11_response;
|
|
||||||
use crate::nostr::NOSTR_QUEUE_10;
|
|
||||||
use crate::nostr::NOSTR_QUEUE_7;
|
|
||||||
use crate::nostr::NOSTR_QUEUE_8;
|
use crate::nostr::NOSTR_QUEUE_8;
|
||||||
use crate::nostr::NOSTR_QUEUE_9;
|
use crate::nostr::NOSTR_QUEUE_9;
|
||||||
pub(crate) use crate::nostr::{
|
pub(crate) use crate::nostr::{
|
||||||
try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
|
try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
|
||||||
NOSTR_QUEUE_6,
|
NOSTR_QUEUE_6,
|
||||||
};
|
};
|
||||||
use ::nostr::{
|
use crate::{db::delete_nwc_request, nostr::NOSTR_QUEUE_10};
|
||||||
ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind,
|
use crate::{db::get_nwc_events, nostr::NOSTR_QUEUE_7};
|
||||||
};
|
use crate::{db::handle_nwc_event, nostr::get_nip11_response};
|
||||||
|
use ::nostr::{ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId};
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use futures_util::lock::Mutex;
|
use futures_util::lock::Mutex;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
@@ -19,6 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use worker::*;
|
use worker::*;
|
||||||
|
|
||||||
|
mod db;
|
||||||
mod error;
|
mod error;
|
||||||
mod nostr;
|
mod nostr;
|
||||||
mod utils;
|
mod utils;
|
||||||
@@ -113,12 +112,14 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
return relay_response(relay_msg);
|
return relay_response(relay_msg);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let db = ctx.d1("DB")?;
|
||||||
|
|
||||||
// if the event is a nostr wallet connect event, we
|
// if the event is a nostr wallet connect event, we
|
||||||
// should save it and not send to other relays.
|
// should save it and not send to other relays.
|
||||||
if let Some(relay_msg) =
|
if let Some(relay_msg) =
|
||||||
handle_nwc_event(*event.clone(), &ctx).await?
|
handle_nwc_event(*event.clone(), &db).await?
|
||||||
{
|
{
|
||||||
if let Err(e) = delete_nwc_request(*event, &ctx).await {
|
if let Err(e) = delete_nwc_request(*event, &db).await {
|
||||||
console_log!("failed to delete nwc request: {e}");
|
console_log!("failed to delete nwc request: {e}");
|
||||||
}
|
}
|
||||||
return relay_response(relay_msg);
|
return relay_response(relay_msg);
|
||||||
@@ -274,10 +275,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let db = ctx.d1("DB").expect("should have DB");
|
||||||
|
|
||||||
// if the event is a nostr wallet connect event, we
|
// if the event is a nostr wallet connect event, we
|
||||||
// should save it and not send to other relays.
|
// should save it and not send to other relays.
|
||||||
if let Some(response) =
|
if let Some(response) =
|
||||||
handle_nwc_event(*event.clone(), &ctx)
|
handle_nwc_event(*event.clone(), &db)
|
||||||
.await
|
.await
|
||||||
.expect("failed to handle nwc event")
|
.expect("failed to handle nwc event")
|
||||||
{
|
{
|
||||||
@@ -285,7 +288,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
.send_with_str(&response.as_json())
|
.send_with_str(&response.as_json())
|
||||||
.expect("failed to send response");
|
.expect("failed to send response");
|
||||||
|
|
||||||
if let Err(e) = delete_nwc_request(*event, &ctx).await {
|
if let Err(e) = delete_nwc_request(*event, &db).await {
|
||||||
console_log!("failed to delete nwc request: {e}");
|
console_log!("failed to delete nwc request: {e}");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -382,7 +385,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
// set running thread to true
|
// set running thread to true
|
||||||
running_thread.swap(true, Ordering::Relaxed);
|
running_thread.swap(true, Ordering::Relaxed);
|
||||||
|
|
||||||
let ctx_clone = ctx.clone();
|
let db = ctx.d1("DB").expect("should have DB");
|
||||||
let sub_id = subscription_id.clone();
|
let sub_id = subscription_id.clone();
|
||||||
let server_clone = server.clone();
|
let server_clone = server.clone();
|
||||||
let master_clone = requested_filters.clone();
|
let master_clone = requested_filters.clone();
|
||||||
@@ -397,7 +400,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
|
|||||||
sub_id.clone(),
|
sub_id.clone(),
|
||||||
master.clone(),
|
master.clone(),
|
||||||
&server_clone,
|
&server_clone,
|
||||||
&ctx_clone,
|
&db,
|
||||||
).await
|
).await
|
||||||
{
|
{
|
||||||
Ok(new_event_ids) => {
|
Ok(new_event_ids) => {
|
||||||
@@ -500,7 +503,7 @@ pub async fn handle_filter(
|
|||||||
subscription_id: SubscriptionId,
|
subscription_id: SubscriptionId,
|
||||||
filter: Filter,
|
filter: Filter,
|
||||||
server: &WebSocket,
|
server: &WebSocket,
|
||||||
ctx: &RouteContext<()>,
|
db: &D1Database,
|
||||||
) -> Result<Vec<EventId>> {
|
) -> Result<Vec<EventId>> {
|
||||||
let mut events = vec![];
|
let mut events = vec![];
|
||||||
// get all authors and pubkeys
|
// get all authors and pubkeys
|
||||||
@@ -519,7 +522,7 @@ pub async fn handle_filter(
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.contains(&Kind::WalletConnectRequest)
|
.contains(&Kind::WalletConnectRequest)
|
||||||
{
|
{
|
||||||
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, ctx)
|
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, db)
|
||||||
.await
|
.await
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
@@ -534,7 +537,7 @@ pub async fn handle_filter(
|
|||||||
.unwrap_or_default()
|
.unwrap_or_default()
|
||||||
.contains(&Kind::WalletConnectResponse)
|
.contains(&Kind::WalletConnectResponse)
|
||||||
{
|
{
|
||||||
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, ctx)
|
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, db)
|
||||||
.await
|
.await
|
||||||
.unwrap_or_default();
|
.unwrap_or_default();
|
||||||
|
|
||||||
@@ -567,138 +570,6 @@ pub async fn handle_filter(
|
|||||||
Ok(sent_event_ids)
|
Ok(sent_event_ids)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn get_nwc_events(
|
|
||||||
keys: &[String],
|
|
||||||
kind: Kind,
|
|
||||||
ctx: &RouteContext<()>,
|
|
||||||
) -> Result<Vec<Event>> {
|
|
||||||
let kv_store = match kind {
|
|
||||||
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
|
|
||||||
Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?,
|
|
||||||
_ => return Ok(vec![]), // skip other event types, todo, we may want to store info events as well
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut events = vec![];
|
|
||||||
for key in keys {
|
|
||||||
let nwc_events = kv_store
|
|
||||||
.get(key)
|
|
||||||
.json::<Vec<Event>>()
|
|
||||||
.await?
|
|
||||||
.unwrap_or_default();
|
|
||||||
for nwc_event in nwc_events {
|
|
||||||
// delete responses since we don't care after sending
|
|
||||||
if kind == Kind::WalletConnectResponse {
|
|
||||||
if let Err(e) = delete_nwc_response(&nwc_event, ctx).await {
|
|
||||||
console_log!("failed to delete nwc response: {e}");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
events.push(nwc_event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(events)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn handle_nwc_event(
|
|
||||||
event: Event,
|
|
||||||
ctx: &RouteContext<()>,
|
|
||||||
) -> Result<Option<RelayMessage>> {
|
|
||||||
let kv_store = match event.kind {
|
|
||||||
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
|
|
||||||
Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?,
|
|
||||||
_ => return Ok(None), // skip other event types, todo, we may want to store info events as well
|
|
||||||
};
|
|
||||||
|
|
||||||
console_log!("got a wallet connect event: {}", event.id);
|
|
||||||
|
|
||||||
let key = &event.pubkey.to_string();
|
|
||||||
|
|
||||||
let new_nwc_responses = match kv_store.get(key).json::<Vec<Event>>().await {
|
|
||||||
Ok(Some(mut current)) => {
|
|
||||||
current.push(event.clone());
|
|
||||||
current
|
|
||||||
}
|
|
||||||
Ok(None) => vec![event.clone()],
|
|
||||||
Err(e) => {
|
|
||||||
console_log!("error getting nwc events from KV: {e}");
|
|
||||||
let relay_msg =
|
|
||||||
RelayMessage::new_ok(event.id, false, "error: could not save published note");
|
|
||||||
return Ok(Some(relay_msg));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
// save new vector of events
|
|
||||||
if let Err(e) = kv_store.put(key, new_nwc_responses)?.execute().await {
|
|
||||||
console_log!("error saving nwc: {e}");
|
|
||||||
let relay_msg =
|
|
||||||
RelayMessage::new_ok(event.id, false, "error: could not save published note");
|
|
||||||
return Ok(Some(relay_msg));
|
|
||||||
}
|
|
||||||
console_log!("saved nwc event: {}", event.id);
|
|
||||||
|
|
||||||
let relay_msg = RelayMessage::new_ok(event.id, true, "");
|
|
||||||
Ok(Some(relay_msg))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// When a NWC request has been fulfilled, delete the request from KV
|
|
||||||
pub async fn delete_nwc_request(event: Event, ctx: &RouteContext<()>) -> Result<()> {
|
|
||||||
let kv_store = match event.kind {
|
|
||||||
Kind::WalletConnectResponse => ctx.kv("NWC_REQUESTS")?,
|
|
||||||
_ => return Ok(()), // skip other event types
|
|
||||||
};
|
|
||||||
|
|
||||||
let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned();
|
|
||||||
let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned();
|
|
||||||
|
|
||||||
if let Some(Tag::PubKey(pubkey, ..)) = p_tag {
|
|
||||||
if let Some(Tag::Event(event_id, ..)) = e_tag {
|
|
||||||
let key = &pubkey.to_string();
|
|
||||||
match kv_store.get(key).json::<Vec<Event>>().await {
|
|
||||||
Ok(Some(current)) => {
|
|
||||||
let new_events: Vec<Event> =
|
|
||||||
current.into_iter().filter(|e| e.id != event_id).collect();
|
|
||||||
|
|
||||||
// save new vector of events
|
|
||||||
kv_store.put(key, new_events)?.execute().await?;
|
|
||||||
console_log!("deleted nwc request event: {}", event_id);
|
|
||||||
}
|
|
||||||
Ok(None) => return Ok(()),
|
|
||||||
Err(e) => {
|
|
||||||
console_log!("error getting nwc events from KV: {e}");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Result<()> {
|
|
||||||
let kv_store = match event.kind {
|
|
||||||
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
|
|
||||||
_ => return Ok(()), // skip other event types
|
|
||||||
};
|
|
||||||
|
|
||||||
let key = &event.pubkey.to_string();
|
|
||||||
match kv_store.get(key).json::<Vec<Event>>().await {
|
|
||||||
Ok(Some(current)) => {
|
|
||||||
let new_events: Vec<Event> = current.into_iter().filter(|e| e.id != event.id).collect();
|
|
||||||
|
|
||||||
// save new vector of events
|
|
||||||
kv_store.put(key, new_events)?.execute().await?;
|
|
||||||
console_log!("deleted nwc response event: {}", event.id);
|
|
||||||
}
|
|
||||||
Ok(None) => return Ok(()),
|
|
||||||
Err(e) => {
|
|
||||||
console_log!("error getting nwc events from KV: {e}");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
// Helper function to extend a vector without duplicates
|
// Helper function to extend a vector without duplicates
|
||||||
fn extend_without_duplicates<T: PartialEq + Clone>(master: &mut Vec<T>, new: &Vec<T>) {
|
fn extend_without_duplicates<T: PartialEq + Clone>(master: &mut Vec<T>, new: &Vec<T>) {
|
||||||
for item in new {
|
for item in new {
|
||||||
|
|||||||
@@ -29,6 +29,16 @@ kv_namespaces = [
|
|||||||
{ binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" },
|
{ binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" },
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[d1_databases]]
|
||||||
|
binding = "DB"
|
||||||
|
database_name = "blastr-db"
|
||||||
|
database_id = "82736a30-841d-4c24-87d8-788763dacb01"
|
||||||
|
|
||||||
|
[[env.staging.d1_databases]]
|
||||||
|
binding = "DB"
|
||||||
|
database_name = "blastr-db-staging"
|
||||||
|
database_id = "ba4c227b-edf2-46a4-99fa-4b7bfa036800"
|
||||||
|
|
||||||
[env.staging.vars]
|
[env.staging.vars]
|
||||||
WORKERS_RS_VERSION = "0.0.18"
|
WORKERS_RS_VERSION = "0.0.18"
|
||||||
ENVIRONMENT = "staging"
|
ENVIRONMENT = "staging"
|
||||||
|
|||||||
Reference in New Issue
Block a user