Merge pull request #28 from MutinyWallet/nwc-sql

Nwc sql
This commit is contained in:
Tony Giorgio
2023-08-11 16:16:00 -05:00
committed by GitHub
6 changed files with 354 additions and 151 deletions

View File

@@ -2,6 +2,7 @@
name = "blastr"
version = "0.0.0"
edition = "2018"
resolver = "2"
[lib]
crate-type = ["cdylib", "rlib"]
@@ -11,11 +12,12 @@ default = ["console_error_panic_hook"]
[dependencies]
cfg-if = "0.1.2"
worker = { version = "0.0.13", features = ["queue"] }
worker = { version = "0.0.18", features = ["queue", "d1"] }
futures = "0.3.26"
futures-util = { version = "0.3", default-features = false }
nostr = { version = "0.22.0", default-features = false, features = ["nip11"] }
serde = { version = "^1.0", features = ["derive"] }
serde_json = "1.0.67"
# The `console_error_panic_hook` crate provides better debugging of panics by
# logging them with `console.error`. This is great for development, but requires

View File

@@ -0,0 +1,15 @@
-- Migration number: 0000 2023-08-10T16:43:44.275Z
DROP TABLE IF EXISTS event;
CREATE TABLE IF NOT EXISTS event (
id BLOB PRIMARY KEY,
created_at INTEGER NOT NULL,
pubkey BLOB NOT NULL,
kind INTEGER NOT NULL,
content TEXT NOT NULL,
sig BLOB NOT NULL,
deleted INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS pubkey_index ON event(pubkey);
CREATE INDEX IF NOT EXISTS kind_index ON event(kind);

15
migrations/0001_tag.sql Normal file
View File

@@ -0,0 +1,15 @@
-- Migration number: 0001 2023-08-11T20:22:11.351Z
DROP TABLE IF EXISTS tag;
CREATE TABLE IF NOT EXISTS tag (
id INTEGER PRIMARY KEY,
event_id BLOB NOT NULL,
name TEXT NOT NULL,
value TEXT,
FOREIGN KEY(event_id) REFERENCES event(id) ON DELETE CASCADE
UNIQUE(event_id, name, value)
);
CREATE INDEX IF NOT EXISTS tag_event_index ON tag(event_id);
CREATE INDEX IF NOT EXISTS tag_name_index ON tag(name);
CREATE INDEX IF NOT EXISTS tag_value_index ON tag(value);

262
src/db.rs Normal file
View File

@@ -0,0 +1,262 @@
use std::collections::HashMap;
use ::nostr::{Event, Kind, RelayMessage};
use nostr::{EventId, Tag, TagKind, Timestamp};
use serde::Deserialize;
use serde_json::Value;
use wasm_bindgen::prelude::JsValue;
use worker::D1Database;
use worker::*;
#[derive(Debug, Deserialize)]
struct EventRow {
id: EventId,
pubkey: nostr::secp256k1::XOnlyPublicKey,
created_at: Timestamp,
kind: Kind,
tags: Option<Vec<Tag>>,
content: String,
sig: nostr::secp256k1::schnorr::Signature,
}
#[derive(Deserialize)]
struct TagRow {
event_id: EventId,
name: String,
value: String,
}
pub async fn get_nwc_events(keys: &[String], kind: Kind, db: &D1Database) -> Result<Vec<Event>> {
// Determine the event kind
match kind {
Kind::WalletConnectResponse => (),
Kind::WalletConnectRequest => (),
_ => return Ok(vec![]), // skip other event types
};
console_log!("querying for ({keys:?}) and {}", kind.as_u32());
// Query for the events first, without the tags
let placeholders: String = keys.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let query_str = format!(
r#"
SELECT * FROM event
WHERE pubkey IN ({}) AND kind = ? AND deleted = 0
"#,
placeholders
);
let mut stmt = db.prepare(&query_str);
let mut bindings = Vec::with_capacity(keys.len() + 1); // +1 for the kind afterwards
for key in keys.iter() {
bindings.push(JsValue::from_str(key));
}
bindings.push(JsValue::from_f64(kind.as_u32() as f64));
stmt = stmt.bind(&bindings)?;
let result = stmt.all().await.map_err(|e| {
console_log!("Failed to fetch nwc events: {}", e);
format!("Failed to fetch nwc events: {}", e)
})?;
let mut events: Vec<Event> = result
.results::<Value>()?
.iter()
.map(|row| {
let e: EventRow = serde_json::from_value(row.clone()).map_err(|e| {
console_log!("failed to parse event: {}", e);
worker::Error::from(format!(
"Failed to deserialize event from row ({}): {}",
row, e
))
})?;
Ok(Event {
id: e.id,
pubkey: e.pubkey,
created_at: e.created_at,
kind: e.kind,
tags: e.tags.unwrap_or_default(),
content: e.content,
sig: e.sig,
})
})
.collect::<Result<Vec<Event>>>()?;
// Now get all the tags for all the events found
let event_ids: Vec<EventId> = events.iter().map(|e| e.id).collect();
let placeholders: String = event_ids.iter().map(|_| "?").collect::<Vec<_>>().join(", ");
let tag_query_str = format!(
r#"
SELECT event_id, name, value FROM tag
WHERE event_id IN ({})
ORDER BY id ASC
"#,
placeholders
);
let mut tag_stmt = db.prepare(&tag_query_str);
let bindings: Vec<JsValue> = event_ids
.iter()
.map(|id| JsValue::from_str(&id.to_string()))
.collect();
tag_stmt = tag_stmt.bind(&bindings)?;
let tag_result = tag_stmt
.all()
.await
.map_err(|e| format!("Failed to fetch tags: {}", e))?;
let tags: Vec<TagRow> = tag_result.results::<TagRow>()?;
let mut tags_map: HashMap<EventId, Vec<Tag>> = HashMap::new();
for tag_row in tags {
if let Ok(tag) = Tag::parse(vec![tag_row.name, tag_row.value]) {
tags_map
.entry(tag_row.event_id)
.or_insert_with(Vec::new)
.push(tag);
}
}
for event in &mut events {
if let Some(tags) = tags_map.remove(&event.id) {
event.tags.extend(tags);
}
}
// Tag ordering could screw up signature, though it shouldn't matter
// for NWC messages because those should only have one tag.
// Also we insert tags in order and do an ORDER BY id so it should be fine.
events.retain(|event| match event.verify() {
Ok(_) => true,
Err(e) => {
console_log!("Verification failed for event with id {}: {}", event.id, e);
false
}
});
Ok(events)
}
pub async fn handle_nwc_event(event: Event, db: &D1Database) -> Result<Option<RelayMessage>> {
// Create the main event insertion query.
let event_insert_query = worker::query!(
db,
r#"
INSERT OR IGNORE INTO event (id, created_at, pubkey, kind, content, sig)
VALUES (?, ?, ?, ?, ?, ?)
"#,
&event.id,
&event.created_at,
&event.pubkey,
&event.kind,
&event.content,
&event.sig
)?;
// Create a vector of tag insertion queries.
let mut tag_insert_queries: Vec<_> = event
.tags
.iter()
.map(|tag| {
worker::query!(
db,
r#"
INSERT OR IGNORE INTO tag (event_id, name, value)
VALUES (?, ?, ?)
"#,
&event.id,
&tag.kind().to_string(),
&tag.as_vec().get(1)
)
.expect("should compile query")
})
.collect();
// Combine the main event and tag insertion queries.
let mut batch_queries = vec![event_insert_query];
batch_queries.append(&mut tag_insert_queries);
// Run the batch queries.
let mut results = db.batch(batch_queries).await?.into_iter();
// Check the result of the main event insertion.
if let Some(error_msg) = results.next().and_then(|res| res.error()) {
console_log!("error saving nwc event to event table: {}", error_msg);
let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg);
return Ok(Some(relay_msg));
}
// Check the results for the tag insertions.
for tag_insert_result in results {
if let Some(error_msg) = tag_insert_result.error() {
console_log!("error saving tag to tag table: {}", error_msg);
let relay_msg = RelayMessage::new_ok(event.id, false, &error_msg);
return Ok(Some(relay_msg));
}
}
let relay_msg = RelayMessage::new_ok(event.id, true, "");
Ok(Some(relay_msg))
}
/// When a NWC request has been fulfilled, soft delete the request from the database
pub async fn delete_nwc_request(event: Event, db: &D1Database) -> Result<()> {
// Filter only relevant events
match event.kind {
Kind::WalletConnectResponse => (),
_ => return Ok(()), // skip other event types
};
let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned();
let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned();
if let Some(Tag::PubKey(pubkey, ..)) = p_tag {
if let Some(Tag::Event(event_id, ..)) = e_tag {
// Soft delete the event based on pubkey and event_id
match worker::query!(
db,
"UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?",
&pubkey.to_string(),
&event_id
)?
.run()
.await
{
Ok(_) => (),
Err(e) => {
console_log!("error soft deleting nwc event from database: {e}");
return Ok(());
}
}
}
}
Ok(())
}
/// When a NWC response has been fulfilled, soft delete the response from the database
pub async fn delete_nwc_response(event: &Event, db: &D1Database) -> Result<()> {
// Filter only relevant events
match event.kind {
Kind::WalletConnectResponse => (),
_ => return Ok(()), // skip other event types
};
// Soft delete the event based on pubkey and id
match worker::query!(
db,
"UPDATE event SET deleted = 1 WHERE pubkey = ? AND id = ?",
&event.pubkey.to_string(),
&event.id
)?
.run()
.await
{
Ok(_) => console_log!("soft deleted nwc response event: {}", event.id),
Err(e) => {
console_log!("error soft deleting nwc event from database: {e}");
return Ok(());
}
}
Ok(())
}

