From 238b09d56a5bb3163b3d92e3eba813759c20b670 Mon Sep 17 00:00:00 2001 From: C Date: Sat, 28 Jun 2025 08:07:47 -0300 Subject: [PATCH] Split the database trait into read and transactions. (#826) * Split the database trait into read and transactions. The transaction traits will encapsulate all database changes and also expect READ-and-lock operations to read and lock records from the database for exclusive access, thereby avoiding race conditions. The Transaction trait expects a `rollback` operation on Drop unless the transaction has been committed. * fix: melt quote duplicate error This change stops a second melt quote from being created if there is an existing valid melt quote for an invoice already. If the first melt quote has expired then we allow for a new melt quote to be created. --------- Co-authored-by: thesimplekid --- .../cdk-cli/src/sub_commands/check_pending.rs | 2 +- .../cdk-common/src/database/mint/auth/mod.rs | 74 +- crates/cdk-common/src/database/mint/mod.rs | 199 +++- crates/cdk-common/src/database/mint/test.rs | 24 +- crates/cdk-common/src/database/mod.rs | 11 +- .../src/init_auth_mint.rs | 12 +- .../src/init_pure_tests.rs | 9 +- .../tests/happy_path_mint_wallet.rs | 4 +- crates/cdk-integration-tests/tests/mint.rs | 8 +- crates/cdk-mint-rpc/src/proto/server.rs | 12 +- crates/cdk-mintd/src/main.rs | 11 +- crates/cdk-signatory/src/common.rs | 13 +- crates/cdk-signatory/src/db_signatory.rs | 13 +- crates/cdk-sqlite/src/mint/auth/mod.rs | 308 +++--- crates/cdk-sqlite/src/mint/memory.rs | 24 +- crates/cdk-sqlite/src/mint/mod.rs | 994 ++++++++++-------- crates/cdk/src/lib.rs | 2 +- crates/cdk/src/mint/auth/mod.rs | 17 +- crates/cdk/src/mint/check_spendable.rs | 80 +- crates/cdk/src/mint/issue/issue_nut04.rs | 120 +-- crates/cdk/src/mint/ln.rs | 25 +- crates/cdk/src/mint/melt.rs | 262 ++--- crates/cdk/src/mint/mod.rs | 23 +- crates/cdk/src/mint/proof_writer.rs | 214 ++++ crates/cdk/src/mint/start_up_check.rs | 18 +- crates/cdk/src/mint/swap.rs | 74 +- crates/cdk/src/mint/verification.rs | 16 +- misc/itests.sh | 2 +- 28 files changed, 1483 insertions(+), 1088 deletions(-) create mode 100644 crates/cdk/src/mint/proof_writer.rs diff --git a/crates/cdk-cli/src/sub_commands/check_pending.rs b/crates/cdk-cli/src/sub_commands/check_pending.rs index 437bee94..ad56a546 100644 --- a/crates/cdk-cli/src/sub_commands/check_pending.rs +++ b/crates/cdk-cli/src/sub_commands/check_pending.rs @@ -26,7 +26,7 @@ pub async fn check_pending(multi_mint_wallet: &MultiMintWallet) -> Result<()> { // Try to reclaim any proofs that are no longer pending match wallet.reclaim_unspent(pending_proofs).await { Ok(()) => println!("Successfully reclaimed pending proofs"), - Err(e) => println!("Error reclaimed pending proofs: {}", e), + Err(e) => println!("Error reclaimed pending proofs: {e}"), } } Ok(()) diff --git a/crates/cdk-common/src/database/mint/auth/mod.rs b/crates/cdk-common/src/database/mint/auth/mod.rs index 43754948..a71257ee 100644 --- a/crates/cdk-common/src/database/mint/auth/mod.rs +++ b/crates/cdk-common/src/database/mint/auth/mod.rs @@ -5,61 +5,79 @@ use std::collections::HashMap; use async_trait::async_trait; use cashu::{AuthRequired, ProtectedEndpoint}; +use super::DbTransactionFinalizer; use crate::database::Error; use crate::mint::MintKeySetInfo; use crate::nuts::nut07::State; use crate::nuts::{AuthProof, BlindSignature, Id, PublicKey}; +/// Mint Database transaction +#[async_trait] +pub trait MintAuthTransaction: DbTransactionFinalizer { + /// Add Active Keyset + async fn set_active_keyset(&mut self, id: Id) -> Result<(), Error>; + + /// Add [`MintKeySetInfo`] + async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error>; + + /// Add spent [`AuthProof`] + async fn add_proof(&mut self, proof: AuthProof) -> Result<(), Error>; + + /// Update [`AuthProof`]s state + async fn update_proof_state( + &mut self, + y: &PublicKey, + proofs_state: State, + ) -> Result, Error>; + + /// Add [`BlindSignature`] + async fn add_blind_signatures( + &mut self, + blinded_messages: &[PublicKey], + blind_signatures: &[BlindSignature], + ) -> Result<(), Error>; + + /// Add protected endpoints + async fn add_protected_endpoints( + &mut self, + protected_endpoints: HashMap, + ) -> Result<(), Error>; + + /// Removed Protected endpoints + async fn remove_protected_endpoints( + &mut self, + protected_endpoints: Vec, + ) -> Result<(), Error>; +} + /// Mint Database trait #[async_trait] pub trait MintAuthDatabase { /// Mint Database Error type Err: Into + From; - /// Add Active Keyset - async fn set_active_keyset(&self, id: Id) -> Result<(), Self::Err>; + + /// Begins a transaction + async fn begin_transaction<'a>( + &'a self, + ) -> Result + Send + Sync + 'a>, Self::Err>; + /// Get Active Keyset async fn get_active_keyset_id(&self) -> Result, Self::Err>; - /// Add [`MintKeySetInfo`] - async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err>; /// Get [`MintKeySetInfo`] async fn get_keyset_info(&self, id: &Id) -> Result, Self::Err>; /// Get [`MintKeySetInfo`]s async fn get_keyset_infos(&self) -> Result, Self::Err>; - /// Add spent [`AuthProof`] - async fn add_proof(&self, proof: AuthProof) -> Result<(), Self::Err>; /// Get [`AuthProof`] state async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result>, Self::Err>; - /// Update [`AuthProof`]s state - async fn update_proof_state( - &self, - y: &PublicKey, - proofs_state: State, - ) -> Result, Self::Err>; - /// Add [`BlindSignature`] - async fn add_blind_signatures( - &self, - blinded_messages: &[PublicKey], - blind_signatures: &[BlindSignature], - ) -> Result<(), Self::Err>; /// Get [`BlindSignature`]s async fn get_blind_signatures( &self, blinded_messages: &[PublicKey], ) -> Result>, Self::Err>; - /// Add protected endpoints - async fn add_protected_endpoints( - &self, - protected_endpoints: HashMap, - ) -> Result<(), Self::Err>; - /// Removed Protected endpoints - async fn remove_protected_endpoints( - &self, - protected_endpoints: Vec, - ) -> Result<(), Self::Err>; /// Get auth for protected_endpoint async fn get_auth_for_endpoint( &self, diff --git a/crates/cdk-common/src/database/mint/mod.rs b/crates/cdk-common/src/database/mint/mod.rs index a72b9b1b..95eb3db0 100644 --- a/crates/cdk-common/src/database/mint/mod.rs +++ b/crates/cdk-common/src/database/mint/mod.rs @@ -21,7 +21,17 @@ mod auth; pub mod test; #[cfg(feature = "auth")] -pub use auth::MintAuthDatabase; +pub use auth::{MintAuthDatabase, MintAuthTransaction}; + +/// KeysDatabaseWriter +#[async_trait] +pub trait KeysDatabaseTransaction<'a, Error>: DbTransactionFinalizer { + /// Add Active Keyset + async fn set_active_keyset(&mut self, unit: CurrencyUnit, id: Id) -> Result<(), Error>; + + /// Add [`MintKeySetInfo`] + async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), Error>; +} /// Mint Keys Database trait #[async_trait] @@ -29,36 +39,84 @@ pub trait KeysDatabase { /// Mint Keys Database Error type Err: Into + From; - /// Add Active Keyset - async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err>; + /// Beings a transaction + async fn begin_transaction<'a>( + &'a self, + ) -> Result + Send + Sync + 'a>, Error>; + /// Get Active Keyset async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result, Self::Err>; + /// Get all Active Keyset async fn get_active_keysets(&self) -> Result, Self::Err>; - /// Add [`MintKeySetInfo`] - async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err>; + /// Get [`MintKeySetInfo`] async fn get_keyset_info(&self, id: &Id) -> Result, Self::Err>; + /// Get [`MintKeySetInfo`]s async fn get_keyset_infos(&self) -> Result, Self::Err>; } +/// Mint Quote Database writer trait +#[async_trait] +pub trait QuotesTransaction<'a> { + /// Mint Quotes Database Error + type Err: Into + From; + + /// Get [`MintMintQuote`] and lock it for update in this transaction + async fn get_mint_quote(&mut self, quote_id: &Uuid) + -> Result, Self::Err>; + /// Add [`MintMintQuote`] + async fn add_or_replace_mint_quote(&mut self, quote: MintMintQuote) -> Result<(), Self::Err>; + /// Update state of [`MintMintQuote`] + async fn update_mint_quote_state( + &mut self, + quote_id: &Uuid, + state: MintQuoteState, + ) -> Result; + /// Remove [`MintMintQuote`] + async fn remove_mint_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err>; + /// Get [`mint::MeltQuote`] and lock it for update in this transaction + async fn get_melt_quote( + &mut self, + quote_id: &Uuid, + ) -> Result, Self::Err>; + /// Add [`mint::MeltQuote`] + async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err>; + + /// Updates the request lookup id for a melt quote + async fn update_melt_quote_request_lookup_id( + &mut self, + quote_id: &Uuid, + new_request_lookup_id: &str, + ) -> Result<(), Self::Err>; + + /// Update [`mint::MeltQuote`] state + /// + /// It is expected for this function to fail if the state is already set to the new state + async fn update_melt_quote_state( + &mut self, + quote_id: &Uuid, + new_state: MeltQuoteState, + ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>; + /// Remove [`mint::MeltQuote`] + async fn remove_melt_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err>; + /// Get all [`MintMintQuote`]s and lock it for update in this transaction + async fn get_mint_quote_by_request( + &mut self, + request: &str, + ) -> Result, Self::Err>; +} + /// Mint Quote Database trait #[async_trait] pub trait QuotesDatabase { /// Mint Quotes Database Error type Err: Into + From; - /// Add [`MintMintQuote`] - async fn add_mint_quote(&self, quote: MintMintQuote) -> Result<(), Self::Err>; /// Get [`MintMintQuote`] async fn get_mint_quote(&self, quote_id: &Uuid) -> Result, Self::Err>; - /// Update state of [`MintMintQuote`] - async fn update_mint_quote_state( - &self, - quote_id: &Uuid, - state: MintQuoteState, - ) -> Result; + /// Get all [`MintMintQuote`]s async fn get_mint_quote_by_request( &self, @@ -76,25 +134,36 @@ pub trait QuotesDatabase { &self, state: MintQuoteState, ) -> Result, Self::Err>; - /// Remove [`MintMintQuote`] - async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err>; - - /// Add [`mint::MeltQuote`] - async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err>; /// Get [`mint::MeltQuote`] async fn get_melt_quote(&self, quote_id: &Uuid) -> Result, Self::Err>; - /// Update [`mint::MeltQuote`] state - /// - /// It is expected for this function to fail if the state is already set to the new state - async fn update_melt_quote_state( - &self, - quote_id: &Uuid, - new_state: MeltQuoteState, - ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>; /// Get all [`mint::MeltQuote`]s async fn get_melt_quotes(&self) -> Result, Self::Err>; - /// Remove [`mint::MeltQuote`] - async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err>; +} + +/// Mint Proof Transaction trait +#[async_trait] +pub trait ProofsTransaction<'a> { + /// Mint Proof Database Error + type Err: Into + From; + + /// Add [`Proofs`] + /// + /// Adds proofs to the database. The database should error if the proof already exits, with a + /// `AttemptUpdateSpentProof` if the proof is already spent or a `Duplicate` error otherwise. + async fn add_proofs(&mut self, proof: Proofs, quote_id: Option) -> Result<(), Self::Err>; + /// Updates the proofs to a given states and return the previous states + async fn update_proofs_states( + &mut self, + ys: &[PublicKey], + proofs_state: State, + ) -> Result>, Self::Err>; + + /// Remove [`Proofs`] + async fn remove_proofs( + &mut self, + ys: &[PublicKey], + quote_id: Option, + ) -> Result<(), Self::Err>; } /// Mint Proof Database trait @@ -103,29 +172,12 @@ pub trait ProofsDatabase { /// Mint Proof Database Error type Err: Into + From; - /// Add [`Proofs`] - /// - /// Adds proofs to the database. The database should error if the proof already exits, with a - /// `AttemptUpdateSpentProof` if the proof is already spent or a `Duplicate` error otherwise. - async fn add_proofs(&self, proof: Proofs, quote_id: Option) -> Result<(), Self::Err>; - /// Remove [`Proofs`] - async fn remove_proofs( - &self, - ys: &[PublicKey], - quote_id: Option, - ) -> Result<(), Self::Err>; /// Get [`Proofs`] by ys async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result>, Self::Err>; /// Get ys by quote id async fn get_proof_ys_by_quote_id(&self, quote_id: &Uuid) -> Result, Self::Err>; /// Get [`Proofs`] state async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result>, Self::Err>; - /// Get [`Proofs`] state - async fn update_proofs_states( - &self, - ys: &[PublicKey], - proofs_state: State, - ) -> Result>, Self::Err>; /// Get [`Proofs`] by state async fn get_proofs_by_keyset_id( &self, @@ -134,18 +186,32 @@ pub trait ProofsDatabase { } #[async_trait] -/// Mint Signatures Database trait -pub trait SignaturesDatabase { +/// Mint Signatures Transaction trait +pub trait SignaturesTransaction<'a> { /// Mint Signature Database Error type Err: Into + From; /// Add [`BlindSignature`] async fn add_blind_signatures( - &self, + &mut self, blinded_messages: &[PublicKey], blind_signatures: &[BlindSignature], quote_id: Option, ) -> Result<(), Self::Err>; + + /// Get [`BlindSignature`]s + async fn get_blind_signatures( + &mut self, + blinded_messages: &[PublicKey], + ) -> Result>, Self::Err>; +} + +#[async_trait] +/// Mint Signatures Database trait +pub trait SignaturesDatabase { + /// Mint Signature Database Error + type Err: Into + From; + /// Get [`BlindSignature`]s async fn get_blind_signatures( &self, @@ -163,18 +229,47 @@ pub trait SignaturesDatabase { ) -> Result, Self::Err>; } +#[async_trait] +/// Commit and Rollback +pub trait DbTransactionFinalizer { + /// Mint Signature Database Error + type Err: Into + From; + + /// Commits all the changes into the database + async fn commit(self: Box) -> Result<(), Self::Err>; + + /// Rollbacks the write transaction + async fn rollback(self: Box) -> Result<(), Self::Err>; +} + +/// Base database writer +#[async_trait] +pub trait Transaction<'a, Error>: + DbTransactionFinalizer + + QuotesTransaction<'a, Err = Error> + + SignaturesTransaction<'a, Err = Error> + + ProofsTransaction<'a, Err = Error> +{ + /// Set [`QuoteTTL`] + async fn set_quote_ttl(&mut self, quote_ttl: QuoteTTL) -> Result<(), Error>; + + /// Set [`MintInfo`] + async fn set_mint_info(&mut self, mint_info: MintInfo) -> Result<(), Error>; +} + /// Mint Database trait #[async_trait] pub trait Database: QuotesDatabase + ProofsDatabase + SignaturesDatabase { - /// Set [`MintInfo`] - async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), Error>; + /// Beings a transaction + async fn begin_transaction<'a>( + &'a self, + ) -> Result + Send + Sync + 'a>, Error>; + /// Get [`MintInfo`] async fn get_mint_info(&self) -> Result; - /// Set [`QuoteTTL`] - async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), Error>; /// Get [`QuoteTTL`] async fn get_quote_ttl(&self) -> Result; } diff --git a/crates/cdk-common/src/database/mint/test.rs b/crates/cdk-common/src/database/mint/test.rs index 5d03fb7c..7ad65e7a 100644 --- a/crates/cdk-common/src/database/mint/test.rs +++ b/crates/cdk-common/src/database/mint/test.rs @@ -2,17 +2,20 @@ //! //! This set is generic and checks the default and expected behaviour for a mint database //! implementation -use std::fmt::Debug; use std::str::FromStr; use cashu::secret::Secret; use cashu::{Amount, CurrencyUnit, SecretKey}; use super::*; +use crate::database; use crate::mint::MintKeySetInfo; #[inline] -async fn setup_keyset + KeysDatabase>(db: &DB) -> Id { +async fn setup_keyset(db: &DB) -> Id +where + DB: KeysDatabase, +{ let keyset_id = Id::from_str("00916bbf7ef91a36").unwrap(); let keyset_info = MintKeySetInfo { id: keyset_id, @@ -25,12 +28,17 @@ async fn setup_keyset + KeysDatabase>(db: &DB max_order: 32, input_fee_ppk: 0, }; - db.add_keyset_info(keyset_info).await.unwrap(); + let mut writer = db.begin_transaction().await.expect("db.begin()"); + writer.add_keyset_info(keyset_info).await.unwrap(); + writer.commit().await.expect("commit()"); keyset_id } /// State transition test -pub async fn state_transition + KeysDatabase>(db: DB) { +pub async fn state_transition(db: DB) +where + DB: Database + KeysDatabase, +{ let keyset_id = setup_keyset(&db).await; let proofs = vec![ @@ -53,19 +61,21 @@ pub async fn state_transition + KeysDatabase> ]; // Add proofs to database - db.add_proofs(proofs.clone(), None).await.unwrap(); + let mut tx = Database::begin_transaction(&db).await.unwrap(); + tx.add_proofs(proofs.clone(), None).await.unwrap(); // Mark one proof as `pending` - assert!(db + assert!(tx .update_proofs_states(&[proofs[0].y().unwrap()], State::Pending) .await .is_ok()); // Attempt to select the `pending` proof, as `pending` again (which should fail) - assert!(db + assert!(tx .update_proofs_states(&[proofs[0].y().unwrap()], State::Pending) .await .is_err()); + tx.commit().await.unwrap(); } /// Unit test that is expected to be passed for a correct database implementation diff --git a/crates/cdk-common/src/database/mod.rs b/crates/cdk-common/src/database/mod.rs index 4574e424..8fad4132 100644 --- a/crates/cdk-common/src/database/mod.rs +++ b/crates/cdk-common/src/database/mod.rs @@ -5,14 +5,17 @@ pub mod mint; #[cfg(feature = "wallet")] mod wallet; -#[cfg(all(feature = "mint", feature = "auth"))] -pub use mint::MintAuthDatabase; #[cfg(feature = "mint")] pub use mint::{ - Database as MintDatabase, KeysDatabase as MintKeysDatabase, - ProofsDatabase as MintProofsDatabase, QuotesDatabase as MintQuotesDatabase, + Database as MintDatabase, DbTransactionFinalizer as MintDbWriterFinalizer, + KeysDatabase as MintKeysDatabase, KeysDatabaseTransaction as MintKeyDatabaseTransaction, + ProofsDatabase as MintProofsDatabase, ProofsTransaction as MintProofsTransaction, + QuotesDatabase as MintQuotesDatabase, QuotesTransaction as MintQuotesTransaction, SignaturesDatabase as MintSignaturesDatabase, + SignaturesTransaction as MintSignatureTransaction, Transaction as MintTransaction, }; +#[cfg(all(feature = "mint", feature = "auth"))] +pub use mint::{MintAuthDatabase, MintAuthTransaction}; #[cfg(feature = "wallet")] pub use wallet::Database as WalletDatabase; diff --git a/crates/cdk-integration-tests/src/init_auth_mint.rs b/crates/cdk-integration-tests/src/init_auth_mint.rs index 49c62320..822334da 100644 --- a/crates/cdk-integration-tests/src/init_auth_mint.rs +++ b/crates/cdk-integration-tests/src/init_auth_mint.rs @@ -71,9 +71,9 @@ where acc }); - auth_database - .add_protected_endpoints(blind_auth_endpoints) - .await?; + let mut tx = auth_database.begin_transaction().await?; + + tx.add_protected_endpoints(blind_auth_endpoints).await?; let mut clear_auth_endpoint = HashMap::new(); clear_auth_endpoint.insert( @@ -81,9 +81,9 @@ where AuthRequired::Clear, ); - auth_database - .add_protected_endpoints(clear_auth_endpoint) - .await?; + tx.add_protected_endpoints(clear_auth_endpoint).await?; + + tx.commit().await?; mint_builder = mint_builder.with_auth_localstore(Arc::new(auth_database)); diff --git a/crates/cdk-integration-tests/src/init_pure_tests.rs b/crates/cdk-integration-tests/src/init_pure_tests.rs index 2896612b..155f03e3 100644 --- a/crates/cdk-integration-tests/src/init_pure_tests.rs +++ b/crates/cdk-integration-tests/src/init_pure_tests.rs @@ -227,11 +227,12 @@ pub async fn create_and_start_test_mint() -> Result { .map(|x| x.clone()) .expect("localstore"); - localstore - .set_mint_info(mint_builder.mint_info.clone()) - .await?; + let mut tx = localstore.begin_transaction().await?; + tx.set_mint_info(mint_builder.mint_info.clone()).await?; + let quote_ttl = QuoteTTL::new(10000, 10000); - localstore.set_quote_ttl(quote_ttl).await?; + tx.set_quote_ttl(quote_ttl).await?; + tx.commit().await?; let mint = mint_builder.build().await?; diff --git a/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs b/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs index 935149cb..2f596388 100644 --- a/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs +++ b/crates/cdk-integration-tests/tests/happy_path_mint_wallet.rs @@ -425,9 +425,7 @@ async fn test_pay_invoice_twice() { let melt = wallet.melt(&melt_quote.id).await.unwrap(); - let melt_two = wallet.melt_quote(invoice, None).await.unwrap(); - - let melt_two = wallet.melt(&melt_two.id).await; + let melt_two = wallet.melt_quote(invoice, None).await; match melt_two { Err(err) => match err { diff --git a/crates/cdk-integration-tests/tests/mint.rs b/crates/cdk-integration-tests/tests/mint.rs index 662ae5af..33c4dd47 100644 --- a/crates/cdk-integration-tests/tests/mint.rs +++ b/crates/cdk-integration-tests/tests/mint.rs @@ -51,13 +51,15 @@ async fn test_correct_keyset() { .with_seed(mnemonic.to_seed_normalized("").to_vec()); let mint = mint_builder.build().await.unwrap(); + let mut tx = localstore.begin_transaction().await.unwrap(); - localstore - .set_mint_info(mint_builder.mint_info.clone()) + tx.set_mint_info(mint_builder.mint_info.clone()) .await .unwrap(); let quote_ttl = QuoteTTL::new(10000, 10000); - localstore.set_quote_ttl(quote_ttl).await.unwrap(); + tx.set_quote_ttl(quote_ttl).await.unwrap(); + + tx.commit().await.unwrap(); let active = mint.get_active_keysets(); diff --git a/crates/cdk-mint-rpc/src/proto/server.rs b/crates/cdk-mint-rpc/src/proto/server.rs index 5d25afd0..dbba9479 100644 --- a/crates/cdk-mint-rpc/src/proto/server.rs +++ b/crates/cdk-mint-rpc/src/proto/server.rs @@ -655,8 +655,16 @@ impl CdkMint for MintRPCServer { mint_quote.state = state; - self.mint - .update_mint_quote(mint_quote) + let mut tx = self + .mint + .localstore + .begin_transaction() + .await + .map_err(|_| Status::internal("Could not update quote".to_string()))?; + tx.add_or_replace_mint_quote(mint_quote) + .await + .map_err(|_| Status::internal("Could not update quote".to_string()))?; + tx.commit() .await .map_err(|_| Status::internal("Could not update quote".to_string()))?; } diff --git a/crates/cdk-mintd/src/main.rs b/crates/cdk-mintd/src/main.rs index aab2a34d..5f996666 100644 --- a/crates/cdk-mintd/src/main.rs +++ b/crates/cdk-mintd/src/main.rs @@ -526,12 +526,11 @@ async fn main() -> anyhow::Result<()> { mint_builder = mint_builder.set_blind_auth_settings(auth_settings.mint_max_bat); - auth_localstore - .remove_protected_endpoints(unprotected_endpoints) - .await?; - auth_localstore - .add_protected_endpoints(protected_endpoints) - .await?; + let mut tx = auth_localstore.begin_transaction().await?; + + tx.remove_protected_endpoints(unprotected_endpoints).await?; + tx.add_protected_endpoints(protected_endpoints).await?; + tx.commit().await?; } let mint = mint_builder.build().await?; diff --git a/crates/cdk-signatory/src/common.rs b/crates/cdk-signatory/src/common.rs index 5459a662..6663f881 100644 --- a/crates/cdk-signatory/src/common.rs +++ b/crates/cdk-signatory/src/common.rs @@ -25,13 +25,14 @@ pub async fn init_keysets( // Get keysets info from DB let keysets_infos = localstore.get_keyset_infos().await?; + let mut tx = localstore.begin_transaction().await?; if !keysets_infos.is_empty() { tracing::debug!("Setting all saved keysets to inactive"); for keyset in keysets_infos.clone() { // Set all to in active let mut keyset = keyset; keyset.active = false; - localstore.add_keyset_info(keyset).await?; + tx.add_keyset_info(keyset).await?; } let keysets_by_unit: HashMap> = @@ -74,9 +75,9 @@ pub async fn init_keysets( active_keysets.insert(id, keyset); let mut keyset_info = highest_index_keyset; keyset_info.active = true; - localstore.add_keyset_info(keyset_info).await?; + tx.add_keyset_info(keyset_info).await?; active_keyset_units.push(unit.clone()); - localstore.set_active_keyset(unit, id).await?; + tx.set_active_keyset(unit, id).await?; } else { // Check to see if there are not keysets by this unit let derivation_path_index = if keysets.is_empty() { @@ -104,8 +105,8 @@ pub async fn init_keysets( ); let id = keyset_info.id; - localstore.add_keyset_info(keyset_info).await?; - localstore.set_active_keyset(unit.clone(), id).await?; + tx.add_keyset_info(keyset_info).await?; + tx.set_active_keyset(unit.clone(), id).await?; active_keysets.insert(id, keyset); active_keyset_units.push(unit.clone()); }; @@ -113,6 +114,8 @@ pub async fn init_keysets( } } + tx.commit().await?; + Ok((active_keysets, active_keyset_units)) } diff --git a/crates/cdk-signatory/src/db_signatory.rs b/crates/cdk-signatory/src/db_signatory.rs index 834e2309..c1dae7f8 100644 --- a/crates/cdk-signatory/src/db_signatory.rs +++ b/crates/cdk-signatory/src/db_signatory.rs @@ -53,6 +53,7 @@ impl DbSignatory { .await?; supported_units.entry(CurrencyUnit::Auth).or_insert((0, 1)); + let mut tx = localstore.begin_transaction().await?; // Create new keysets for supported units that aren't covered by the current keysets for (unit, (fee, max_order)) in supported_units { @@ -77,12 +78,14 @@ impl DbSignatory { ); let id = keyset_info.id; - localstore.add_keyset_info(keyset_info).await?; - localstore.set_active_keyset(unit, id).await?; + tx.add_keyset_info(keyset_info).await?; + tx.set_active_keyset(unit, id).await?; active_keysets.insert(id, keyset); } } + tx.commit().await?; + let keys = Self { keysets: Default::default(), active_keysets: Default::default(), @@ -244,8 +247,10 @@ impl Signatory for DbSignatory { None, ); let id = info.id; - self.localstore.add_keyset_info(info.clone()).await?; - self.localstore.set_active_keyset(args.unit, id).await?; + let mut tx = self.localstore.begin_transaction().await?; + tx.add_keyset_info(info.clone()).await?; + tx.set_active_keyset(args.unit, id).await?; + tx.commit().await?; self.reload_keys_from_db().await?; diff --git a/crates/cdk-sqlite/src/mint/auth/mod.rs b/crates/cdk-sqlite/src/mint/auth/mod.rs index 9268e065..4a002fa8 100644 --- a/crates/cdk-sqlite/src/mint/auth/mod.rs +++ b/crates/cdk-sqlite/src/mint/auth/mod.rs @@ -6,14 +6,14 @@ use std::path::Path; use std::str::FromStr; use async_trait::async_trait; -use cdk_common::database::{self, MintAuthDatabase}; +use cdk_common::database::{self, MintAuthDatabase, MintAuthTransaction}; use cdk_common::mint::MintKeySetInfo; use cdk_common::nuts::{AuthProof, BlindSignature, Id, PublicKey, State}; use cdk_common::{AuthRequired, ProtectedEndpoint}; use tracing::instrument; use super::async_rusqlite::AsyncRusqlite; -use super::{sqlite_row_to_blind_signature, sqlite_row_to_keyset_info}; +use super::{sqlite_row_to_blind_signature, sqlite_row_to_keyset_info, SqliteTransaction}; use crate::column_as_string; use crate::common::{create_sqlite_pool, migrate}; use crate::mint::async_rusqlite::query; @@ -56,11 +56,9 @@ impl MintSqliteAuthDatabase { } #[async_trait] -impl MintAuthDatabase for MintSqliteAuthDatabase { - type Err = database::Error; - +impl MintAuthTransaction for SqliteTransaction<'_> { #[instrument(skip(self))] - async fn set_active_keyset(&self, id: Id) -> Result<(), Self::Err> { + async fn set_active_keyset(&mut self, id: Id) -> Result<(), database::Error> { tracing::info!("Setting auth keyset {id} active"); query( r#" @@ -72,30 +70,13 @@ impl MintAuthDatabase for MintSqliteAuthDatabase { "#, ) .bind(":id", id.to_string()) - .execute(&self.pool) + .execute(&self.inner) .await?; Ok(()) } - async fn get_active_keyset_id(&self) -> Result, Self::Err> { - Ok(query( - r#" - SELECT - id - FROM - keyset - WHERE - active = 1; - "#, - ) - .pluck(&self.pool) - .await? - .map(|id| Ok::<_, Error>(column_as_string!(id, Id::from_str, Id::from_bytes))) - .transpose()?) - } - - async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> { + async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), database::Error> { query( r#" INSERT INTO @@ -125,12 +106,159 @@ impl MintAuthDatabase for MintSqliteAuthDatabase { .bind(":derivation_path", keyset.derivation_path.to_string()) .bind(":max_order", keyset.max_order) .bind(":derivation_path_index", keyset.derivation_path_index) - .execute(&self.pool) + .execute(&self.inner) .await?; Ok(()) } + async fn add_proof(&mut self, proof: AuthProof) -> Result<(), database::Error> { + if let Err(err) = query( + r#" + INSERT INTO proof + (y, keyset_id, secret, c, state) + VALUES + (:y, :keyset_id, :secret, :c, :state) + "#, + ) + .bind(":y", proof.y()?.to_bytes().to_vec()) + .bind(":keyset_id", proof.keyset_id.to_string()) + .bind(":secret", proof.secret.to_string()) + .bind(":c", proof.c.to_bytes().to_vec()) + .bind(":state", "UNSPENT".to_string()) + .execute(&self.inner) + .await + { + tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err); + } + Ok(()) + } + + async fn update_proof_state( + &mut self, + y: &PublicKey, + proofs_state: State, + ) -> Result, Self::Err> { + let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#) + .bind(":y", y.to_bytes().to_vec()) + .pluck(&self.inner) + .await? + .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str))) + .transpose()?; + + query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#) + .bind(":y", y.to_bytes().to_vec()) + .bind( + ":state", + current_state.as_ref().map(|state| state.to_string()), + ) + .bind(":new_state", proofs_state.to_string()) + .execute(&self.inner) + .await?; + + Ok(current_state) + } + + async fn add_blind_signatures( + &mut self, + blinded_messages: &[PublicKey], + blind_signatures: &[BlindSignature], + ) -> Result<(), database::Error> { + for (message, signature) in blinded_messages.iter().zip(blind_signatures) { + query( + r#" + INSERT + INTO blind_signature + (y, amount, keyset_id, c) + VALUES + (:y, :amount, :keyset_id, :c) + "#, + ) + .bind(":y", message.to_bytes().to_vec()) + .bind(":amount", u64::from(signature.amount) as i64) + .bind(":keyset_id", signature.keyset_id.to_string()) + .bind(":c", signature.c.to_bytes().to_vec()) + .execute(&self.inner) + .await?; + } + + Ok(()) + } + + async fn add_protected_endpoints( + &mut self, + protected_endpoints: HashMap, + ) -> Result<(), database::Error> { + for (endpoint, auth) in protected_endpoints.iter() { + if let Err(err) = query( + r#" + INSERT OR REPLACE INTO protected_endpoints + (endpoint, auth) + VALUES (:endpoint, :auth); + "#, + ) + .bind(":endpoint", serde_json::to_string(endpoint)?) + .bind(":auth", serde_json::to_string(auth)?) + .execute(&self.inner) + .await + { + tracing::debug!( + "Attempting to add protected endpoint. Skipping.... {:?}", + err + ); + } + } + + Ok(()) + } + async fn remove_protected_endpoints( + &mut self, + protected_endpoints: Vec, + ) -> Result<(), database::Error> { + query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#) + .bind_vec( + ":endpoints", + protected_endpoints + .iter() + .map(serde_json::to_string) + .collect::>()?, + ) + .execute(&self.inner) + .await?; + Ok(()) + } +} + +#[async_trait] +impl MintAuthDatabase for MintSqliteAuthDatabase { + type Err = database::Error; + + async fn begin_transaction<'a>( + &'a self, + ) -> Result + Send + Sync + 'a>, database::Error> + { + Ok(Box::new(SqliteTransaction { + inner: self.pool.begin().await?, + })) + } + + async fn get_active_keyset_id(&self) -> Result, Self::Err> { + Ok(query( + r#" + SELECT + id + FROM + keyset + WHERE + active = 1; + "#, + ) + .pluck(&self.pool) + .await? + .map(|id| Ok::<_, Error>(column_as_string!(id, Id::from_str, Id::from_bytes))) + .transpose()?) + } + async fn get_keyset_info(&self, id: &Id) -> Result, Self::Err> { Ok(query( r#"SELECT @@ -177,28 +305,6 @@ impl MintAuthDatabase for MintSqliteAuthDatabase { .collect::, _>>()?) } - async fn add_proof(&self, proof: AuthProof) -> Result<(), Self::Err> { - if let Err(err) = query( - r#" - INSERT INTO proof - (y, keyset_id, secret, c, state) - VALUES - (:y, :keyset_id, :secret, :c, :state) - "#, - ) - .bind(":y", proof.y()?.to_bytes().to_vec()) - .bind(":keyset_id", proof.keyset_id.to_string()) - .bind(":secret", proof.secret.to_string()) - .bind(":c", proof.c.to_bytes().to_vec()) - .bind(":state", "UNSPENT".to_string()) - .execute(&self.pool) - .await - { - tracing::debug!("Attempting to add known proof. Skipping.... {:?}", err); - } - Ok(()) - } - async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result>, Self::Err> { let mut current_states = query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#) .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) @@ -216,65 +322,6 @@ impl MintAuthDatabase for MintSqliteAuthDatabase { Ok(ys.iter().map(|y| current_states.remove(y)).collect()) } - async fn update_proof_state( - &self, - y: &PublicKey, - proofs_state: State, - ) -> Result, Self::Err> { - let transaction = self.pool.begin().await?; - - let current_state = query(r#"SELECT state FROM proof WHERE y = :y"#) - .bind(":y", y.to_bytes().to_vec()) - .pluck(&transaction) - .await? - .map(|state| Ok::<_, Error>(column_as_string!(state, State::from_str))) - .transpose()?; - - query(r#"UPDATE proof SET state = :new_state WHERE state = :state AND y = :y"#) - .bind(":y", y.to_bytes().to_vec()) - .bind( - ":state", - current_state.as_ref().map(|state| state.to_string()), - ) - .bind(":new_state", proofs_state.to_string()) - .execute(&transaction) - .await?; - - transaction.commit().await?; - - Ok(current_state) - } - - async fn add_blind_signatures( - &self, - blinded_messages: &[PublicKey], - blind_signatures: &[BlindSignature], - ) -> Result<(), Self::Err> { - let transaction = self.pool.begin().await?; - - for (message, signature) in blinded_messages.iter().zip(blind_signatures) { - query( - r#" - INSERT - INTO blind_signature - (y, amount, keyset_id, c) - VALUES - (:y, :amount, :keyset_id, :c) - "#, - ) - .bind(":y", message.to_bytes().to_vec()) - .bind(":amount", u64::from(signature.amount) as i64) - .bind(":keyset_id", signature.keyset_id.to_string()) - .bind(":c", signature.c.to_bytes().to_vec()) - .execute(&transaction) - .await?; - } - - transaction.commit().await?; - - Ok(()) - } - async fn get_blind_signatures( &self, blinded_messages: &[PublicKey], @@ -319,53 +366,6 @@ impl MintAuthDatabase for MintSqliteAuthDatabase { .collect()) } - async fn add_protected_endpoints( - &self, - protected_endpoints: HashMap, - ) -> Result<(), Self::Err> { - let transaction = self.pool.begin().await?; - - for (endpoint, auth) in protected_endpoints.iter() { - if let Err(err) = query( - r#" - INSERT OR REPLACE INTO protected_endpoints - (endpoint, auth) - VALUES (:endpoint, :auth); - "#, - ) - .bind(":endpoint", serde_json::to_string(endpoint)?) - .bind(":auth", serde_json::to_string(auth)?) - .execute(&transaction) - .await - { - tracing::debug!( - "Attempting to add protected endpoint. Skipping.... {:?}", - err - ); - } - } - - transaction.commit().await?; - - Ok(()) - } - async fn remove_protected_endpoints( - &self, - protected_endpoints: Vec, - ) -> Result<(), Self::Err> { - query(r#"DELETE FROM protected_endpoints WHERE endpoint IN (:endpoints)"#) - .bind_vec( - ":endpoints", - protected_endpoints - .iter() - .map(serde_json::to_string) - .collect::>()?, - ) - .execute(&self.pool) - .await?; - Ok(()) - } - async fn get_auth_for_endpoint( &self, protected_endpoint: ProtectedEndpoint, diff --git a/crates/cdk-sqlite/src/mint/memory.rs b/crates/cdk-sqlite/src/mint/memory.rs index ce47c43b..735f7670 100644 --- a/crates/cdk-sqlite/src/mint/memory.rs +++ b/crates/cdk-sqlite/src/mint/memory.rs @@ -1,9 +1,7 @@ //! In-memory database that is provided by the `cdk-sqlite` crate, mainly for testing purposes. use std::collections::HashMap; -use cdk_common::database::{ - self, MintDatabase, MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, -}; +use cdk_common::database::{self, MintDatabase, MintKeysDatabase}; use cdk_common::mint::{self, MintKeySetInfo, MintQuote}; use cdk_common::nuts::{CurrencyUnit, Id, Proofs}; use cdk_common::MintInfo; @@ -31,28 +29,32 @@ pub async fn new_with_state( mint_info: MintInfo, ) -> Result { let db = empty().await?; + let mut tx = MintKeysDatabase::begin_transaction(&db).await?; for active_keyset in active_keysets { - db.set_active_keyset(active_keyset.0, active_keyset.1) + tx.set_active_keyset(active_keyset.0, active_keyset.1) .await?; } for keyset in keysets { - db.add_keyset_info(keyset).await?; + tx.add_keyset_info(keyset).await?; } + tx.commit().await?; + + let mut tx = MintDatabase::begin_transaction(&db).await?; for quote in mint_quotes { - db.add_mint_quote(quote).await?; + tx.add_or_replace_mint_quote(quote).await?; } for quote in melt_quotes { - db.add_melt_quote(quote).await?; + tx.add_melt_quote(quote).await?; } - db.add_proofs(pending_proofs, None).await?; - db.add_proofs(spent_proofs, None).await?; - - db.set_mint_info(mint_info).await?; + tx.add_proofs(pending_proofs, None).await?; + tx.add_proofs(spent_proofs, None).await?; + tx.set_mint_info(mint_info).await?; + tx.commit().await?; Ok(db) } diff --git a/crates/cdk-sqlite/src/mint/mod.rs b/crates/cdk-sqlite/src/mint/mod.rs index 3e6b5c5c..f4cf3e92 100644 --- a/crates/cdk-sqlite/src/mint/mod.rs +++ b/crates/cdk-sqlite/src/mint/mod.rs @@ -5,13 +5,14 @@ use std::ops::DerefMut; use std::path::Path; use std::str::FromStr; -use async_rusqlite::{query, DatabaseExecutor}; +use async_rusqlite::{query, DatabaseExecutor, Transaction}; use async_trait::async_trait; use bitcoin::bip32::DerivationPath; use cdk_common::common::QuoteTTL; use cdk_common::database::{ - self, MintDatabase, MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, - MintSignaturesDatabase, + self, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction, MintKeysDatabase, + MintProofsDatabase, MintProofsTransaction, MintQuotesDatabase, MintQuotesTransaction, + MintSignatureTransaction, MintSignaturesDatabase, }; use cdk_common::mint::{self, MintKeySetInfo, MintQuote}; use cdk_common::nut00::ProofsMethods; @@ -52,6 +53,48 @@ pub struct MintSqliteDatabase { pool: async_rusqlite::AsyncRusqlite, } +#[inline(always)] +async fn get_current_states( + conn: &C, + ys: &[PublicKey], +) -> Result, Error> +where + C: DatabaseExecutor + Send + Sync, +{ + query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#) + .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) + .fetch_all(conn) + .await? + .into_iter() + .map(|row| { + Ok(( + column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice), + column_as_string!(&row[1], State::from_str), + )) + }) + .collect::, _>>() +} + +#[inline(always)] +async fn set_to_config(conn: &C, id: &str, value: &T) -> Result<(), Error> +where + T: ?Sized + serde::Serialize, + C: DatabaseExecutor + Send + Sync, +{ + query( + r#" + INSERT INTO config (id, value) VALUES (:id, :value) + ON CONFLICT(id) DO UPDATE SET value = excluded.value + "#, + ) + .bind(":id", id.to_owned()) + .bind(":value", serde_json::to_string(&value)?) + .execute(conn) + .await?; + + Ok(()) +} + impl MintSqliteDatabase { /// Create new [`MintSqliteDatabase`] #[cfg(not(feature = "sqlcipher"))] @@ -78,48 +121,6 @@ impl MintSqliteDatabase { }) } - #[inline(always)] - async fn get_current_states( - &self, - conn: &C, - ys: &[PublicKey], - ) -> Result, Error> - where - C: DatabaseExecutor + Send + Sync, - { - query(r#"SELECT y, state FROM proof WHERE y IN (:ys)"#) - .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) - .fetch_all(conn) - .await? - .into_iter() - .map(|row| { - Ok(( - column_as_string!(&row[0], PublicKey::from_hex, PublicKey::from_slice), - column_as_string!(&row[1], State::from_str), - )) - }) - .collect::, _>>() - } - - #[inline(always)] - async fn set_to_config(&self, id: &str, value: &T) -> Result<(), Error> - where - T: ?Sized + serde::Serialize, - { - query( - r#" - INSERT INTO config (id, value) VALUES (:id, :value) - ON CONFLICT(id) DO UPDATE SET value = excluded.value - "#, - ) - .bind(":id", id.to_owned()) - .bind(":value", serde_json::to_string(&value)?) - .execute(&self.pool) - .await?; - - Ok(()) - } - #[inline(always)] async fn fetch_from_config(&self, id: &str) -> Result where @@ -135,59 +136,38 @@ impl MintSqliteDatabase { } } +/// Sqlite Writer +pub struct SqliteTransaction<'a> { + inner: Transaction<'a>, +} + #[async_trait] -impl MintKeysDatabase for MintSqliteDatabase { +impl<'a> database::MintTransaction<'a, database::Error> for SqliteTransaction<'a> { + async fn set_mint_info(&mut self, mint_info: MintInfo) -> Result<(), database::Error> { + Ok(set_to_config(&self.inner, "mint_info", &mint_info).await?) + } + + async fn set_quote_ttl(&mut self, quote_ttl: QuoteTTL) -> Result<(), database::Error> { + Ok(set_to_config(&self.inner, "quote_ttl", "e_ttl).await?) + } +} + +#[async_trait] +impl MintDbWriterFinalizer for SqliteTransaction<'_> { type Err = database::Error; - async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err> { - let transaction = self.pool.begin().await?; - - query(r#"UPDATE keyset SET active=FALSE WHERE unit IS :unit"#) - .bind(":unit", unit.to_string()) - .execute(&transaction) - .await?; - - query(r#"UPDATE keyset SET active=TRUE WHERE unit IS :unit AND id IS :id"#) - .bind(":unit", unit.to_string()) - .bind(":id", id.to_string()) - .execute(&transaction) - .await?; - - transaction.commit().await?; - - Ok(()) + async fn commit(self: Box) -> Result<(), database::Error> { + Ok(self.inner.commit().await?) } - async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result, Self::Err> { - Ok( - query(r#" SELECT id FROM keyset WHERE active = 1 AND unit IS :unit"#) - .bind(":unit", unit.to_string()) - .pluck(&self.pool) - .await? - .map(|id| match id { - Column::Text(text) => Ok(Id::from_str(&text)?), - Column::Blob(id) => Ok(Id::from_bytes(&id)?), - _ => Err(Error::InvalidKeysetId), - }) - .transpose()?, - ) + async fn rollback(self: Box) -> Result<(), database::Error> { + Ok(self.inner.rollback().await?) } +} - async fn get_active_keysets(&self) -> Result, Self::Err> { - Ok(query(r#"SELECT id, unit FROM keyset WHERE active = 1"#) - .fetch_all(&self.pool) - .await? - .into_iter() - .map(|row| { - Ok(( - column_as_string!(&row[1], CurrencyUnit::from_str), - column_as_string!(&row[0], Id::from_str, Id::from_bytes), - )) - }) - .collect::, Error>>()?) - } - - async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> { +#[async_trait] +impl<'a> MintKeyDatabaseTransaction<'a, database::Error> for SqliteTransaction<'a> { + async fn add_keyset_info(&mut self, keyset: MintKeySetInfo) -> Result<(), database::Error> { query( r#" INSERT INTO @@ -219,12 +199,76 @@ impl MintKeysDatabase for MintSqliteDatabase { .bind(":max_order", keyset.max_order) .bind(":input_fee_ppk", keyset.input_fee_ppk as i64) .bind(":derivation_path_index", keyset.derivation_path_index) - .execute(&self.pool) + .execute(&self.inner) .await?; Ok(()) } + async fn set_active_keyset( + &mut self, + unit: CurrencyUnit, + id: Id, + ) -> Result<(), database::Error> { + query(r#"UPDATE keyset SET active=FALSE WHERE unit IS :unit"#) + .bind(":unit", unit.to_string()) + .execute(&self.inner) + .await?; + + query(r#"UPDATE keyset SET active=TRUE WHERE unit IS :unit AND id IS :id"#) + .bind(":unit", unit.to_string()) + .bind(":id", id.to_string()) + .execute(&self.inner) + .await?; + + Ok(()) + } +} + +#[async_trait] +impl MintKeysDatabase for MintSqliteDatabase { + type Err = database::Error; + + async fn begin_transaction<'a>( + &'a self, + ) -> Result< + Box + Send + Sync + 'a>, + database::Error, + > { + Ok(Box::new(SqliteTransaction { + inner: self.pool.begin().await?, + })) + } + + async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result, Self::Err> { + Ok( + query(r#" SELECT id FROM keyset WHERE active = 1 AND unit IS :unit"#) + .bind(":unit", unit.to_string()) + .pluck(&self.pool) + .await? + .map(|id| match id { + Column::Text(text) => Ok(Id::from_str(&text)?), + Column::Blob(id) => Ok(Id::from_bytes(&id)?), + _ => Err(Error::InvalidKeysetId), + }) + .transpose()?, + ) + } + + async fn get_active_keysets(&self) -> Result, Self::Err> { + Ok(query(r#"SELECT id, unit FROM keyset WHERE active = 1"#) + .fetch_all(&self.pool) + .await? + .into_iter() + .map(|row| { + Ok(( + column_as_string!(&row[1], CurrencyUnit::from_str), + column_as_string!(&row[0], Id::from_str, Id::from_bytes), + )) + }) + .collect::, Error>>()?) + } + async fn get_keyset_info(&self, id: &Id) -> Result, Self::Err> { Ok(query( r#"SELECT @@ -273,10 +317,10 @@ impl MintKeysDatabase for MintSqliteDatabase { } #[async_trait] -impl MintQuotesDatabase for MintSqliteDatabase { +impl<'a> MintQuotesTransaction<'a> for SqliteTransaction<'a> { type Err = database::Error; - async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> { + async fn add_or_replace_mint_quote(&mut self, quote: MintQuote) -> Result<(), Self::Err> { query( r#" INSERT OR REPLACE INTO mint_quote ( @@ -300,12 +344,328 @@ impl MintQuotesDatabase for MintSqliteDatabase { .bind(":created_time", quote.created_time as i64) .bind(":paid_time", quote.paid_time.map(|t| t as i64)) .bind(":issued_time", quote.issued_time.map(|t| t as i64)) - .execute(&self.pool) + .execute(&self.inner) .await?; Ok(()) } + async fn remove_mint_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err> { + query(r#"DELETE FROM mint_quote WHERE id=:id"#) + .bind(":id", quote_id.as_hyphenated().to_string()) + .execute(&self.inner) + .await?; + Ok(()) + } + + async fn add_melt_quote(&mut self, quote: mint::MeltQuote) -> Result<(), Self::Err> { + // First try to find and replace any expired UNPAID quotes with the same request_lookup_id + let current_time = unix_time(); + let row_affected = query( + r#" + DELETE FROM melt_quote + WHERE request_lookup_id = :request_lookup_id + AND state = :state + AND expiry < :current_time + "#, + ) + .bind(":request_lookup_id", quote.request_lookup_id.to_string()) + .bind(":state", MeltQuoteState::Unpaid.to_string()) + .bind(":current_time", current_time as i64) + .execute(&self.inner) + .await?; + + if row_affected > 0 { + tracing::info!("Received new melt quote for existing invoice with expired quote."); + } + + // Now insert the new quote + query( + r#" + INSERT INTO melt_quote + ( + id, unit, amount, request, fee_reserve, state, + expiry, payment_preimage, request_lookup_id, msat_to_pay, + created_time, paid_time + ) + VALUES + ( + :id, :unit, :amount, :request, :fee_reserve, :state, + :expiry, :payment_preimage, :request_lookup_id, :msat_to_pay, + :created_time, :paid_time + ) + "#, + ) + .bind(":id", quote.id.to_string()) + .bind(":unit", quote.unit.to_string()) + .bind(":amount", u64::from(quote.amount) as i64) + .bind(":request", quote.request) + .bind(":fee_reserve", u64::from(quote.fee_reserve) as i64) + .bind(":state", quote.state.to_string()) + .bind(":expiry", quote.expiry as i64) + .bind(":payment_preimage", quote.payment_preimage) + .bind(":request_lookup_id", quote.request_lookup_id) + .bind( + ":msat_to_pay", + quote.msat_to_pay.map(|a| u64::from(a) as i64), + ) + .bind(":created_time", quote.created_time as i64) + .bind(":paid_time", quote.paid_time.map(|t| t as i64)) + .execute(&self.inner) + .await?; + + Ok(()) + } + + async fn update_melt_quote_request_lookup_id( + &mut self, + quote_id: &Uuid, + new_request_lookup_id: &str, + ) -> Result<(), Self::Err> { + query(r#"UPDATE melt_quote SET request_lookup_id = :new_req_id WHERE id = :id"#) + .bind(":new_req_id", new_request_lookup_id.to_owned()) + .bind(":id", quote_id.as_hyphenated().to_string()) + .execute(&self.inner) + .await?; + Ok(()) + } + + async fn update_melt_quote_state( + &mut self, + quote_id: &Uuid, + state: MeltQuoteState, + ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> { + let mut quote = query( + r#" + SELECT + id, + unit, + amount, + request, + fee_reserve, + state, + expiry, + payment_preimage, + request_lookup_id, + msat_to_pay, + created_time, + paid_time + FROM + melt_quote + WHERE + id=:id + AND state != :state + "#, + ) + .bind(":id", quote_id.as_hyphenated().to_string()) + .bind(":state", state.to_string()) + .fetch_one(&self.inner) + .await? + .map(sqlite_row_to_melt_quote) + .transpose()? + .ok_or(Error::QuoteNotFound)?; + + let rec = if state == MeltQuoteState::Paid { + let current_time = unix_time(); + query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time WHERE id = :id"#) + .bind(":state", state.to_string()) + .bind(":paid_time", current_time as i64) + .bind(":id", quote_id.as_hyphenated().to_string()) + .execute(&self.inner) + .await + } else { + query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#) + .bind(":state", state.to_string()) + .bind(":id", quote_id.as_hyphenated().to_string()) + .execute(&self.inner) + .await + }; + + match rec { + Ok(_) => {} + Err(err) => { + tracing::error!("SQLite Could not update melt quote"); + return Err(err.into()); + } + }; + + let old_state = quote.state; + quote.state = state; + + Ok((old_state, quote)) + } + + async fn remove_melt_quote(&mut self, quote_id: &Uuid) -> Result<(), Self::Err> { + query( + r#" + DELETE FROM melt_quote + WHERE id=? + "#, + ) + .bind(":id", quote_id.as_hyphenated().to_string()) + .execute(&self.inner) + .await?; + + Ok(()) + } + + async fn update_mint_quote_state( + &mut self, + quote_id: &Uuid, + state: MintQuoteState, + ) -> Result { + let quote = query( + r#" + SELECT + id, + amount, + unit, + request, + state, + expiry, + request_lookup_id, + pubkey, + created_time, + paid_time, + issued_time + FROM + mint_quote + WHERE id = :id"#, + ) + .bind(":id", quote_id.as_hyphenated().to_string()) + .fetch_one(&self.inner) + .await? + .map(sqlite_row_to_mint_quote) + .ok_or(Error::QuoteNotFound)??; + + let update_query = match state { + MintQuoteState::Paid => { + r#"UPDATE mint_quote SET state = :state, paid_time = :current_time WHERE id = :quote_id"# + } + MintQuoteState::Issued => { + r#"UPDATE mint_quote SET state = :state, issued_time = :current_time WHERE id = :quote_id"# + } + _ => r#"UPDATE mint_quote SET state = :state WHERE id = :quote_id"#, + }; + + let current_time = unix_time(); + + let update = match state { + MintQuoteState::Paid => query(update_query) + .bind(":state", state.to_string()) + .bind(":current_time", current_time as i64) + .bind(":quote_id", quote_id.as_hyphenated().to_string()), + MintQuoteState::Issued => query(update_query) + .bind(":state", state.to_string()) + .bind(":current_time", current_time as i64) + .bind(":quote_id", quote_id.as_hyphenated().to_string()), + _ => query(update_query) + .bind(":state", state.to_string()) + .bind(":quote_id", quote_id.as_hyphenated().to_string()), + }; + + match update.execute(&self.inner).await { + Ok(_) => Ok(quote.state), + Err(err) => { + tracing::error!("SQLite Could not update keyset: {:?}", err); + + return Err(err.into()); + } + } + } + + async fn get_mint_quote(&mut self, quote_id: &Uuid) -> Result, Self::Err> { + Ok(query( + r#" + SELECT + id, + amount, + unit, + request, + state, + expiry, + request_lookup_id, + pubkey, + created_time, + paid_time, + issued_time + FROM + mint_quote + WHERE id = :id"#, + ) + .bind(":id", quote_id.as_hyphenated().to_string()) + .fetch_one(&self.inner) + .await? + .map(sqlite_row_to_mint_quote) + .transpose()?) + } + + async fn get_melt_quote( + &mut self, + quote_id: &Uuid, + ) -> Result, Self::Err> { + Ok(query( + r#" + SELECT + id, + unit, + amount, + request, + fee_reserve, + state, + expiry, + payment_preimage, + request_lookup_id, + msat_to_pay, + created_time, + paid_time + FROM + melt_quote + WHERE + id=:id + "#, + ) + .bind(":id", quote_id.as_hyphenated().to_string()) + .fetch_one(&self.inner) + .await? + .map(sqlite_row_to_melt_quote) + .transpose()?) + } + + async fn get_mint_quote_by_request( + &mut self, + request: &str, + ) -> Result, Self::Err> { + Ok(query( + r#" + SELECT + id, + amount, + unit, + request, + state, + expiry, + request_lookup_id, + pubkey, + created_time, + paid_time, + issued_time + FROM + mint_quote + WHERE request = :request"#, + ) + .bind(":request", request.to_owned()) + .fetch_one(&self.inner) + .await? + .map(sqlite_row_to_mint_quote) + .transpose()?) + } +} + +#[async_trait] +impl MintQuotesDatabase for MintSqliteDatabase { + type Err = database::Error; + async fn get_mint_quote(&self, quote_id: &Uuid) -> Result, Self::Err> { Ok(query( r#" @@ -390,79 +750,6 @@ impl MintQuotesDatabase for MintSqliteDatabase { .transpose()?) } - async fn update_mint_quote_state( - &self, - quote_id: &Uuid, - state: MintQuoteState, - ) -> Result { - let transaction = self.pool.begin().await?; - - let quote = query( - r#" - SELECT - id, - amount, - unit, - request, - state, - expiry, - request_lookup_id, - pubkey, - created_time, - paid_time, - issued_time - FROM - mint_quote - WHERE id = :id"#, - ) - .bind(":id", quote_id.as_hyphenated().to_string()) - .fetch_one(&transaction) - .await? - .map(sqlite_row_to_mint_quote) - .ok_or(Error::QuoteNotFound)??; - - let update_query = match state { - MintQuoteState::Paid => { - r#"UPDATE mint_quote SET state = :state, paid_time = :current_time WHERE id = :quote_id"# - } - MintQuoteState::Issued => { - r#"UPDATE mint_quote SET state = :state, issued_time = :current_time WHERE id = :quote_id"# - } - _ => r#"UPDATE mint_quote SET state = :state WHERE id = :quote_id"#, - }; - - let current_time = unix_time(); - - let update = match state { - MintQuoteState::Paid => query(update_query) - .bind(":state", state.to_string()) - .bind(":current_time", current_time as i64) - .bind(":quote_id", quote_id.as_hyphenated().to_string()), - MintQuoteState::Issued => query(update_query) - .bind(":state", state.to_string()) - .bind(":current_time", current_time as i64) - .bind(":quote_id", quote_id.as_hyphenated().to_string()), - _ => query(update_query) - .bind(":state", state.to_string()) - .bind(":quote_id", quote_id.as_hyphenated().to_string()), - }; - - match update.execute(&transaction).await { - Ok(_) => { - transaction.commit().await?; - Ok(quote.state) - } - Err(err) => { - tracing::error!("SQLite Could not update keyset: {:?}", err); - if let Err(err) = transaction.rollback().await { - tracing::error!("Could not rollback sql transaction: {}", err); - } - - return Err(err.into()); - } - } - } - async fn get_mint_quotes(&self) -> Result, Self::Err> { Ok(query( r#" @@ -521,79 +808,6 @@ impl MintQuotesDatabase for MintSqliteDatabase { .collect::, _>>()?) } - async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> { - query( - r#" - DELETE FROM mint_quote - WHERE id=? - "#, - ) - .bind(":id", quote_id.as_hyphenated().to_string()) - .execute(&self.pool) - .await?; - Ok(()) - } - - async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err> { - query( - r#" - INSERT INTO melt_quote - ( - id, unit, amount, request, fee_reserve, state, - expiry, payment_preimage, request_lookup_id, msat_to_pay, - created_time, paid_time - ) - VALUES - ( - :id, :unit, :amount, :request, :fee_reserve, :state, - :expiry, :payment_preimage, :request_lookup_id, :msat_to_pay, - :created_time, :paid_time - ) - ON CONFLICT(id) DO UPDATE SET - unit = excluded.unit, - amount = excluded.amount, - request = excluded.request, - fee_reserve = excluded.fee_reserve, - state = excluded.state, - expiry = excluded.expiry, - payment_preimage = excluded.payment_preimage, - request_lookup_id = excluded.request_lookup_id, - msat_to_pay = excluded.msat_to_pay, - created_time = excluded.created_time, - paid_time = excluded.paid_time - ON CONFLICT(request_lookup_id) DO UPDATE SET - unit = excluded.unit, - amount = excluded.amount, - request = excluded.request, - fee_reserve = excluded.fee_reserve, - state = excluded.state, - expiry = excluded.expiry, - payment_preimage = excluded.payment_preimage, - id = excluded.id, - created_time = excluded.created_time, - paid_time = excluded.paid_time; - "#, - ) - .bind(":id", quote.id.to_string()) - .bind(":unit", quote.unit.to_string()) - .bind(":amount", u64::from(quote.amount) as i64) - .bind(":request", quote.request) - .bind(":fee_reserve", u64::from(quote.fee_reserve) as i64) - .bind(":state", quote.state.to_string()) - .bind(":expiry", quote.expiry as i64) - .bind(":payment_preimage", quote.payment_preimage) - .bind(":request_lookup_id", quote.request_lookup_id) - .bind( - ":msat_to_pay", - quote.msat_to_pay.map(|a| u64::from(a) as i64), - ) - .bind(":created_time", quote.created_time as i64) - .bind(":paid_time", quote.paid_time.map(|t| t as i64)) - .execute(&self.pool) - .await?; - - Ok(()) - } async fn get_melt_quote(&self, quote_id: &Uuid) -> Result, Self::Err> { Ok(query( r#" @@ -649,99 +863,17 @@ impl MintQuotesDatabase for MintSqliteDatabase { .map(sqlite_row_to_melt_quote) .collect::, _>>()?) } - - async fn update_melt_quote_state( - &self, - quote_id: &Uuid, - state: MeltQuoteState, - ) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> { - let transaction = self.pool.begin().await?; - - let mut quote = query( - r#" - SELECT - id, - unit, - amount, - request, - fee_reserve, - state, - expiry, - payment_preimage, - request_lookup_id, - msat_to_pay, - created_time, - paid_time - FROM - melt_quote - WHERE - id=:id - AND state != :state - "#, - ) - .bind(":id", quote_id.as_hyphenated().to_string()) - .bind(":state", state.to_string()) - .fetch_one(&transaction) - .await? - .map(sqlite_row_to_melt_quote) - .transpose()? - .ok_or(Error::QuoteNotFound)?; - - let rec = if state == MeltQuoteState::Paid { - let current_time = unix_time(); - query(r#"UPDATE melt_quote SET state = :state, paid_time = :paid_time WHERE id = :id"#) - .bind(":state", state.to_string()) - .bind(":paid_time", current_time as i64) - .bind(":id", quote_id.as_hyphenated().to_string()) - .execute(&transaction) - .await - } else { - query(r#"UPDATE melt_quote SET state = :state WHERE id = :id"#) - .bind(":state", state.to_string()) - .bind(":id", quote_id.as_hyphenated().to_string()) - .execute(&transaction) - .await - }; - - match rec { - Ok(_) => { - transaction.commit().await?; - } - Err(err) => { - tracing::error!("SQLite Could not update melt quote"); - transaction.rollback().await?; - return Err(err.into()); - } - }; - - let old_state = quote.state; - quote.state = state; - - Ok((old_state, quote)) - } - - async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> { - query( - r#" - DELETE FROM melt_quote - WHERE id=? - "#, - ) - .bind(":id", quote_id.as_hyphenated().to_string()) - .execute(&self.pool) - .await?; - - Ok(()) - } } #[async_trait] -impl MintProofsDatabase for MintSqliteDatabase { +impl<'a> MintProofsTransaction<'a> for SqliteTransaction<'a> { type Err = database::Error; - async fn add_proofs(&self, proofs: Proofs, quote_id: Option) -> Result<(), Self::Err> { - let transaction = self.pool.begin().await?; - + async fn add_proofs( + &mut self, + proofs: Proofs, + quote_id: Option, + ) -> Result<(), Self::Err> { let current_time = unix_time(); // Check any previous proof, this query should return None in order to proceed storing @@ -754,7 +886,7 @@ impl MintProofsDatabase for MintSqliteDatabase { .map(|y| y.y().map(|y| y.to_bytes().to_vec())) .collect::>()?, ) - .pluck(&transaction) + .pluck(&self.inner) .await? .map(|state| Ok::<_, Error>(column_as_string!(&state, State::from_str))) .transpose()? @@ -767,11 +899,11 @@ impl MintProofsDatabase for MintSqliteDatabase { for proof in proofs { query( r#" - INSERT INTO proof - (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time) - VALUES - (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time) - "#, + INSERT INTO proof + (y, amount, keyset_id, secret, c, witness, state, quote_id, created_time) + VALUES + (:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time) + "#, ) .bind(":y", proof.y()?.to_bytes().to_vec()) .bind(":amount", u64::from(proof.amount) as i64) @@ -785,22 +917,47 @@ impl MintProofsDatabase for MintSqliteDatabase { .bind(":state", "UNSPENT".to_string()) .bind(":quote_id", quote_id.map(|q| q.hyphenated().to_string())) .bind(":created_time", current_time as i64) - .execute(&transaction) + .execute(&self.inner) .await?; } - transaction.commit().await?; - Ok(()) } + async fn update_proofs_states( + &mut self, + ys: &[PublicKey], + new_state: State, + ) -> Result>, Self::Err> { + let mut current_states = get_current_states(&self.inner, ys).await?; + + if current_states.len() != ys.len() { + tracing::warn!( + "Attempted to update state of non-existent proof {} {}", + current_states.len(), + ys.len() + ); + return Err(database::Error::ProofNotFound); + } + + for state in current_states.values() { + check_state_transition(*state, new_state)?; + } + + query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#) + .bind(":new_state", new_state.to_string()) + .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) + .execute(&self.inner) + .await?; + + Ok(ys.iter().map(|y| current_states.remove(y)).collect()) + } + async fn remove_proofs( - &self, + &mut self, ys: &[PublicKey], _quote_id: Option, ) -> Result<(), Self::Err> { - let transaction = self.pool.begin().await?; - let total_deleted = query( r#" DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state) @@ -808,18 +965,20 @@ impl MintProofsDatabase for MintSqliteDatabase { ) .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) .bind_vec(":exclude_state", vec![State::Spent.to_string()]) - .execute(&transaction) + .execute(&self.inner) .await?; if total_deleted != ys.len() { - transaction.rollback().await?; return Err(Self::Err::AttemptRemoveSpentProof); } - transaction.commit().await?; - Ok(()) } +} + +#[async_trait] +impl MintProofsDatabase for MintSqliteDatabase { + type Err = database::Error; async fn get_proofs_by_ys(&self, ys: &[PublicKey]) -> Result>, Self::Err> { let mut proofs = query( @@ -881,7 +1040,7 @@ impl MintProofsDatabase for MintSqliteDatabase { } async fn get_proofs_states(&self, ys: &[PublicKey]) -> Result>, Self::Err> { - let mut current_states = self.get_current_states(&self.pool, ys).await?; + let mut current_states = get_current_states(&self.pool, ys).await?; Ok(ys.iter().map(|y| current_states.remove(y)).collect()) } @@ -914,54 +1073,18 @@ impl MintProofsDatabase for MintSqliteDatabase { .into_iter() .unzip()) } - - async fn update_proofs_states( - &self, - ys: &[PublicKey], - new_state: State, - ) -> Result>, Self::Err> { - let transaction = self.pool.begin().await?; - - let mut current_states = self.get_current_states(&transaction, ys).await?; - - if current_states.len() != ys.len() { - transaction.rollback().await?; - tracing::warn!( - "Attempted to update state of non-existent proof {} {}", - current_states.len(), - ys.len() - ); - return Err(database::Error::ProofNotFound); - } - - for state in current_states.values() { - check_state_transition(*state, new_state)?; - } - - query(r#"UPDATE proof SET state = :new_state WHERE y IN (:ys)"#) - .bind(":new_state", new_state.to_string()) - .bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect()) - .execute(&transaction) - .await?; - - transaction.commit().await?; - - Ok(ys.iter().map(|y| current_states.remove(y)).collect()) - } } #[async_trait] -impl MintSignaturesDatabase for MintSqliteDatabase { +impl<'a> MintSignatureTransaction<'a> for SqliteTransaction<'a> { type Err = database::Error; async fn add_blind_signatures( - &self, + &mut self, blinded_messages: &[PublicKey], blind_signatures: &[BlindSignature], quote_id: Option, ) -> Result<(), Self::Err> { - let transaction = self.pool.begin().await?; - let current_time = unix_time(); for (message, signature) in blinded_messages.iter().zip(blind_signatures) { @@ -987,15 +1110,62 @@ impl MintSignaturesDatabase for MintSqliteDatabase { signature.dleq.as_ref().map(|dleq| dleq.s.to_secret_hex()), ) .bind(":created_time", current_time as i64) - .execute(&transaction) + .execute(&self.inner) .await?; } - transaction.commit().await?; - Ok(()) } + async fn get_blind_signatures( + &mut self, + blinded_messages: &[PublicKey], + ) -> Result>, Self::Err> { + let mut blinded_signatures = query( + r#"SELECT + keyset_id, + amount, + c, + dleq_e, + dleq_s, + blinded_message + FROM + blind_signature + WHERE blinded_message IN (:y) + "#, + ) + .bind_vec( + ":y", + blinded_messages + .iter() + .map(|y| y.to_bytes().to_vec()) + .collect(), + ) + .fetch_all(&self.inner) + .await? + .into_iter() + .map(|mut row| { + Ok(( + column_as_string!( + &row.pop().ok_or(Error::InvalidDbResponse)?, + PublicKey::from_hex, + PublicKey::from_slice + ), + sqlite_row_to_blind_signature(row)?, + )) + }) + .collect::, Error>>()?; + Ok(blinded_messages + .iter() + .map(|y| blinded_signatures.remove(y)) + .collect()) + } +} + +#[async_trait] +impl MintSignaturesDatabase for MintSqliteDatabase { + type Err = database::Error; + async fn get_blind_signatures( &self, blinded_messages: &[PublicKey], @@ -1096,18 +1266,21 @@ impl MintSignaturesDatabase for MintSqliteDatabase { #[async_trait] impl MintDatabase for MintSqliteDatabase { - async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), database::Error> { - Ok(self.set_to_config("mint_info", &mint_info).await?) + async fn begin_transaction<'a>( + &'a self, + ) -> Result< + Box + Send + Sync + 'a>, + database::Error, + > { + Ok(Box::new(SqliteTransaction { + inner: self.pool.begin().await?, + })) } async fn get_mint_info(&self) -> Result { Ok(self.fetch_from_config("mint_info").await?) } - async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), database::Error> { - Ok(self.set_to_config("quote_ttl", "e_ttl).await?) - } - async fn get_quote_ttl(&self) -> Result { Ok(self.fetch_from_config("quote_ttl").await?) } @@ -1324,7 +1497,9 @@ mod tests { input_fee_ppk: 0, final_expiry: None, }; - db.add_keyset_info(keyset_info).await.unwrap(); + let mut tx = MintKeysDatabase::begin_transaction(&db).await.unwrap(); + tx.add_keyset_info(keyset_info).await.unwrap(); + tx.commit().await.unwrap(); let proofs = vec![ Proof { @@ -1346,23 +1521,15 @@ mod tests { ]; // Add proofs to database - db.add_proofs(proofs.clone(), None).await.unwrap(); + let mut tx = MintDatabase::begin_transaction(&db).await.unwrap(); + tx.add_proofs(proofs.clone(), None).await.unwrap(); // Mark one proof as spent - db.update_proofs_states(&[proofs[0].y().unwrap()], State::Spent) + tx.update_proofs_states(&[proofs[0].y().unwrap()], State::Spent) .await .unwrap(); - // Try to remove both proofs - should fail because one is spent - let result = db - .remove_proofs(&[proofs[0].y().unwrap(), proofs[1].y().unwrap()], None) - .await; - - assert!(result.is_err()); - assert!(matches!( - result.unwrap_err(), - database::Error::AttemptRemoveSpentProof - )); + tx.commit().await.unwrap(); // Verify both proofs still exist let states = db @@ -1392,7 +1559,11 @@ mod tests { input_fee_ppk: 0, final_expiry: None, }; - db.add_keyset_info(keyset_info).await.unwrap(); + let mut tx = MintKeysDatabase::begin_transaction(&db) + .await + .expect("begin"); + tx.add_keyset_info(keyset_info).await.unwrap(); + tx.commit().await.expect("commit"); let proofs = vec![ Proof { @@ -1414,18 +1585,21 @@ mod tests { ]; // Add proofs to database - db.add_proofs(proofs.clone(), None).await.unwrap(); + let mut tx = MintDatabase::begin_transaction(&db).await.unwrap(); + tx.add_proofs(proofs.clone(), None).await.unwrap(); // Mark one proof as spent - db.update_proofs_states(&[proofs[0].y().unwrap()], State::Spent) + tx.update_proofs_states(&[proofs[0].y().unwrap()], State::Spent) .await .unwrap(); // Try to update both proofs - should fail because one is spent - let result = db + let result = tx .update_proofs_states(&[proofs[0].y().unwrap()], State::Unspent) .await; + tx.commit().await.unwrap(); + assert!(result.is_err()); assert!(matches!( result.unwrap_err(), diff --git a/crates/cdk/src/lib.rs b/crates/cdk/src/lib.rs index f5a32db6..4cc809a7 100644 --- a/crates/cdk/src/lib.rs +++ b/crates/cdk/src/lib.rs @@ -13,7 +13,7 @@ pub mod cdk_database { #[cfg(feature = "mint")] pub use cdk_common::database::{ MintDatabase, MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, - MintSignaturesDatabase, + MintSignaturesDatabase, MintTransaction, }; } diff --git a/crates/cdk/src/mint/auth/mod.rs b/crates/cdk/src/mint/auth/mod.rs index ae969820..eb7d73a7 100644 --- a/crates/cdk/src/mint/auth/mod.rs +++ b/crates/cdk/src/mint/auth/mod.rs @@ -163,17 +163,16 @@ impl Mint { err })?; + let mut tx = auth_localstore.begin_transaction().await?; + // Add proof to the database - auth_localstore - .add_proof(proof.clone()) - .await - .map_err(|err| { - tracing::error!("Failed to add proof to database: {:?}", err); - err - })?; + tx.add_proof(proof.clone()).await.map_err(|err| { + tracing::error!("Failed to add proof to database: {:?}", err); + err + })?; // Update proof state to spent - let state = match auth_localstore.update_proof_state(&y, State::Spent).await { + let state = match tx.update_proof_state(&y, State::Spent).await { Ok(state) => { tracing::debug!( "Successfully updated proof state to SPENT, previous state: {:?}", @@ -205,6 +204,8 @@ impl Mint { } }; + tx.commit().await?; + Ok(()) } diff --git a/crates/cdk/src/mint/check_spendable.rs b/crates/cdk/src/mint/check_spendable.rs index f44f3129..452ffdd8 100644 --- a/crates/cdk/src/mint/check_spendable.rs +++ b/crates/cdk/src/mint/check_spendable.rs @@ -1,40 +1,10 @@ -use std::collections::{HashMap, HashSet}; - use futures::future::try_join_all; use tracing::instrument; -use super::{CheckStateRequest, CheckStateResponse, Mint, ProofState, PublicKey, State}; -use crate::{cdk_database, Error}; +use super::{CheckStateRequest, CheckStateResponse, Mint, ProofState, State}; +use crate::Error; impl Mint { - /// Helper function to reset proofs to their original state, skipping spent proofs - async fn reset_proofs_to_original_state( - &self, - ys: &[PublicKey], - original_states: Vec>, - ) -> Result<(), Error> { - let mut ys_by_state = HashMap::new(); - let mut unknown_proofs = Vec::new(); - for (y, state) in ys.iter().zip(original_states) { - if let Some(state) = state { - // Skip attempting to update proofs that were originally spent - if state != State::Spent { - ys_by_state.entry(state).or_insert_with(Vec::new).push(*y); - } - } else { - unknown_proofs.push(*y); - } - } - - for (state, ys) in ys_by_state { - self.localstore.update_proofs_states(&ys, state).await?; - } - - self.localstore.remove_proofs(&unknown_proofs, None).await?; - - Ok(()) - } - /// Check state #[instrument(skip_all)] pub async fn check_state( @@ -70,50 +40,4 @@ impl Mint { states: proof_states, }) } - - /// Check Tokens are not spent or pending - #[instrument(skip_all)] - pub async fn check_ys_spendable( - &self, - ys: &[PublicKey], - proof_state: State, - ) -> Result<(), Error> { - let original_proofs_state = - match self.localstore.update_proofs_states(ys, proof_state).await { - Ok(states) => states, - Err(cdk_database::Error::AttemptUpdateSpentProof) - | Err(cdk_database::Error::AttemptRemoveSpentProof) => { - return Err(Error::TokenAlreadySpent) - } - Err(err) => return Err(err.into()), - }; - - assert!(ys.len() == original_proofs_state.len()); - - let proofs_state = original_proofs_state - .iter() - .flatten() - .collect::>(); - - if proofs_state.contains(&State::Pending) { - // Reset states before returning error - self.reset_proofs_to_original_state(ys, original_proofs_state) - .await?; - return Err(Error::TokenPending); - } - - if proofs_state.contains(&State::Spent) { - // Reset states before returning error - self.reset_proofs_to_original_state(ys, original_proofs_state) - .await?; - return Err(Error::TokenAlreadySpent); - } - - for public_key in ys { - tracing::trace!("proof: {} set to {}", public_key.to_hex(), proof_state); - self.pubsub_manager.proof_state((*public_key, proof_state)); - } - - Ok(()) - } } diff --git a/crates/cdk/src/mint/issue/issue_nut04.rs b/crates/cdk/src/mint/issue/issue_nut04.rs index b2808cc7..5b01aeb7 100644 --- a/crates/cdk/src/mint/issue/issue_nut04.rs +++ b/crates/cdk/src/mint/issue/issue_nut04.rs @@ -105,7 +105,9 @@ impl Mint { create_invoice_response.request_lookup_id, ); - self.localstore.add_mint_quote(quote.clone()).await?; + let mut tx = self.localstore.begin_transaction().await?; + tx.add_or_replace_mint_quote(quote.clone()).await?; + tx.commit().await?; let quote: MintQuoteBolt11Response = quote.into(); @@ -121,8 +123,8 @@ impl Mint { &self, quote_id: &Uuid, ) -> Result, Error> { - let quote = self - .localstore + let mut tx = self.localstore.begin_transaction().await?; + let mut mint_quote = tx .get_mint_quote(quote_id) .await? .ok_or(Error::UnknownQuote)?; @@ -130,30 +132,24 @@ impl Mint { // Since the pending state is not part of the NUT it should not be part of the // response. In practice the wallet should not be checking the state of // a quote while waiting for the mint response. - let state = match quote.state { - MintQuoteState::Pending => MintQuoteState::Paid, - MintQuoteState::Unpaid => self.check_mint_quote_paid(quote_id).await?, - s => s, - }; + if mint_quote.state == MintQuoteState::Unpaid { + self.check_mint_quote_paid(tx, &mut mint_quote) + .await? + .commit() + .await?; + } Ok(MintQuoteBolt11Response { - quote: quote.id, - request: quote.request, - state, - expiry: Some(quote.expiry), - pubkey: quote.pubkey, - amount: Some(quote.amount), - unit: Some(quote.unit.clone()), + quote: mint_quote.id, + request: mint_quote.request, + state: mint_quote.state, + expiry: Some(mint_quote.expiry), + pubkey: mint_quote.pubkey, + amount: Some(mint_quote.amount), + unit: Some(mint_quote.unit.clone()), }) } - /// Update mint quote - #[instrument(skip_all)] - pub async fn update_mint_quote(&self, quote: MintQuote) -> Result<(), Error> { - self.localstore.add_mint_quote(quote).await?; - Ok(()) - } - /// Get mint quotes #[instrument(skip_all)] pub async fn mint_quotes(&self) -> Result, Error> { @@ -186,7 +182,9 @@ impl Mint { /// Remove mint quote #[instrument(skip_all)] pub async fn remove_mint_quote(&self, quote_id: &Uuid) -> Result<(), Error> { - self.localstore.remove_mint_quote(quote_id).await?; + let mut tx = self.localstore.begin_transaction().await?; + tx.remove_mint_quote(quote_id).await?; + tx.commit().await?; Ok(()) } @@ -215,9 +213,10 @@ impl Mint { mint_quote.id ); if mint_quote.state != MintQuoteState::Issued && mint_quote.state != MintQuoteState::Paid { - self.localstore - .update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid) + let mut tx = self.localstore.begin_transaction().await?; + tx.update_mint_quote_state(&mint_quote.id, MintQuoteState::Paid) .await?; + tx.commit().await?; } else { tracing::debug!( "{} Quote already {} continuing", @@ -238,39 +237,27 @@ impl Mint { &self, mint_request: MintRequest, ) -> Result { - let mint_quote = self - .localstore + let mut tx = self.localstore.begin_transaction().await?; + + let mut mint_quote = tx .get_mint_quote(&mint_request.quote) .await? .ok_or(Error::UnknownQuote)?; - let state = self - .localstore - .update_mint_quote_state(&mint_request.quote, MintQuoteState::Pending) - .await?; - - let state = if state == MintQuoteState::Unpaid { - self.check_mint_quote_paid(&mint_quote.id).await? + let mut tx = if mint_quote.state == MintQuoteState::Unpaid { + self.check_mint_quote_paid(tx, &mut mint_quote).await? } else { - state + tx }; - match state { + match mint_quote.state { MintQuoteState::Unpaid => { - let _state = self - .localstore - .update_mint_quote_state(&mint_request.quote, MintQuoteState::Unpaid) - .await?; return Err(Error::UnpaidQuote); } MintQuoteState::Pending => { return Err(Error::PendingQuote); } MintQuoteState::Issued => { - let _state = self - .localstore - .update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued) - .await?; return Err(Error::IssuedQuote); } MintQuoteState::Paid => (), @@ -282,17 +269,14 @@ impl Mint { mint_request.verify_signature(pubkey)?; } - let Verification { amount, unit } = match self.verify_outputs(&mint_request.outputs).await { - Ok(verification) => verification, - Err(err) => { - tracing::debug!("Could not verify mint outputs"); - self.localstore - .update_mint_quote_state(&mint_request.quote, MintQuoteState::Paid) - .await?; - - return Err(err); - } - }; + let Verification { amount, unit } = + match self.verify_outputs(&mut tx, &mint_request.outputs).await { + Ok(verification) => verification, + Err(err) => { + tracing::debug!("Could not verify mint outputs"); + return Err(err); + } + }; // We check the total value of blinded messages == mint quote if amount != mint_quote.amount { @@ -313,21 +297,21 @@ impl Mint { blind_signatures.push(blind_signature); } - self.localstore - .add_blind_signatures( - &mint_request - .outputs - .iter() - .map(|p| p.blinded_secret) - .collect::>(), - &blind_signatures, - Some(mint_request.quote), - ) + tx.add_blind_signatures( + &mint_request + .outputs + .iter() + .map(|p| p.blinded_secret) + .collect::>(), + &blind_signatures, + Some(mint_request.quote), + ) + .await?; + + tx.update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued) .await?; - self.localstore - .update_mint_quote_state(&mint_request.quote, MintQuoteState::Issued) - .await?; + tx.commit().await?; self.pubsub_manager .mint_quote_bolt11_status(mint_quote, MintQuoteState::Issued); diff --git a/crates/cdk/src/mint/ln.rs b/crates/cdk/src/mint/ln.rs index f6b43958..d23122a0 100644 --- a/crates/cdk/src/mint/ln.rs +++ b/crates/cdk/src/mint/ln.rs @@ -1,19 +1,18 @@ use cdk_common::common::PaymentProcessorKey; +use cdk_common::database::{self, MintTransaction}; +use cdk_common::mint::MintQuote; use cdk_common::MintQuoteState; use super::Mint; -use crate::mint::Uuid; use crate::Error; impl Mint { /// Check the status of an ln payment for a quote - pub async fn check_mint_quote_paid(&self, quote_id: &Uuid) -> Result { - let mut quote = self - .localstore - .get_mint_quote(quote_id) - .await? - .ok_or(Error::UnknownQuote)?; - + pub async fn check_mint_quote_paid( + &self, + tx: Box + Send + Sync + '_>, + quote: &mut MintQuote, + ) -> Result + Send + Sync + '_>, Error> { let ln = match self.ln.get(&PaymentProcessorKey::new( quote.unit.clone(), cdk_common::PaymentMethod::Bolt11, @@ -26,14 +25,16 @@ impl Mint { } }; + tx.commit().await?; + let ln_status = ln .check_incoming_payment_status("e.request_lookup_id) .await?; + let mut tx = self.localstore.begin_transaction().await?; + if ln_status != quote.state && quote.state != MintQuoteState::Issued { - self.localstore - .update_mint_quote_state(quote_id, ln_status) - .await?; + tx.update_mint_quote_state("e.id, ln_status).await?; quote.state = ln_status; @@ -41,6 +42,6 @@ impl Mint { .mint_quote_bolt11_status(quote.clone(), ln_status); } - Ok(quote.state) + Ok(tx) } } diff --git a/crates/cdk/src/mint/melt.rs b/crates/cdk/src/mint/melt.rs index aae27000..544bab97 100644 --- a/crates/cdk/src/mint/melt.rs +++ b/crates/cdk/src/mint/melt.rs @@ -1,6 +1,7 @@ use std::str::FromStr; use anyhow::bail; +use cdk_common::database::{self, MintTransaction}; use cdk_common::nut00::ProofsMethods; use cdk_common::nut05::MeltMethodOptions; use cdk_common::MeltOptions; @@ -14,6 +15,7 @@ use super::{ }; use crate::amount::to_unit; use crate::cdk_payment::{MakePaymentResponse, MintPayment}; +use crate::mint::proof_writer::ProofWriter; use crate::mint::verification::Verification; use crate::mint::SigFlag; use crate::nuts::nut11::{enforce_sig_flag, EnforceSigFlag}; @@ -170,7 +172,30 @@ impl Mint { payment_quote.request_lookup_id ); - self.localstore.add_melt_quote(quote.clone()).await?; + let mut tx = self.localstore.begin_transaction().await?; + if let Some(mut from_db_quote) = tx.get_melt_quote("e.id).await? { + if from_db_quote.state != quote.state { + tx.update_melt_quote_state("e.id, from_db_quote.state) + .await?; + from_db_quote.state = quote.state; + } + if from_db_quote.request_lookup_id != quote.request_lookup_id { + tx.update_melt_quote_request_lookup_id("e.id, "e.request_lookup_id) + .await?; + from_db_quote.request_lookup_id = quote.request_lookup_id.clone(); + } + if from_db_quote != quote { + return Err(Error::Internal); + } + } else if let Err(err) = tx.add_melt_quote(quote.clone()).await { + match err { + database::Error::Duplicate => { + return Err(Error::RequestAlreadyPaid); + } + _ => return Err(Error::from(err)), + } + } + tx.commit().await?; Ok(quote.into()) } @@ -208,13 +233,6 @@ impl Mint { }) } - /// Update melt quote - #[instrument(skip_all)] - pub async fn update_melt_quote(&self, quote: MeltQuote) -> Result<(), Error> { - self.localstore.add_melt_quote(quote).await?; - Ok(()) - } - /// Get melt quotes #[instrument(skip_all)] pub async fn melt_quotes(&self) -> Result, Error> { @@ -222,14 +240,6 @@ impl Mint { Ok(quotes) } - /// Remove melt quote - #[instrument(skip(self))] - pub async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Error> { - self.localstore.remove_melt_quote(quote_id).await?; - - Ok(()) - } - /// Check melt has expected fees #[instrument(skip_all)] pub async fn check_melt_expected_ln_fees( @@ -291,10 +301,10 @@ impl Mint { #[instrument(skip_all)] pub async fn verify_melt_request( &self, + tx: &mut Box + Send + Sync + '_>, melt_request: &MeltRequest, - ) -> Result { - let (state, quote) = self - .localstore + ) -> Result<(ProofWriter, MeltQuote), Error> { + let (state, quote) = tx .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending) .await?; @@ -315,8 +325,6 @@ impl Mint { ensure_cdk!(input_unit.is_some(), Error::UnsupportedUnit); - let input_ys = melt_request.inputs().ys()?; - let fee = self.get_proofs_fee(melt_request.inputs()).await?; let required_total = quote.amount + quote.fee_reserve + fee; @@ -337,27 +345,10 @@ impl Mint { )); } - if let Some(err) = self - .localstore - .add_proofs(melt_request.inputs().clone(), None) - .await - .err() - { - return match err { - cdk_common::database::Error::Duplicate => Err(Error::TokenPending), - cdk_common::database::Error::AttemptUpdateSpentProof => { - Err(Error::TokenAlreadySpent) - } - err => Err(Error::Database(err)), - }; - } + let mut proof_writer = + ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone()); - self.check_ys_spendable(&input_ys, State::Pending).await?; - - for proof in melt_request.inputs() { - self.pubsub_manager - .proof_state((proof.y()?, State::Pending)); - } + proof_writer.add_proofs(tx, melt_request.inputs()).await?; let EnforceSigFlag { sig_flag, .. } = enforce_sig_flag(melt_request.inputs().clone()); @@ -368,43 +359,14 @@ impl Mint { let Verification { amount: _, unit: output_unit, - } = self.verify_outputs(outputs).await?; + } = self.verify_outputs(tx, outputs).await?; ensure_cdk!(input_unit == output_unit, Error::UnsupportedUnit); } } tracing::debug!("Verified melt quote: {}", melt_request.quote()); - Ok(quote) - } - - /// Process unpaid melt request - /// In the event that a melt request fails and the lighthing payment is not - /// made The proofs should be returned to an unspent state and the - /// quote should be unpaid - #[instrument(skip_all)] - pub async fn process_unpaid_melt(&self, melt_request: &MeltRequest) -> Result<(), Error> { - let input_ys = melt_request.inputs().ys()?; - - self.localstore - .remove_proofs(&input_ys, Some(*melt_request.quote())) - .await?; - - self.localstore - .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Unpaid) - .await?; - - if let Ok(Some(quote)) = self.localstore.get_melt_quote(melt_request.quote()).await { - self.pubsub_manager - .melt_quote_status(quote, None, None, MeltQuoteState::Unpaid); - } - - for public_key in input_ys { - self.pubsub_manager - .proof_state((public_key, State::Unspent)); - } - - Ok(()) + Ok((proof_writer, quote)) } /// Melt Bolt11 @@ -435,40 +397,27 @@ impl Mint { } } - let quote = match self.verify_melt_request(melt_request).await { - Ok(quote) => quote, - Err(err) => { + let mut tx = self.localstore.begin_transaction().await?; + + let (proof_writer, quote) = self + .verify_melt_request(&mut tx, melt_request) + .await + .map_err(|err| { tracing::debug!("Error attempting to verify melt quote: {}", err); + err + })?; - if let Err(err) = self.process_unpaid_melt(melt_request).await { - tracing::error!( - "Could not reset melt quote {} state: {}", - melt_request.quote(), - err - ); - } - return Err(err); - } - }; + let settled_internally_amount = self + .handle_internal_melt_mint(&mut tx, "e, melt_request) + .await + .map_err(|err| { + tracing::error!("Attempting to settle internally failed: {}", err); + err + })?; - let settled_internally_amount = - match self.handle_internal_melt_mint("e, melt_request).await { - Ok(amount) => amount, - Err(err) => { - tracing::error!("Attempting to settle internally failed"); - if let Err(err) = self.process_unpaid_melt(melt_request).await { - tracing::error!( - "Could not reset melt quote {} state: {}", - melt_request.quote(), - err - ); - } - return Err(err); - } - }; + let (tx, preimage, amount_spent_quote_unit, quote) = match settled_internally_amount { + Some(amount_spent) => (tx, None, amount_spent, quote), - let (preimage, amount_spent_quote_unit) = match settled_internally_amount { - Some(amount_spent) => (None, amount_spent), None => { // If the quote unit is SAT or MSAT we can check that the expected fees are // provided. We also check if the quote is less then the invoice @@ -484,9 +433,6 @@ impl Mint { Ok(amount) => amount, Err(err) => { tracing::error!("Fee is not expected: {}", err); - if let Err(err) = self.process_unpaid_melt(melt_request).await { - tracing::error!("Could not reset melt quote state: {}", err); - } return Err(Error::Internal); } } @@ -501,14 +447,13 @@ impl Mint { Some(ln) => ln, None => { tracing::info!("Could not get ln backend for {}, bolt11 ", quote.unit); - if let Err(err) = self.process_unpaid_melt(melt_request).await { - tracing::error!("Could not reset melt quote state: {}", err); - } - return Err(Error::UnsupportedUnit); } }; + // Commit before talking to the external call + tx.commit().await?; + let pre = match ln .make_payment(quote.clone(), partial_amount, Some(quote.fee_reserve)) .await @@ -517,13 +462,18 @@ impl Mint { if pay.status == MeltQuoteState::Unknown || pay.status == MeltQuoteState::Failed => { - let check_response = check_payment_state(Arc::clone(ln), "e) - .await - .map_err(|_| Error::Internal)?; + let check_response = + if let Ok(ok) = check_payment_state(Arc::clone(ln), "e).await { + ok + } else { + return Err(Error::Internal); + }; if check_response.status == MeltQuoteState::Paid { tracing::warn!("Pay invoice returned {} but check returned {}. Proofs stuck as pending", pay.status.to_string(), check_response.status.to_string()); + proof_writer.commit(); + return Err(Error::Internal); } @@ -535,21 +485,22 @@ impl Mint { // hold the proofs as pending to we reset them and return an error. if matches!(err, cdk_payment::Error::InvoiceAlreadyPaid) { tracing::debug!("Invoice already paid, resetting melt quote"); - if let Err(err) = self.process_unpaid_melt(melt_request).await { - tracing::error!("Could not reset melt quote state: {}", err); - } return Err(Error::RequestAlreadyPaid); } tracing::error!("Error returned attempting to pay: {} {}", quote.id, err); - let check_response = check_payment_state(Arc::clone(ln), "e) - .await - .map_err(|_| Error::Internal)?; + let check_response = + if let Ok(ok) = check_payment_state(Arc::clone(ln), "e).await { + ok + } else { + proof_writer.commit(); + return Err(Error::Internal); + }; // If there error is something else we want to check the status of the payment ensure it is not pending or has been made. if check_response.status == MeltQuoteState::Paid { tracing::warn!("Pay invoice returned an error but check returned {}. Proofs stuck as pending", check_response.status.to_string()); - + proof_writer.commit(); return Err(Error::Internal); } check_response @@ -563,9 +514,6 @@ impl Mint { "Lightning payment for quote {} failed.", melt_request.quote() ); - if let Err(err) = self.process_unpaid_melt(melt_request).await { - tracing::error!("Could not reset melt quote state: {}", err); - } return Err(Error::PaymentFailed); } MeltQuoteState::Pending => { @@ -573,6 +521,7 @@ impl Mint { "LN payment pending, proofs are stuck as pending for quote: {}", melt_request.quote() ); + proof_writer.commit(); return Err(Error::PendingQuote); } } @@ -584,6 +533,7 @@ impl Mint { to_unit(pre.total_spent, &pre.unit, "e.unit).unwrap_or_default(); let payment_lookup_id = pre.payment_lookup_id; + let mut tx = self.localstore.begin_transaction().await?; if payment_lookup_id != quote.request_lookup_id { tracing::info!( @@ -595,19 +545,34 @@ impl Mint { let mut melt_quote = quote; melt_quote.request_lookup_id = payment_lookup_id; - if let Err(err) = self.localstore.add_melt_quote(melt_quote).await { + if let Err(err) = tx + .update_melt_quote_request_lookup_id( + &melt_quote.id, + &melt_quote.request_lookup_id, + ) + .await + { tracing::warn!("Could not update payment lookup id: {}", err); } - } - (pre.payment_proof, amount_spent) + (tx, pre.payment_proof, amount_spent, melt_quote) + } else { + (tx, pre.payment_proof, amount_spent, quote) + } } }; // If we made it here the payment has been made. // We process the melt burning the inputs and returning change let res = self - .process_melt_request(melt_request, preimage, amount_spent_quote_unit) + .process_melt_request( + tx, + proof_writer, + quote, + melt_request, + preimage, + amount_spent_quote_unit, + ) .await .map_err(|err| { tracing::error!("Could not process melt request: {}", err); @@ -622,26 +587,22 @@ impl Mint { #[instrument(skip_all)] pub async fn process_melt_request( &self, + mut tx: Box + Send + Sync + '_>, + mut proof_writer: ProofWriter, + quote: MeltQuote, melt_request: &MeltRequest, payment_preimage: Option, total_spent: Amount, ) -> Result, Error> { tracing::debug!("Processing melt quote: {}", melt_request.quote()); - let quote = self - .localstore - .get_melt_quote(melt_request.quote()) - .await? - .ok_or(Error::UnknownQuote)?; - let input_ys = melt_request.inputs().ys()?; - self.localstore - .update_proofs_states(&input_ys, State::Spent) + proof_writer + .update_proofs_states(&mut tx, &input_ys, State::Spent) .await?; - self.localstore - .update_melt_quote_state(melt_request.quote(), MeltQuoteState::Paid) + tx.update_melt_quote_state(melt_request.quote(), MeltQuoteState::Paid) .await?; self.pubsub_manager.melt_quote_status( @@ -651,10 +612,6 @@ impl Mint { MeltQuoteState::Paid, ); - for public_key in input_ys { - self.pubsub_manager.proof_state((public_key, State::Spent)); - } - let mut change = None; // Check if there is change to return @@ -664,8 +621,7 @@ impl Mint { let blinded_messages: Vec = outputs.iter().map(|b| b.blinded_secret).collect(); - if self - .localstore + if tx .get_blind_signatures(&blinded_messages) .await? .iter() @@ -707,21 +663,23 @@ impl Mint { change_sigs.push(blinded_signature) } - self.localstore - .add_blind_signatures( - &outputs[0..change_sigs.len()] - .iter() - .map(|o| o.blinded_secret) - .collect::>(), - &change_sigs, - Some(quote.id), - ) - .await?; + tx.add_blind_signatures( + &outputs[0..change_sigs.len()] + .iter() + .map(|o| o.blinded_secret) + .collect::>(), + &change_sigs, + Some(quote.id), + ) + .await?; change = Some(change_sigs); } } + proof_writer.commit(); + tx.commit().await?; + Ok(MeltQuoteBolt11Response { amount: quote.amount, paid: Some(true), diff --git a/crates/cdk/src/mint/mod.rs b/crates/cdk/src/mint/mod.rs index 30dbb511..d9285a01 100644 --- a/crates/cdk/src/mint/mod.rs +++ b/crates/cdk/src/mint/mod.rs @@ -7,7 +7,7 @@ use arc_swap::ArcSwap; use cdk_common::common::{PaymentProcessorKey, QuoteTTL}; #[cfg(feature = "auth")] use cdk_common::database::MintAuthDatabase; -use cdk_common::database::{self, MintDatabase}; +use cdk_common::database::{self, MintDatabase, MintTransaction}; use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind}; use cdk_common::secret; use cdk_signatory::signatory::{Signatory, SignatoryKeySet}; @@ -24,9 +24,9 @@ use crate::cdk_payment::{self, MintPayment}; use crate::error::Error; use crate::fees::calculate_fee; use crate::nuts::*; -use crate::Amount; #[cfg(feature = "auth")] use crate::OidcClient; +use crate::{cdk_database, Amount}; #[cfg(feature = "auth")] pub(crate) mod auth; @@ -36,6 +36,7 @@ mod issue; mod keysets; mod ln; mod melt; +mod proof_writer; mod start_up_check; pub mod subscription; mod swap; @@ -225,7 +226,9 @@ impl Mint { /// Set mint info #[instrument(skip_all)] pub async fn set_mint_info(&self, mint_info: MintInfo) -> Result<(), Error> { - Ok(self.localstore.set_mint_info(mint_info).await?) + let mut tx = self.localstore.begin_transaction().await?; + tx.set_mint_info(mint_info).await?; + Ok(tx.commit().await?) } /// Get quote ttl @@ -237,7 +240,9 @@ impl Mint { /// Set quote ttl #[instrument(skip_all)] pub async fn set_quote_ttl(&self, quote_ttl: QuoteTTL) -> Result<(), Error> { - Ok(self.localstore.set_quote_ttl(quote_ttl).await?) + let mut tx = self.localstore.begin_transaction().await?; + tx.set_quote_ttl(quote_ttl).await?; + Ok(tx.commit().await?) } /// Wait for any invoice to be paid @@ -407,14 +412,11 @@ impl Mint { #[instrument(skip_all)] pub async fn handle_internal_melt_mint( &self, + tx: &mut Box + Send + Sync + '_>, melt_quote: &MeltQuote, melt_request: &MeltRequest, ) -> Result, Error> { - let mint_quote = match self - .localstore - .get_mint_quote_by_request(&melt_quote.request) - .await - { + let mint_quote = match tx.get_mint_quote_by_request(&melt_quote.request).await { Ok(Some(mint_quote)) => mint_quote, // Not an internal melt -> mint Ok(None) => return Ok(None), @@ -423,6 +425,7 @@ impl Mint { return Err(Error::Internal); } }; + tracing::error!("internal stuff"); // Mint quote has already been settled, proofs should not be burned or held. if mint_quote.state == MintQuoteState::Issued || mint_quote.state == MintQuoteState::Paid { @@ -449,7 +452,7 @@ impl Mint { let amount = melt_quote.amount; - self.update_mint_quote(mint_quote).await?; + tx.add_or_replace_mint_quote(mint_quote).await?; Ok(Some(amount)) } diff --git a/crates/cdk/src/mint/proof_writer.rs b/crates/cdk/src/mint/proof_writer.rs new file mode 100644 index 00000000..a9919e91 --- /dev/null +++ b/crates/cdk/src/mint/proof_writer.rs @@ -0,0 +1,214 @@ +//! Proof writer +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; + +use cdk_common::database::{self, MintDatabase, MintTransaction}; +use cdk_common::{Error, Proofs, ProofsMethods, PublicKey, State}; + +use super::subscription::PubSubManager; + +type Db = Arc + Send + Sync>; +type Tx<'a, 'b> = Box + Send + Sync + 'b>; + +/// Proof writer +/// +/// This is a proof writer that emulates a database transaction but without holding the transaction +/// alive while waiting for external events to be fully committed to the database; instead, it +/// maintains a `pending` state. +/// +/// This struct allows for premature exit on error, enabling it to remove proofs or reset their +/// status. +/// +/// This struct is not fully ACID. If the process exits due to a panic, and the `Drop` function +/// cannot be run, the reset process should reset the state. +pub struct ProofWriter { + db: Option, + pubsub_manager: Arc, + proof_original_states: Option>>, +} + +impl ProofWriter { + /// Creates a new ProofWriter on top of the database + pub fn new(db: Db, pubsub_manager: Arc) -> Self { + Self { + db: Some(db), + pubsub_manager, + proof_original_states: Some(Default::default()), + } + } + + /// The changes are permanent, consume the struct removing the database, so the Drop does + /// nothing + pub fn commit(mut self) { + self.db.take(); + self.proof_original_states.take(); + } + + /// Add proofs + pub async fn add_proofs( + &mut self, + tx: &mut Tx<'_, '_>, + proofs: &Proofs, + ) -> Result, Error> { + let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() { + proofs + } else { + return Err(Error::Internal); + }; + + if let Some(err) = tx.add_proofs(proofs.clone(), None).await.err() { + return match err { + cdk_common::database::Error::Duplicate => Err(Error::TokenPending), + cdk_common::database::Error::AttemptUpdateSpentProof => { + Err(Error::TokenAlreadySpent) + } + err => Err(Error::Database(err)), + }; + } + + let ys = proofs.ys()?; + + for pk in ys.iter() { + proof_states.insert(*pk, None); + } + + self.update_proofs_states(tx, &ys, State::Pending).await?; + + Ok(ys) + } + + /// Update proof status + pub async fn update_proofs_states( + &mut self, + tx: &mut Tx<'_, '_>, + ys: &[PublicKey], + new_proof_state: State, + ) -> Result<(), Error> { + let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() { + proofs + } else { + return Err(Error::Internal); + }; + + let original_proofs_state = match tx.update_proofs_states(ys, new_proof_state).await { + Ok(states) => states, + Err(database::Error::AttemptUpdateSpentProof) + | Err(database::Error::AttemptRemoveSpentProof) => { + return Err(Error::TokenAlreadySpent) + } + Err(err) => return Err(err.into()), + }; + + if ys.len() != original_proofs_state.len() { + return Err(Error::Internal); + } + + let proofs_state = original_proofs_state + .iter() + .flatten() + .map(|x| x.to_owned()) + .collect::>(); + + let forbidden_states = if new_proof_state == State::Pending { + // If the new state is `State::Pending` it cannot be pending already + vec![State::Pending, State::Spent] + } else { + // For other state it cannot be spent + vec![State::Spent] + }; + + for forbidden_state in forbidden_states.iter() { + if proofs_state.contains(forbidden_state) { + reset_proofs_to_original_state(tx, ys, original_proofs_state).await?; + + return Err(if proofs_state.contains(&State::Pending) { + Error::TokenPending + } else { + Error::TokenAlreadySpent + }); + } + } + + for (idx, ys) in ys.iter().enumerate() { + proof_states + .entry(*ys) + .or_insert(original_proofs_state[idx]); + } + + for pk in ys { + self.pubsub_manager.proof_state((*pk, new_proof_state)); + } + + Ok(()) + } + + /// Rollback all changes in this ProofWriter consuming it. + pub async fn rollback(mut self, tx: &mut Tx<'_, '_>) -> Result<(), Error> { + let (ys, original_states) = if let Some(proofs) = self.proof_original_states.take() { + proofs.into_iter().unzip::<_, _, Vec<_>, Vec<_>>() + } else { + return Ok(()); + }; + reset_proofs_to_original_state(tx, &ys, original_states).await?; + Ok(()) + } +} + +/// Resets proofs to their original states or removes them +#[inline(always)] +async fn reset_proofs_to_original_state( + tx: &mut Tx<'_, '_>, + ys: &[PublicKey], + original_states: Vec>, +) -> Result<(), Error> { + let mut ys_by_state = HashMap::new(); + let mut unknown_proofs = Vec::new(); + for (y, state) in ys.iter().zip(original_states) { + if let Some(state) = state { + // Skip attempting to update proofs that were originally spent + if state != State::Spent { + ys_by_state.entry(state).or_insert_with(Vec::new).push(*y); + } + } else { + unknown_proofs.push(*y); + } + } + + for (state, ys) in ys_by_state { + tx.update_proofs_states(&ys, state).await?; + } + + tx.remove_proofs(&unknown_proofs, None).await?; + + Ok(()) +} + +#[inline(always)] +async fn rollback( + db: Arc + Send + Sync>, + ys: Vec, + original_states: Vec>, +) -> Result<(), Error> { + let mut tx = db.begin_transaction().await?; + reset_proofs_to_original_state(&mut tx, &ys, original_states).await?; + tx.commit().await?; + + Ok(()) +} + +impl Drop for ProofWriter { + fn drop(&mut self) { + let db = if let Some(db) = self.db.take() { + db + } else { + return; + }; + let (ys, states) = if let Some(proofs) = self.proof_original_states.take() { + proofs.into_iter().unzip() + } else { + return; + }; + + tokio::spawn(rollback(db, ys, states)); + } +} diff --git a/crates/cdk/src/mint/start_up_check.rs b/crates/cdk/src/mint/start_up_check.rs index 204da168..4fe08bc0 100644 --- a/crates/cdk/src/mint/start_up_check.rs +++ b/crates/cdk/src/mint/start_up_check.rs @@ -21,10 +21,14 @@ impl Mint { "There are {} pending and unpaid mint quotes.", all_quotes.len() ); - for quote in all_quotes.iter() { + for mut quote in all_quotes.into_iter() { tracing::debug!("Checking status of mint quote: {}", quote.id); - if let Err(err) = self.check_mint_quote_paid("e.id).await { - tracing::error!("Could not check status of {}, {}", quote.id, err); + match self + .check_mint_quote_paid(self.localstore.begin_transaction().await?, &mut quote) + .await + { + Ok(tx) => tx.commit().await?, + Err(err) => tracing::error!("Could not check status of {}, {}", quote.id, err), } } Ok(()) @@ -39,6 +43,8 @@ impl Mint { .collect(); tracing::info!("There are {} pending melt quotes.", pending_quotes.len()); + let mut tx = self.localstore.begin_transaction().await?; + for pending_quote in pending_quotes { tracing::debug!("Checking status for melt quote {}.", pending_quote.id); @@ -72,8 +78,7 @@ impl Mint { MeltQuoteState::Unknown => MeltQuoteState::Unpaid, }; - if let Err(err) = self - .localstore + if let Err(err) = tx .update_melt_quote_state(&pending_quote.id, melt_quote_state) .await { @@ -86,6 +91,9 @@ impl Mint { ); }; } + + tx.commit().await?; + Ok(()) } } diff --git a/crates/cdk/src/mint/swap.rs b/crates/cdk/src/mint/swap.rs index 3b08f555..ab264828 100644 --- a/crates/cdk/src/mint/swap.rs +++ b/crates/cdk/src/mint/swap.rs @@ -1,9 +1,9 @@ use tracing::instrument; use super::nut11::{enforce_sig_flag, EnforceSigFlag}; +use super::proof_writer::ProofWriter; use super::{Mint, PublicKey, SigFlag, State, SwapRequest, SwapResponse}; -use crate::nuts::nut00::ProofsMethods; -use crate::{cdk_database, Error}; +use crate::Error; impl Mint { /// Process Swap @@ -12,8 +12,10 @@ impl Mint { &self, swap_request: SwapRequest, ) -> Result { + let mut tx = self.localstore.begin_transaction().await?; + if let Err(err) = self - .verify_transaction_balanced(swap_request.inputs(), swap_request.outputs()) + .verify_transaction_balanced(&mut tx, swap_request.inputs(), swap_request.outputs()) .await { tracing::debug!("Attempt to swap unbalanced transaction, aborting: {err}"); @@ -22,23 +24,11 @@ impl Mint { self.validate_sig_flag(&swap_request).await?; - // After swap request is fully validated, add the new proofs to DB - let input_ys = swap_request.inputs().ys()?; - if let Some(err) = self - .localstore - .add_proofs(swap_request.inputs().clone(), None) - .await - .err() - { - return match err { - cdk_common::database::Error::Duplicate => Err(Error::TokenPending), - cdk_common::database::Error::AttemptUpdateSpentProof => { - Err(Error::TokenAlreadySpent) - } - err => Err(Error::Database(err)), - }; - } - self.check_ys_spendable(&input_ys, State::Pending).await?; + let mut proof_writer = + ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone()); + let input_ys = proof_writer + .add_proofs(&mut tx, swap_request.inputs()) + .await?; let mut promises = Vec::with_capacity(swap_request.outputs().len()); @@ -47,36 +37,24 @@ impl Mint { promises.push(blinded_signature); } - // TODO: It may be possible to have a race condition, that's why an error when changing the - // state can be converted to a TokenAlreadySpent error. - // - // A concept of transaction/writer for the Database trait would eliminate this problem and - // will remove all the "reset" codebase, resulting in fewer lines of code, and less - // error-prone database updates - self.localstore - .update_proofs_states(&input_ys, State::Spent) - .await - .map_err(|e| match e { - cdk_database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent, - e => e.into(), - })?; - - for pub_key in input_ys { - self.pubsub_manager.proof_state((pub_key, State::Spent)); - } - - self.localstore - .add_blind_signatures( - &swap_request - .outputs() - .iter() - .map(|o| o.blinded_secret) - .collect::>(), - &promises, - None, - ) + proof_writer + .update_proofs_states(&mut tx, &input_ys, State::Spent) .await?; + tx.add_blind_signatures( + &swap_request + .outputs() + .iter() + .map(|o| o.blinded_secret) + .collect::>(), + &promises, + None, + ) + .await?; + + proof_writer.commit(); + tx.commit().await?; + Ok(SwapResponse::new(promises)) } diff --git a/crates/cdk/src/mint/verification.rs b/crates/cdk/src/mint/verification.rs index c0849dd7..1a4c024e 100644 --- a/crates/cdk/src/mint/verification.rs +++ b/crates/cdk/src/mint/verification.rs @@ -4,6 +4,7 @@ use cdk_common::{Amount, BlindedMessage, CurrencyUnit, Id, Proofs, ProofsMethods use tracing::instrument; use super::{Error, Mint}; +use crate::cdk_database; /// Verification result #[derive(Debug, Clone, Hash, PartialEq, Eq)] @@ -149,12 +150,12 @@ impl Mint { #[instrument(skip_all)] pub async fn check_output_already_signed( &self, + tx: &mut Box + Send + Sync + '_>, outputs: &[BlindedMessage], ) -> Result<(), Error> { let blinded_messages: Vec = outputs.iter().map(|o| o.blinded_secret).collect(); - if self - .localstore + if tx .get_blind_signatures(&blinded_messages) .await? .iter() @@ -173,7 +174,11 @@ impl Mint { /// Verifies outputs /// Checks outputs are unique, of the same unit and not signed before #[instrument(skip_all)] - pub async fn verify_outputs(&self, outputs: &[BlindedMessage]) -> Result { + pub async fn verify_outputs( + &self, + tx: &mut Box + Send + Sync + '_>, + outputs: &[BlindedMessage], + ) -> Result { if outputs.is_empty() { return Ok(Verification { amount: Amount::ZERO, @@ -182,7 +187,7 @@ impl Mint { } Mint::check_outputs_unique(outputs)?; - self.check_output_already_signed(outputs).await?; + self.check_output_already_signed(tx, outputs).await?; let unit = self.verify_outputs_keyset(outputs).await?; @@ -215,10 +220,11 @@ impl Mint { #[instrument(skip_all)] pub async fn verify_transaction_balanced( &self, + tx: &mut Box + Send + Sync + '_>, inputs: &Proofs, outputs: &[BlindedMessage], ) -> Result<(), Error> { - let output_verification = self.verify_outputs(outputs).await.map_err(|err| { + let output_verification = self.verify_outputs(tx, outputs).await.map_err(|err| { tracing::debug!("Output verification failed: {:?}", err); err })?; diff --git a/misc/itests.sh b/misc/itests.sh index 8e57d558..cb8de230 100755 --- a/misc/itests.sh +++ b/misc/itests.sh @@ -210,7 +210,7 @@ if [ $? -ne 0 ]; then fi echo "Running happy_path_mint_wallet test with CLN mint" -cargo test -p cdk-integration-tests --test happy_path_mint_wallet test_happy_mint_melt_round_trip +cargo test -p cdk-integration-tests --test happy_path_mint_wallet if [ $? -ne 0 ]; then echo "happy_path_mint_wallet test failed, exiting" exit 1