diff --git a/crates/cdk-common/src/database/mint/mod.rs b/crates/cdk-common/src/database/mint/mod.rs index 41326f35..974f5e00 100644 --- a/crates/cdk-common/src/database/mint/mod.rs +++ b/crates/cdk-common/src/database/mint/mod.rs @@ -88,7 +88,7 @@ pub trait QuotesDatabase { &self, quote_id: &Uuid, state: MeltQuoteState, - ) -> Result; + ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>; /// Get all [`mint::MeltQuote`]s async fn get_melt_quotes(&self) -> Result, Self::Err>; /// Remove [`mint::MeltQuote`] @@ -114,6 +114,9 @@ pub trait ProofsDatabase { type Err: Into + From; /// Add [`Proofs`] + /// + /// Adds proofs to the database. The database should error if the proof already exits, with a + /// `AttemptUpdateSpentProof` if the proof is already spent or a `Duplicate` error otherwise. async fn add_proofs(&self, proof: Proofs, quote_id: Option) -> Result<(), Self::Err>; /// Remove [`Proofs`] async fn remove_proofs( diff --git a/crates/cdk-common/src/database/mod.rs b/crates/cdk-common/src/database/mod.rs index 745c8ef3..4574e424 100644 --- a/crates/cdk-common/src/database/mod.rs +++ b/crates/cdk-common/src/database/mod.rs @@ -22,6 +22,11 @@ pub enum Error { /// Database Error #[error(transparent)] Database(Box), + + /// Duplicate entry + #[error("Duplicate entry")] + Duplicate, + /// DHKE error #[error(transparent)] DHKE(#[from] crate::dhke::Error), diff --git a/crates/cdk-redb/src/mint/mod.rs b/crates/cdk-redb/src/mint/mod.rs index 0c7b266e..7f9d2608 100644 --- a/crates/cdk-redb/src/mint/mod.rs +++ b/crates/cdk-redb/src/mint/mod.rs @@ -470,12 +470,13 @@ impl MintQuotesDatabase for MintRedbDatabase { &self, quote_id: &Uuid, state: MeltQuoteState, - ) -> Result { + ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> { let write_txn = self.db.begin_write().map_err(Error::from)?; let current_state; + let mut melt_quote: mint::MeltQuote; + { - let mut melt_quote: mint::MeltQuote; let mut table = write_txn .open_table(MELT_QUOTES_TABLE) .map_err(Error::from)?; @@ -506,7 +507,7 @@ impl MintQuotesDatabase for MintRedbDatabase { } write_txn.commit().map_err(Error::from)?; - Ok(current_state) + Ok((current_state, melt_quote)) } async fn get_melt_quotes(&self) -> Result, Self::Err> { diff --git a/crates/cdk-sqlite/src/common.rs b/crates/cdk-sqlite/src/common.rs index 1071fbee..e04a00de 100644 --- a/crates/cdk-sqlite/src/common.rs +++ b/crates/cdk-sqlite/src/common.rs @@ -47,6 +47,8 @@ impl ResourceManager for SqliteConnectionManager { "#, )?; + conn.busy_timeout(Duration::from_secs(10))?; + Ok(conn) } } @@ -81,7 +83,7 @@ pub fn create_sqlite_pool( ) }; - Pool::new(config, max_size, Duration::from_secs(5)) + Pool::new(config, max_size, Duration::from_secs(10)) } /// Migrates the migration generated by `build.rs` diff --git a/crates/cdk-sqlite/src/mint/async_rusqlite.rs b/crates/cdk-sqlite/src/mint/async_rusqlite.rs index f05d80c9..3cd3593a 100644 --- a/crates/cdk-sqlite/src/mint/async_rusqlite.rs +++ b/crates/cdk-sqlite/src/mint/async_rusqlite.rs @@ -4,7 +4,7 @@ use std::sync::{mpsc as std_mpsc, Arc, Mutex}; use std::thread::spawn; use std::time::Instant; -use rusqlite::Connection; +use rusqlite::{ffi, Connection, ErrorCode, TransactionBehavior}; use tokio::sync::{mpsc, oneshot}; use crate::common::SqliteConnectionManager; @@ -202,6 +202,26 @@ fn rusqlite_spawn_worker_threads( Ok(ok) => reply_to.send(ok), Err(err) => { tracing::error!("Failed query with error {:?}", err); + let err = if let Error::Sqlite(rusqlite::Error::SqliteFailure( + ffi::Error { + code, + extended_code, + }, + _, + )) = &err + { + if *code == ErrorCode::ConstraintViolation + && (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY + || *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE) + { + Error::Duplicate + } else { + err + } + } else { + err + }; + reply_to.send(DbResponse::Error(err)) } }; @@ -262,7 +282,7 @@ fn rusqlite_worker_manager( } }; - let tx = match conn.transaction() { + let tx = match conn.transaction_with_behavior(TransactionBehavior::Immediate) { Ok(tx) => tx, Err(err) => { tracing::error!("Failed to begin a transaction: {:?}", err); @@ -300,7 +320,10 @@ fn rusqlite_worker_manager( tracing::trace!("Tx {}: Commit", tx_id); let _ = reply_to.send(match tx.commit() { Ok(()) => DbResponse::Ok, - Err(err) => DbResponse::Error(err.into()), + Err(err) => { + tracing::error!("Failed commit {:?}", err); + DbResponse::Error(err.into()) + } }); break; } @@ -308,7 +331,10 @@ fn rusqlite_worker_manager( tracing::trace!("Tx {}: Rollback", tx_id); let _ = reply_to.send(match tx.rollback() { Ok(()) => DbResponse::Ok, - Err(err) => DbResponse::Error(err.into()), + Err(err) => { + tracing::error!("Failed rollback {:?}", err); + DbResponse::Error(err.into()) + } }); break; } @@ -319,7 +345,35 @@ fn rusqlite_worker_manager( tracing::trace!("Tx {}: SQL {}", tx_id, sql.sql); let _ = match process_query(&tx, sql) { Ok(ok) => reply_to.send(ok), - Err(err) => reply_to.send(DbResponse::Error(err)), + Err(err) => { + tracing::error!( + "Tx {}: Failed query with error {:?}", + tx_id, + err + ); + let err = if let Error::Sqlite( + rusqlite::Error::SqliteFailure( + ffi::Error { + code, + extended_code, + }, + _, + ), + ) = &err + { + if *code == ErrorCode::ConstraintViolation + && (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY + || *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE) + { + Error::Duplicate + } else { + err + } + } else { + err + }; + reply_to.send(DbResponse::Error(err)) + } }; } } diff --git a/crates/cdk-sqlite/src/mint/error.rs b/crates/cdk-sqlite/src/mint/error.rs index ed78fee7..eea510a5 100644 --- a/crates/cdk-sqlite/src/mint/error.rs +++ b/crates/cdk-sqlite/src/mint/error.rs @@ -9,6 +9,10 @@ pub enum Error { #[error(transparent)] Sqlite(#[from] rusqlite::Error), + /// Duplicate entry + #[error("Record already exists")] + Duplicate, + /// Pool error #[error(transparent)] Pool(#[from] crate::pool::Error), @@ -98,6 +102,9 @@ pub enum Error { impl From for cdk_common::database::Error { fn from(e: Error) -> Self { - Self::Database(Box::new(e)) + match e { + Error::Duplicate => Self::Duplicate, + e => Self::Database(Box::new(e)), + } } } diff --git a/crates/cdk-sqlite/src/mint/mod.rs b/crates/cdk-sqlite/src/mint/mod.rs index a3fd7587..c5681cd8 100644 --- a/crates/cdk-sqlite/src/mint/mod.rs +++ b/crates/cdk-sqlite/src/mint/mod.rs @@ -674,10 +674,10 @@ ON CONFLICT(request_lookup_id) DO UPDATE SET &self, quote_id: &Uuid, state: MeltQuoteState, - ) -> Result { + ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> { let transaction = self.pool.begin().await?; - let quote = query( + let mut quote = query( r#" SELECT id, @@ -732,7 +732,10 @@ ON CONFLICT(request_lookup_id) DO UPDATE SET } }; - Ok(quote.state) + let old_state = quote.state; + quote.state = state; + + Ok((old_state, quote)) } async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> { @@ -813,10 +816,30 @@ impl MintProofsDatabase for MintSqliteDatabase { let current_time = unix_time(); + // Check any previous proof, this query should return None in order to proceed storing + // Any result here would error + match query(r#"SELECT state FROM proof WHERE y IN (:ys) LIMIT 1"#) + .bind_vec( + ":ys", + proofs + .iter() + .map(|y| y.y().map(|y| y.to_bytes().to_vec())) + .collect::>()?, + ) + .pluck(&transaction) + .await? + .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str))) + .transpose()? + { + Some(State::Spent) => Err(database::Error::AttemptUpdateSpentProof), + Some(_) => Err(database::Error::Duplicate), + None => Ok(()), // no previous record + }?; + for proof in proofs { query( r#" - INSERT OR IGNORE INTO proof + INSERT INTO proof (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time) VALUES (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time) @@ -852,10 +875,11 @@ impl MintProofsDatabase for MintSqliteDatabase { let total_deleted = query( r#" - DELETE FROM proof WHERE y IN (:ys) AND state != 'SPENT' + DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state) "#, ) .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) + .bind_vec(":exclude_state", vec![State::Spent.to_string()]) .execute(&transaction) .await?; @@ -974,7 +998,11 @@ impl MintProofsDatabase for MintSqliteDatabase { if current_states.len() != ys.len() { transaction.rollback().await?; - tracing::warn!("Attempted to update state of non-existent proof"); + tracing::warn!( + "Attempted to update state of non-existent proof {} {}", + current_states.len(), + ys.len() + ); return Err(database::Error::ProofNotFound); } diff --git a/crates/cdk/src/mint/melt.rs b/crates/cdk/src/mint/melt.rs index 7293e9fb..42450471 100644 --- a/crates/cdk/src/mint/melt.rs +++ b/crates/cdk/src/mint/melt.rs @@ -297,7 +297,7 @@ impl Mint { &self, melt_request: &MeltRequest, ) -> Result { - let state = self + let (state, quote) = self .localstore .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending) .await?; @@ -309,12 +309,6 @@ impl Mint { MeltQuoteState::Unknown => Err(Error::UnknownPaymentState), }?; - let quote = self - .localstore - .get_melt_quote(melt_request.quote()) - .await? - .ok_or(Error::UnknownQuote)?; - self.pubsub_manager .melt_quote_status("e, None, None, MeltQuoteState::Pending); @@ -347,11 +341,23 @@ impl Mint { )); } - self.localstore + if let Some(err) = self + .localstore .add_proofs(melt_request.inputs().clone(), None) - .await?; + .await + .err() + { + return match err { + cdk_common::database::Error::Duplicate => Err(Error::TokenPending), + cdk_common::database::Error::AttemptUpdateSpentProof => { + Err(Error::TokenAlreadySpent) + } + err => Err(Error::Database(err)), + }; + } self.check_ys_spendable(&input_ys, State::Pending).await?; + for proof in melt_request.inputs() { self.pubsub_manager .proof_state((proof.y()?, State::Pending)); diff --git a/crates/cdk/src/mint/swap.rs b/crates/cdk/src/mint/swap.rs index 2fef698f..3b08f555 100644 --- a/crates/cdk/src/mint/swap.rs +++ b/crates/cdk/src/mint/swap.rs @@ -24,9 +24,20 @@ impl Mint { // After swap request is fully validated, add the new proofs to DB let input_ys = swap_request.inputs().ys()?; - self.localstore + if let Some(err) = self + .localstore .add_proofs(swap_request.inputs().clone(), None) - .await?; + .await + .err() + { + return match err { + cdk_common::database::Error::Duplicate => Err(Error::TokenPending), + cdk_common::database::Error::AttemptUpdateSpentProof => { + Err(Error::TokenAlreadySpent) + } + err => Err(Error::Database(err)), + }; + } self.check_ys_spendable(&input_ys, State::Pending).await?; let mut promises = Vec::with_capacity(swap_request.outputs().len());