View File

@@ -1,15 +1,13 @@
use crate::nostr::get_nip11_response;
use crate::nostr::NOSTR_QUEUE_10;
use crate::nostr::NOSTR_QUEUE_7;
use crate::nostr::NOSTR_QUEUE_8;
use crate::nostr::NOSTR_QUEUE_9;
pub(crate) use crate::nostr::{
try_queue_event, NOSTR_QUEUE, NOSTR_QUEUE_2, NOSTR_QUEUE_3, NOSTR_QUEUE_4, NOSTR_QUEUE_5,
NOSTR_QUEUE_6,
};
use ::nostr::{
ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId, Tag, TagKind,
};
use crate::{db::delete_nwc_request, nostr::NOSTR_QUEUE_10};
use crate::{db::get_nwc_events, nostr::NOSTR_QUEUE_7};
use crate::{db::handle_nwc_event, nostr::get_nip11_response};
use ::nostr::{ClientMessage, Event, EventId, Filter, Kind, RelayMessage, SubscriptionId};
use futures::StreamExt;
use futures_util::lock::Mutex;
use serde::{Deserialize, Serialize};
@@ -19,6 +17,7 @@ use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use worker::*;
mod db;
mod error;
mod nostr;
mod utils;
@@ -66,6 +65,19 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
ClientMessage::Event(event) => {
console_log!("got an event from client: {}", event.id);
match event.verify() {
Ok(()) => (),
Err(e) => {
console_log!("could not verify event {}: {}", event.id, e);
let relay_msg = RelayMessage::new_ok(
event.id,
false,
"invalid event",
);
return relay_response(relay_msg);
}
}
// check if disallowed event kind
if DISALLOWED_EVENT_KINDS.contains(&event.kind.as_u32()) {
console_log!(
@@ -100,12 +112,14 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
return relay_response(relay_msg);
};
let db = ctx.d1("DB")?;
// if the event is a nostr wallet connect event, we
// should save it and not send to other relays.
if let Some(relay_msg) =
handle_nwc_event(*event.clone(), &ctx).await?
handle_nwc_event(*event.clone(), &db).await?
{
if let Err(e) = delete_nwc_request(*event, &ctx).await {
if let Err(e) = delete_nwc_request(*event, &db).await {
console_log!("failed to delete nwc request: {e}");
}
return relay_response(relay_msg);
@@ -204,6 +218,21 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
match client_msg {
ClientMessage::Event(event) => {
console_log!("got an event from client: {}", event.id);
match event.verify() {
Ok(()) => (),
Err(e) => {
console_log!("could not verify event {}: {}", event.id, e);
let relay_msg = RelayMessage::new_ok(
event.id,
false,
"disallowed event kind",
);
server
.send_with_str(&relay_msg.as_json())
.expect("failed to send response");
continue;
}
}
// check if disallowed event kind
if DISALLOWED_EVENT_KINDS.contains(&event.kind.as_u32()) {
@@ -246,10 +275,12 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
continue;
};
let db = ctx.d1("DB").expect("should have DB");
// if the event is a nostr wallet connect event, we
// should save it and not send to other relays.
if let Some(response) =
handle_nwc_event(*event.clone(), &ctx)
handle_nwc_event(*event.clone(), &db)
.await
.expect("failed to handle nwc event")
{
@@ -257,7 +288,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
.send_with_str(&response.as_json())
.expect("failed to send response");
if let Err(e) = delete_nwc_request(*event, &ctx).await {
if let Err(e) = delete_nwc_request(*event, &db).await {
console_log!("failed to delete nwc request: {e}");
}
@@ -354,7 +385,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
// set running thread to true
running_thread.swap(true, Ordering::Relaxed);
let ctx_clone = ctx.clone();
let db = ctx.d1("DB").expect("should have DB");
let sub_id = subscription_id.clone();
let server_clone = server.clone();
let master_clone = requested_filters.clone();
@@ -369,7 +400,7 @@ pub async fn main(req: Request, env: Env, _ctx: Context) -> Result<Response> {
sub_id.clone(),
master.clone(),
&server_clone,
&ctx_clone,
&db,
).await
{
Ok(new_event_ids) => {
@@ -472,7 +503,7 @@ pub async fn handle_filter(
subscription_id: SubscriptionId,
filter: Filter,
server: &WebSocket,
ctx: &RouteContext<()>,
db: &D1Database,
) -> Result<Vec<EventId>> {
let mut events = vec![];
// get all authors and pubkeys
@@ -491,7 +522,7 @@ pub async fn handle_filter(
.unwrap_or_default()
.contains(&Kind::WalletConnectRequest)
{
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, ctx)
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectRequest, db)
.await
.unwrap_or_default();
@@ -506,7 +537,7 @@ pub async fn handle_filter(
.unwrap_or_default()
.contains(&Kind::WalletConnectResponse)
{
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, ctx)
let mut found_events = get_nwc_events(&keys, Kind::WalletConnectResponse, db)
.await
.unwrap_or_default();
@@ -539,138 +570,6 @@ pub async fn handle_filter(
Ok(sent_event_ids)
}
pub async fn get_nwc_events(
keys: &[String],
kind: Kind,
ctx: &RouteContext<()>,
) -> Result<Vec<Event>> {
let kv_store = match kind {
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?,
_ => return Ok(vec![]), // skip other event types, todo, we may want to store info events as well
};
let mut events = vec![];
for key in keys {
let nwc_events = kv_store
.get(key)
.json::<Vec<Event>>()
.await?
.unwrap_or_default();
for nwc_event in nwc_events {
// delete responses since we don't care after sending
if kind == Kind::WalletConnectResponse {
if let Err(e) = delete_nwc_response(&nwc_event, ctx).await {
console_log!("failed to delete nwc response: {e}");
}
}
events.push(nwc_event);
}
}
Ok(events)
}
pub async fn handle_nwc_event(
event: Event,
ctx: &RouteContext<()>,
) -> Result<Option<RelayMessage>> {
let kv_store = match event.kind {
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
Kind::WalletConnectRequest => ctx.kv("NWC_REQUESTS")?,
_ => return Ok(None), // skip other event types, todo, we may want to store info events as well
};
console_log!("got a wallet connect event: {}", event.id);
let key = &event.pubkey.to_string();
let new_nwc_responses = match kv_store.get(key).json::<Vec<Event>>().await {
Ok(Some(mut current)) => {
current.push(event.clone());
current
}
Ok(None) => vec![event.clone()],
Err(e) => {
console_log!("error getting nwc events from KV: {e}");
let relay_msg =
RelayMessage::new_ok(event.id, false, "error: could not save published note");
return Ok(Some(relay_msg));
}
};
// save new vector of events
if let Err(e) = kv_store.put(key, new_nwc_responses)?.execute().await {
console_log!("error saving nwc: {e}");
let relay_msg =
RelayMessage::new_ok(event.id, false, "error: could not save published note");
return Ok(Some(relay_msg));
}
console_log!("saved nwc event: {}", event.id);
let relay_msg = RelayMessage::new_ok(event.id, true, "");
Ok(Some(relay_msg))
}
/// When a NWC request has been fulfilled, delete the request from KV
pub async fn delete_nwc_request(event: Event, ctx: &RouteContext<()>) -> Result<()> {
let kv_store = match event.kind {
Kind::WalletConnectResponse => ctx.kv("NWC_REQUESTS")?,
_ => return Ok(()), // skip other event types
};
let p_tag = event.tags.iter().find(|t| t.kind() == TagKind::P).cloned();
let e_tag = event.tags.iter().find(|t| t.kind() == TagKind::E).cloned();
if let Some(Tag::PubKey(pubkey, ..)) = p_tag {
if let Some(Tag::Event(event_id, ..)) = e_tag {
let key = &pubkey.to_string();
match kv_store.get(key).json::<Vec<Event>>().await {
Ok(Some(current)) => {
let new_events: Vec<Event> =
current.into_iter().filter(|e| e.id != event_id).collect();
// save new vector of events
kv_store.put(key, new_events)?.execute().await?;
console_log!("deleted nwc request event: {}", event_id);
}
Ok(None) => return Ok(()),
Err(e) => {
console_log!("error getting nwc events from KV: {e}");
return Ok(());
}
};
};
};
Ok(())
}
pub async fn delete_nwc_response(event: &Event, ctx: &RouteContext<()>) -> Result<()> {
let kv_store = match event.kind {
Kind::WalletConnectResponse => ctx.kv("NWC_RESPONSES")?,
_ => return Ok(()), // skip other event types
};
let key = &event.pubkey.to_string();
match kv_store.get(key).json::<Vec<Event>>().await {
Ok(Some(current)) => {
let new_events: Vec<Event> = current.into_iter().filter(|e| e.id != event.id).collect();
// save new vector of events
kv_store.put(key, new_events)?.execute().await?;
console_log!("deleted nwc response event: {}", event.id);
}
Ok(None) => return Ok(()),
Err(e) => {
console_log!("error getting nwc events from KV: {e}");
return Ok(());
}
};
Ok(())
}
// Helper function to extend a vector without duplicates
fn extend_without_duplicates<T: PartialEq + Clone>(master: &mut Vec<T>, new: &Vec<T>) {
for item in new {

View File

@@ -29,12 +29,22 @@ kv_namespaces = [
{ binding = "NWC_RESPONSES", id = "5b434d5eced84abaad1c9a44448ac71c", preview_id = "af27b55b58754562b4250dcd3682547b" },
]
[[d1_databases]]
binding = "DB"
database_name = "blastr-db"
database_id = "82736a30-841d-4c24-87d8-788763dacb01"
[[env.staging.d1_databases]]
binding = "DB"
database_name = "blastr-db-staging"
database_id = "ba4c227b-edf2-46a4-99fa-4b7bfa036800"
[env.staging.vars]
WORKERS_RS_VERSION = "0.0.13"
WORKERS_RS_VERSION = "0.0.18"
ENVIRONMENT = "staging"
[vars]
WORKERS_RS_VERSION = "0.0.13"
WORKERS_RS_VERSION = "0.0.18"
ENVIRONMENT = "production"
# Replace with all the queues you created, if you named them different.
@@ -172,4 +182,4 @@ ENVIRONMENT = "production"
binding = "nostr-events-pub-10-b"
[build]
command = "cargo install -q worker-build --version 0.0.9 && worker-build --release"
command = "cargo install -q worker-build --version 0.0.10 && worker-build --release"