# Implement Saga Pattern for Swap Operations with Recovery Mechanism

## Overview

This PR refactors the swap operation implementation to use the saga pattern - a distributed transaction pattern that provides reliable transaction management through explicit state tracking and compensation-based error handling. The implementation includes a robust recovery mechanism that automatically handles swap operations interrupted by crashes, power loss, or network failures.

## What Changed

**Saga Pattern Implementation:**
- Introduced a strict linear state machine for swaps: `Initial` → `SetupComplete` → `Signed` → `Completed`
- New modular `swap_saga` module with state validation, compensation logic, and saga orchestration
- Automatic rollback of database changes on failure, ensuring atomic swap operations
- Replaced previous swap implementation (`swap.rs`, `blinded_message_writer.rs`) with saga-based approach

**Recovery Mechanism:**
- Added `operation_id` and `operation_kind` columns to database schema for tracking which operation proofs belong to
- New `recover_from_bad_swaps()` method that runs on mint startup to handle incomplete swaps
- For proofs left in `PENDING` state from swap operations:
  - If blind signatures exist: marks proofs as `SPENT` (swap completed but not finalized)
  - If no blind signatures exist: removes proofs from database (swap failed partway through)
- Database migrations included for both PostgreSQL and SQLite
This commit is contained in:
tsk
2025-10-22 08:30:33 -05:00
committed by GitHub
parent db2764c566
commit 33c206a310
28 changed files with 4550 additions and 361 deletions

View File

@@ -7,7 +7,7 @@ use cashu::quote_id::QuoteId;
use cashu::Amount;
use super::Error;
use crate::mint::{self, MintKeySetInfo, MintQuote as MintMintQuote};
use crate::mint::{self, MintKeySetInfo, MintQuote as MintMintQuote, Operation};
use crate::nuts::{
BlindSignature, BlindedMessage, CurrencyUnit, Id, MeltQuoteState, Proof, Proofs, PublicKey,
State,
@@ -145,6 +145,7 @@ pub trait QuotesTransaction<'a> {
&mut self,
quote_id: Option<&QuoteId>,
blinded_messages: &[BlindedMessage],
operation: &Operation,
) -> Result<(), Self::Err>;
/// Delete blinded_messages by their blinded secrets
@@ -265,6 +266,7 @@ pub trait ProofsTransaction<'a> {
&mut self,
proof: Proofs,
quote_id: Option<QuoteId>,
operation: &Operation,
) -> Result<(), Self::Err>;
/// Updates the proofs to a given states and return the previous states
async fn update_proofs_states(
@@ -353,6 +355,45 @@ pub trait SignaturesDatabase {
) -> Result<Vec<BlindSignature>, Self::Err>;
}
#[async_trait]
/// Saga Transaction trait
pub trait SagaTransaction<'a> {
/// Saga Database Error
type Err: Into<Error> + From<Error>;
/// Get saga by operation_id
async fn get_saga(
&mut self,
operation_id: &uuid::Uuid,
) -> Result<Option<mint::Saga>, Self::Err>;
/// Add saga
async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err>;
/// Update saga state (only updates state and updated_at fields)
async fn update_saga(
&mut self,
operation_id: &uuid::Uuid,
new_state: mint::SagaStateEnum,
) -> Result<(), Self::Err>;
/// Delete saga
async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err>;
}
#[async_trait]
/// Saga Database trait
pub trait SagaDatabase {
/// Saga Database Error
type Err: Into<Error> + From<Error>;
/// Get all incomplete sagas for a given operation kind
async fn get_incomplete_sagas(
&self,
operation_kind: mint::OperationKind,
) -> Result<Vec<mint::Saga>, Self::Err>;
}
#[async_trait]
/// Commit and Rollback
pub trait DbTransactionFinalizer {
@@ -409,6 +450,7 @@ pub trait Transaction<'a, Error>:
+ SignaturesTransaction<'a, Err = Error>
+ ProofsTransaction<'a, Err = Error>
+ KVStoreTransaction<'a, Error>
+ SagaTransaction<'a, Err = Error>
{
}
@@ -453,6 +495,7 @@ pub trait Database<Error>:
+ QuotesDatabase<Err = Error>
+ ProofsDatabase<Err = Error>
+ SignaturesDatabase<Err = Error>
+ SagaDatabase<Err = Error>
{
/// Beings a transaction
async fn begin_transaction<'a>(

View File

@@ -8,7 +8,7 @@ use cashu::{Amount, Id, SecretKey};
use crate::database::mint::test::unique_string;
use crate::database::mint::{Database, Error, KeysDatabase};
use crate::database::MintSignaturesDatabase;
use crate::mint::{MeltPaymentRequest, MeltQuote, MintQuote};
use crate::mint::{MeltPaymentRequest, MeltQuote, MintQuote, Operation};
use crate::payment::PaymentIdentifier;
/// Add a mint quote
@@ -435,7 +435,7 @@ where
tx.add_melt_request(&quote.id, inputs_amount, inputs_fee)
.await
.unwrap();
tx.add_blinded_messages(Some(&quote.id), &blinded_messages)
tx.add_blinded_messages(Some(&quote.id), &blinded_messages, &Operation::new_melt())
.await
.unwrap();
tx.commit().await.unwrap();
@@ -497,7 +497,7 @@ where
.await
.unwrap();
let result = tx
.add_blinded_messages(Some(&quote2.id), &blinded_messages)
.add_blinded_messages(Some(&quote2.id), &blinded_messages, &Operation::new_melt())
.await;
assert!(result.is_err() && matches!(result.unwrap_err(), Error::Duplicate));
tx.rollback().await.unwrap(); // Rollback to avoid partial state
@@ -530,7 +530,7 @@ where
.await
.unwrap();
assert!(tx
.add_blinded_messages(Some(&quote.id), &blinded_messages)
.add_blinded_messages(Some(&quote.id), &blinded_messages, &Operation::new_melt())
.await
.is_ok());
tx.commit().await.unwrap();
@@ -543,7 +543,7 @@ where
.await
.unwrap();
let result = tx
.add_blinded_messages(Some(&quote.id), &blinded_messages)
.add_blinded_messages(Some(&quote.id), &blinded_messages, &Operation::new_melt())
.await;
// Expect a database error due to unique violation
assert!(result.is_err()); // Specific error might be DB-specific, e.g., SqliteError or PostgresError
@@ -576,7 +576,7 @@ where
tx1.add_melt_request(&quote.id, inputs_amount, inputs_fee)
.await
.unwrap();
tx1.add_blinded_messages(Some(&quote.id), &blinded_messages)
tx1.add_blinded_messages(Some(&quote.id), &blinded_messages, &Operation::new_melt())
.await
.unwrap();
tx1.commit().await.unwrap();

View File

@@ -74,7 +74,9 @@ where
// Add proofs to database
let mut tx = Database::begin_transaction(&db).await.unwrap();
tx.add_proofs(proofs.clone(), None).await.unwrap();
tx.add_proofs(proofs.clone(), None, &Operation::new_swap())
.await
.unwrap();
// Mark one proof as `pending`
assert!(tx

View File

@@ -7,6 +7,7 @@ use cashu::{Amount, Id, SecretKey};
use crate::database::mint::test::setup_keyset;
use crate::database::mint::{Database, Error, KeysDatabase, Proof, QuoteId};
use crate::mint::Operation;
/// Test get proofs by keyset id
pub async fn get_proofs_by_keyset_id<DB>(db: DB)
@@ -36,7 +37,9 @@ where
// Add proofs to database
let mut tx = Database::begin_transaction(&db).await.unwrap();
tx.add_proofs(proofs, Some(quote_id)).await.unwrap();
tx.add_proofs(proofs, Some(quote_id), &Operation::new_swap())
.await
.unwrap();
assert!(tx.commit().await.is_ok());
let (proofs, states) = db.get_proofs_by_keyset_id(&keyset_id).await.unwrap();
@@ -88,9 +91,13 @@ where
// Add proofs to database
let mut tx = Database::begin_transaction(&db).await.unwrap();
tx.add_proofs(proofs.clone(), Some(quote_id.clone()))
.await
.unwrap();
tx.add_proofs(
proofs.clone(),
Some(quote_id.clone()),
&Operation::new_swap(),
)
.await
.unwrap();
assert!(tx.commit().await.is_ok());
let proofs_from_db = db.get_proofs_by_ys(&[proofs[0].c, proofs[1].c]).await;
@@ -132,13 +139,23 @@ where
// Add proofs to database
let mut tx = Database::begin_transaction(&db).await.unwrap();
tx.add_proofs(proofs.clone(), Some(quote_id.clone()))
.await
.unwrap();
tx.add_proofs(
proofs.clone(),
Some(quote_id.clone()),
&Operation::new_swap(),
)
.await
.unwrap();
assert!(tx.commit().await.is_ok());
let mut tx = Database::begin_transaction(&db).await.unwrap();
let result = tx.add_proofs(proofs.clone(), Some(quote_id.clone())).await;
let result = tx
.add_proofs(
proofs.clone(),
Some(quote_id.clone()),
&Operation::new_swap(),
)
.await;
assert!(
matches!(result.unwrap_err(), Error::Duplicate),

View File

@@ -335,7 +335,10 @@ pub enum Error {
/// Http transport error
#[error("Http transport error {0:?}: {1}")]
HttpError(Option<u16>, String),
#[cfg(feature = "wallet")]
/// Parse invoice error
#[cfg(feature = "mint")]
#[error(transparent)]
Uuid(#[from] uuid::Error),
// Crate error conversions
/// Cashu Url Error
#[error(transparent)]

View File

@@ -1,5 +1,8 @@
//! Mint types
use std::fmt;
use std::str::FromStr;
use bitcoin::bip32::DerivationPath;
use cashu::quote_id::QuoteId;
use cashu::util::unix_time;
@@ -14,7 +17,206 @@ use uuid::Uuid;
use crate::nuts::{MeltQuoteState, MintQuoteState};
use crate::payment::PaymentIdentifier;
use crate::{Amount, CurrencyUnit, Id, KeySetInfo, PublicKey};
use crate::{Amount, CurrencyUnit, Error, Id, KeySetInfo, PublicKey};
/// Operation kind for saga persistence
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum OperationKind {
/// Swap operation
Swap,
/// Mint operation
Mint,
/// Melt operation
Melt,
}
impl fmt::Display for OperationKind {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
OperationKind::Swap => write!(f, "swap"),
OperationKind::Mint => write!(f, "mint"),
OperationKind::Melt => write!(f, "melt"),
}
}
}
impl FromStr for OperationKind {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
let value = value.to_lowercase();
match value.as_str() {
"swap" => Ok(OperationKind::Swap),
"mint" => Ok(OperationKind::Mint),
"melt" => Ok(OperationKind::Melt),
_ => Err(Error::Custom(format!("Invalid operation kind: {}", value))),
}
}
}
/// States specific to swap saga
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum SwapSagaState {
/// Swap setup complete (proofs added, blinded messages added)
SetupComplete,
/// Outputs signed (signatures generated but not persisted)
Signed,
}
impl fmt::Display for SwapSagaState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SwapSagaState::SetupComplete => write!(f, "setup_complete"),
SwapSagaState::Signed => write!(f, "signed"),
}
}
}
impl FromStr for SwapSagaState {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
let value = value.to_lowercase();
match value.as_str() {
"setup_complete" => Ok(SwapSagaState::SetupComplete),
"signed" => Ok(SwapSagaState::Signed),
_ => Err(Error::Custom(format!("Invalid swap saga state: {}", value))),
}
}
}
/// Saga state for different operation types
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SagaStateEnum {
/// Swap saga states
Swap(SwapSagaState),
// Future: Mint saga states
// Mint(MintSagaState),
// Future: Melt saga states
// Melt(MeltSagaState),
}
impl SagaStateEnum {
/// Create from string given operation kind
pub fn new(operation_kind: OperationKind, s: &str) -> Result<Self, Error> {
match operation_kind {
OperationKind::Swap => Ok(SagaStateEnum::Swap(SwapSagaState::from_str(s)?)),
OperationKind::Mint => Err(Error::Custom("Mint saga not implemented yet".to_string())),
OperationKind::Melt => Err(Error::Custom("Melt saga not implemented yet".to_string())),
}
}
/// Get string representation of the state
pub fn state(&self) -> &str {
match self {
SagaStateEnum::Swap(state) => match state {
SwapSagaState::SetupComplete => "setup_complete",
SwapSagaState::Signed => "signed",
},
}
}
}
/// Persisted saga for recovery
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct Saga {
/// Operation ID (correlation key)
pub operation_id: Uuid,
/// Operation kind (swap, mint, melt)
pub operation_kind: OperationKind,
/// Current saga state (operation-specific)
pub state: SagaStateEnum,
/// Blinded secrets (B values) from output blinded messages
pub blinded_secrets: Vec<PublicKey>,
/// Y values (public keys) from input proofs
pub input_ys: Vec<PublicKey>,
/// Unix timestamp when saga was created
pub created_at: u64,
/// Unix timestamp when saga was last updated
pub updated_at: u64,
}
impl Saga {
/// Create new swap saga
pub fn new_swap(
operation_id: Uuid,
state: SwapSagaState,
blinded_secrets: Vec<PublicKey>,
input_ys: Vec<PublicKey>,
) -> Self {
let now = unix_time();
Self {
operation_id,
operation_kind: OperationKind::Swap,
state: SagaStateEnum::Swap(state),
blinded_secrets,
input_ys,
created_at: now,
updated_at: now,
}
}
/// Update swap saga state
pub fn update_swap_state(&mut self, new_state: SwapSagaState) {
self.state = SagaStateEnum::Swap(new_state);
self.updated_at = unix_time();
}
}
/// Operation
pub enum Operation {
/// Mint
Mint(Uuid),
/// Melt
Melt(Uuid),
/// Swap
Swap(Uuid),
}
impl Operation {
/// Mint
pub fn new_mint() -> Self {
Self::Mint(Uuid::new_v4())
}
/// Melt
pub fn new_melt() -> Self {
Self::Melt(Uuid::new_v4())
}
/// Swap
pub fn new_swap() -> Self {
Self::Swap(Uuid::new_v4())
}
/// Operation id
pub fn id(&self) -> &Uuid {
match self {
Operation::Mint(id) => id,
Operation::Melt(id) => id,
Operation::Swap(id) => id,
}
}
/// Operation kind
pub fn kind(&self) -> &str {
match self {
Operation::Mint(_) => "mint",
Operation::Melt(_) => "melt",
Operation::Swap(_) => "swap",
}
}
/// From kind and i
pub fn from_kind_and_id(kind: &str, id: &str) -> Result<Self, Error> {
let uuid = Uuid::parse_str(id)?;
match kind {
"mint" => Ok(Self::Mint(uuid)),
"melt" => Ok(Self::Melt(uuid)),
"swap" => Ok(Self::Swap(uuid)),
_ => Err(Error::Custom(format!("Invalid operation kind: {}", kind))),
}
}
}
/// Mint Quote Info
#[derive(Debug, Clone, Hash, PartialEq, Eq, Serialize, Deserialize)]

View File

@@ -1183,11 +1183,6 @@ pub async fn run_mintd_with_shutdown(
let mint = Arc::new(mint);
// Checks the status of all pending melt quotes
// Pending melt quotes where the payment has gone through inputs are burnt
// Pending melt quotes where the payment has **failed** inputs are reset to unspent
mint.check_pending_melt_quotes().await?;
start_services_with_shutdown(
mint.clone(),
settings,

View File

@@ -29,3 +29,4 @@ serde.workspace = true
serde_json.workspace = true
lightning-invoice.workspace = true
once_cell.workspace = true
uuid.workspace = true

View File

@@ -0,0 +1,24 @@
-- Add operation and operation_id columns to proof table
ALTER TABLE proof ADD COLUMN operation_kind TEXT;
ALTER TABLE proof ADD COLUMN operation_id TEXT;
-- Add operation and operation_id columns to blind_signature table
ALTER TABLE blind_signature ADD COLUMN operation_kind TEXT;
ALTER TABLE blind_signature ADD COLUMN operation_id TEXT;
CREATE INDEX idx_proof_state_operation ON proof(state, operation_kind);
CREATE INDEX idx_proof_operation_id ON proof(operation_kind, operation_id);
CREATE INDEX idx_blind_sig_operation_id ON blind_signature(operation_kind, operation_id);
-- Add saga_state table for persisting saga state
CREATE TABLE IF NOT EXISTS saga_state (
operation_id TEXT PRIMARY KEY,
operation_kind TEXT NOT NULL,
state TEXT NOT NULL,
blinded_secrets TEXT NOT NULL,
input_ys TEXT NOT NULL,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_saga_state_operation_kind ON saga_state(operation_kind);

View File

@@ -0,0 +1,24 @@
-- Add operation and operation_id columns to proof table
ALTER TABLE proof ADD COLUMN operation_kind TEXT;
ALTER TABLE proof ADD COLUMN operation_id TEXT;
-- Add operation and operation_id columns to blind_signature table
ALTER TABLE blind_signature ADD COLUMN operation_kind TEXT;
ALTER TABLE blind_signature ADD COLUMN operation_id TEXT;
CREATE INDEX idx_proof_state_operation ON proof(state, operation_kind);
CREATE INDEX idx_proof_operation_id ON proof(operation_kind, operation_id);
CREATE INDEX idx_blind_sig_operation_id ON blind_signature(operation_kind, operation_id);
-- Add saga_state table for persisting saga state
CREATE TABLE IF NOT EXISTS saga_state (
operation_id TEXT PRIMARY KEY,
operation_kind TEXT NOT NULL,
state TEXT NOT NULL,
blinded_secrets TEXT NOT NULL,
input_ys TEXT NOT NULL,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_saga_state_operation_kind ON saga_state(operation_kind);

View File

@@ -15,7 +15,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use bitcoin::bip32::DerivationPath;
use cdk_common::database::mint::validate_kvstore_params;
use cdk_common::database::mint::{validate_kvstore_params, SagaDatabase, SagaTransaction};
use cdk_common::database::{
self, ConversionError, Error, MintDatabase, MintDbWriterFinalizer, MintKeyDatabaseTransaction,
MintKeysDatabase, MintProofsDatabase, MintQuotesDatabase, MintQuotesTransaction,
@@ -23,6 +23,7 @@ use cdk_common::database::{
};
use cdk_common::mint::{
self, IncomingPayment, Issuance, MeltPaymentRequest, MeltQuote, MintKeySetInfo, MintQuote,
Operation,
};
use cdk_common::nut00::ProofsMethods;
use cdk_common::payment::PaymentIdentifier;
@@ -138,6 +139,7 @@ where
&mut self,
proofs: Proofs,
quote_id: Option<QuoteId>,
operation: &Operation,
) -> Result<(), Self::Err> {
let current_time = unix_time();
@@ -165,9 +167,9 @@ where
query(
r#"
INSERT INTO proof
(y, amount, keyset_id, secret, c, witness, state, quote_id, created_time)
(y, amount, keyset_id, secret, c, witness, state, quote_id, created_time, operation_kind, operation_id)
VALUES
(:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time)
(:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time, :operation_kind, :operation_id)
"#,
)?
.bind("y", proof.y()?.to_bytes().to_vec())
@@ -182,6 +184,8 @@ where
.bind("state", "UNSPENT".to_string())
.bind("quote_id", quote_id.clone().map(|q| q.to_string()))
.bind("created_time", current_time as i64)
.bind("operation_kind", operation.kind())
.bind("operation_id", operation.id().to_string())
.execute(&self.inner)
.await?;
}
@@ -574,6 +578,7 @@ where
&mut self,
quote_id: Option<&QuoteId>,
blinded_messages: &[BlindedMessage],
operation: &Operation,
) -> Result<(), Self::Err> {
let current_time = unix_time();
@@ -583,9 +588,9 @@ where
match query(
r#"
INSERT INTO blind_signature
(blinded_message, amount, keyset_id, c, quote_id, created_time)
(blinded_message, amount, keyset_id, c, quote_id, created_time, operation_kind, operation_id)
VALUES
(:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time)
(:blinded_message, :amount, :keyset_id, NULL, :quote_id, :created_time, :operation_kind, :operation_id)
"#,
)?
.bind(
@@ -596,6 +601,8 @@ where
.bind("keyset_id", message.keyset_id.to_string())
.bind("quote_id", quote_id.map(|q| q.to_string()))
.bind("created_time", current_time as i64)
.bind("operation_kind", operation.kind())
.bind("operation_id", operation.id().to_string())
.execute(&self.inner)
.await
{
@@ -2120,6 +2127,147 @@ where
}
}
#[async_trait]
impl<RM> SagaTransaction<'_> for SQLTransaction<RM>
where
RM: DatabasePool + 'static,
{
type Err = Error;
async fn get_saga(
&mut self,
operation_id: &uuid::Uuid,
) -> Result<Option<mint::Saga>, Self::Err> {
Ok(query(
r#"
SELECT
operation_id,
operation_kind,
state,
blinded_secrets,
input_ys,
created_at,
updated_at
FROM
saga_state
WHERE
operation_id = :operation_id
FOR UPDATE
"#,
)?
.bind("operation_id", operation_id.to_string())
.fetch_one(&self.inner)
.await?
.map(sql_row_to_saga)
.transpose()?)
}
async fn add_saga(&mut self, saga: &mint::Saga) -> Result<(), Self::Err> {
let current_time = unix_time();
let blinded_secrets_json = serde_json::to_string(&saga.blinded_secrets)
.map_err(|e| Error::Internal(format!("Failed to serialize blinded_secrets: {}", e)))?;
let input_ys_json = serde_json::to_string(&saga.input_ys)
.map_err(|e| Error::Internal(format!("Failed to serialize input_ys: {}", e)))?;
query(
r#"
INSERT INTO saga_state
(operation_id, operation_kind, state, blinded_secrets, input_ys, created_at, updated_at)
VALUES
(:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :created_at, :updated_at)
"#,
)?
.bind("operation_id", saga.operation_id.to_string())
.bind("operation_kind", saga.operation_kind.to_string())
.bind("state", saga.state.state())
.bind("blinded_secrets", blinded_secrets_json)
.bind("input_ys", input_ys_json)
.bind("created_at", saga.created_at as i64)
.bind("updated_at", current_time as i64)
.execute(&self.inner)
.await?;
Ok(())
}
async fn update_saga(
&mut self,
operation_id: &uuid::Uuid,
new_state: mint::SagaStateEnum,
) -> Result<(), Self::Err> {
let current_time = unix_time();
query(
r#"
UPDATE saga_state
SET state = :state, updated_at = :updated_at
WHERE operation_id = :operation_id
"#,
)?
.bind("state", new_state.state())
.bind("updated_at", current_time as i64)
.bind("operation_id", operation_id.to_string())
.execute(&self.inner)
.await?;
Ok(())
}
async fn delete_saga(&mut self, operation_id: &uuid::Uuid) -> Result<(), Self::Err> {
query(
r#"
DELETE FROM saga_state
WHERE operation_id = :operation_id
"#,
)?
.bind("operation_id", operation_id.to_string())
.execute(&self.inner)
.await?;
Ok(())
}
}
#[async_trait]
impl<RM> SagaDatabase for SQLMintDatabase<RM>
where
RM: DatabasePool + 'static,
{
type Err = Error;
async fn get_incomplete_sagas(
&self,
operation_kind: mint::OperationKind,
) -> Result<Vec<mint::Saga>, Self::Err> {
let conn = self.pool.get().map_err(|e| Error::Database(Box::new(e)))?;
Ok(query(
r#"
SELECT
operation_id,
operation_kind,
state,
blinded_secrets,
input_ys,
created_at,
updated_at
FROM
saga_state
WHERE
operation_kind = :operation_kind
ORDER BY created_at ASC
"#,
)?
.bind("operation_kind", operation_kind.to_string())
.fetch_all(&*conn)
.await?
.into_iter()
.map(sql_row_to_saga)
.collect::<Result<Vec<_>, _>>()?)
}
}
#[async_trait]
impl<RM> MintDatabase<Error> for SQLMintDatabase<RM>
where
@@ -2383,6 +2531,53 @@ fn sql_row_to_blind_signature(row: Vec<Column>) -> Result<BlindSignature, Error>
})
}
fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
unpack_into!(
let (
operation_id,
operation_kind,
state,
blinded_secrets,
input_ys,
created_at,
updated_at
) = row
);
let operation_id_str = column_as_string!(&operation_id);
let operation_id = uuid::Uuid::parse_str(&operation_id_str)
.map_err(|e| Error::Internal(format!("Invalid operation_id UUID: {}", e)))?;
let operation_kind_str = column_as_string!(&operation_kind);
let operation_kind = mint::OperationKind::from_str(&operation_kind_str)
.map_err(|e| Error::Internal(format!("Invalid operation kind: {}", e)))?;
let state_str = column_as_string!(&state);
let state = mint::SagaStateEnum::new(operation_kind, &state_str)
.map_err(|e| Error::Internal(format!("Invalid saga state: {}", e)))?;
let blinded_secrets_str = column_as_string!(&blinded_secrets);
let blinded_secrets: Vec<PublicKey> = serde_json::from_str(&blinded_secrets_str)
.map_err(|e| Error::Internal(format!("Failed to deserialize blinded_secrets: {}", e)))?;
let input_ys_str = column_as_string!(&input_ys);
let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
.map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {}", e)))?;
let created_at: u64 = column_as_number!(created_at);
let updated_at: u64 = column_as_number!(updated_at);
Ok(mint::Saga {
operation_id,
operation_kind,
state,
blinded_secrets,
input_ys,
created_at,
updated_at,
})
}
#[cfg(test)]
mod test {
use super::*;

View File

@@ -2,7 +2,7 @@
use std::collections::HashMap;
use cdk_common::database::{self, MintDatabase, MintKeysDatabase};
use cdk_common::mint::{self, MintKeySetInfo, MintQuote};
use cdk_common::mint::{self, MintKeySetInfo, MintQuote, Operation};
use cdk_common::nuts::{CurrencyUnit, Id, Proofs};
use cdk_common::MintInfo;
@@ -56,8 +56,10 @@ pub async fn new_with_state(
tx.add_melt_quote(quote).await?;
}
tx.add_proofs(pending_proofs, None).await?;
tx.add_proofs(spent_proofs, None).await?;
tx.add_proofs(pending_proofs, None, &Operation::new_swap())
.await?;
tx.add_proofs(spent_proofs, None, &Operation::new_swap())
.await?;
let mint_info_bytes = serde_json::to_vec(&mint_info)?;
tx.kv_write(
CDK_MINT_PRIMARY_NAMESPACE,

View File

@@ -141,12 +141,14 @@ required-features = ["wallet"]
[dev-dependencies]
rand.workspace = true
cdk-sqlite.workspace = true
cdk-fake-wallet.workspace = true
bip39.workspace = true
tracing-subscriber.workspace = true
criterion.workspace = true
reqwest = { workspace = true }
anyhow.workspace = true
ureq = { version = "3.1.0", features = ["json"] }
tokio = { workspace = true, features = ["full"] }
[[bench]]

View File

@@ -49,6 +49,9 @@ pub use oidc_client::OidcClient;
pub mod event;
pub mod fees;
#[cfg(test)]
pub mod test_helpers;
#[doc(hidden)]
pub use bitcoin::secp256k1;
#[cfg(feature = "mint")]

View File

@@ -1,152 +0,0 @@
//! Blinded message writer
use std::collections::HashSet;
use cdk_common::database::{self, DynMintDatabase, MintTransaction};
use cdk_common::nuts::BlindedMessage;
use cdk_common::{Error, PublicKey, QuoteId};
type Tx<'a, 'b> = Box<dyn MintTransaction<'a, database::Error> + Send + Sync + 'b>;
/// Blinded message writer
///
/// This is a blinded message writer that emulates a database transaction but without holding the
/// transaction alive while waiting for external events to be fully committed to the database;
/// instead, it maintains a `pending` state.
///
/// This struct allows for premature exit on error, enabling it to remove blinded messages that
/// were added during the operation.
///
/// This struct is not fully ACID. If the process exits due to a panic, and the `Drop` function
/// cannot be run, the cleanup process should reset the state.
pub struct BlindedMessageWriter {
db: Option<DynMintDatabase>,
added_blinded_secrets: Option<HashSet<PublicKey>>,
}
impl BlindedMessageWriter {
/// Creates a new BlindedMessageWriter on top of the database
pub fn new(db: DynMintDatabase) -> Self {
Self {
db: Some(db),
added_blinded_secrets: Some(Default::default()),
}
}
/// The changes are permanent, consume the struct removing the database, so the Drop does
/// nothing
pub fn commit(mut self) {
self.db.take();
self.added_blinded_secrets.take();
}
/// Add blinded messages
pub async fn add_blinded_messages(
&mut self,
tx: &mut Tx<'_, '_>,
quote_id: Option<QuoteId>,
blinded_messages: &[BlindedMessage],
) -> Result<Vec<PublicKey>, Error> {
let added_secrets = if let Some(secrets) = self.added_blinded_secrets.as_mut() {
secrets
} else {
return Err(Error::Internal);
};
if let Some(err) = tx
.add_blinded_messages(quote_id.as_ref(), blinded_messages)
.await
.err()
{
return match err {
cdk_common::database::Error::Duplicate => Err(Error::DuplicateOutputs),
err => Err(Error::Database(err)),
};
}
let blinded_secrets: Vec<PublicKey> = blinded_messages
.iter()
.map(|bm| bm.blinded_secret)
.collect();
for blinded_secret in &blinded_secrets {
added_secrets.insert(*blinded_secret);
}
Ok(blinded_secrets)
}
/// Rollback all changes in this BlindedMessageWriter consuming it.
pub async fn rollback(mut self) -> Result<(), Error> {
let db = if let Some(db) = self.db.take() {
db
} else {
return Ok(());
};
let mut tx = db.begin_transaction().await?;
let blinded_secrets: Vec<PublicKey> =
if let Some(secrets) = self.added_blinded_secrets.take() {
secrets.into_iter().collect()
} else {
return Ok(());
};
if !blinded_secrets.is_empty() {
tracing::info!("Rollback {} blinded messages", blinded_secrets.len(),);
remove_blinded_messages(&mut tx, &blinded_secrets).await?;
}
tx.commit().await?;
Ok(())
}
}
/// Removes blinded messages from the database
#[inline(always)]
async fn remove_blinded_messages(
tx: &mut Tx<'_, '_>,
blinded_secrets: &[PublicKey],
) -> Result<(), Error> {
tx.delete_blinded_messages(blinded_secrets)
.await
.map_err(Error::Database)
}
#[inline(always)]
async fn rollback_blinded_messages(
db: DynMintDatabase,
blinded_secrets: Vec<PublicKey>,
) -> Result<(), Error> {
let mut tx = db.begin_transaction().await?;
remove_blinded_messages(&mut tx, &blinded_secrets).await?;
tx.commit().await?;
Ok(())
}
impl Drop for BlindedMessageWriter {
fn drop(&mut self) {
let db = if let Some(db) = self.db.take() {
db
} else {
tracing::debug!("Blinded message writer dropped after commit, no need to rollback.");
return;
};
let blinded_secrets: Vec<PublicKey> =
if let Some(secrets) = self.added_blinded_secrets.take() {
secrets.into_iter().collect()
} else {
return;
};
if !blinded_secrets.is_empty() {
tracing::debug!("Blinded message writer dropper with messages attempting to remove.");
tokio::spawn(async move {
if let Err(err) = rollback_blinded_messages(db, blinded_secrets).await {
tracing::error!("Failed to rollback blinded messages in Drop: {}", err);
}
});
}
}
}

View File

@@ -1,4 +1,4 @@
use cdk_common::mint::MintQuote;
use cdk_common::mint::{MintQuote, Operation};
use cdk_common::payment::{
Bolt11IncomingPaymentOptions, Bolt11Settings, Bolt12IncomingPaymentOptions,
IncomingPaymentOptions, WaitPaymentResponse,
@@ -657,6 +657,10 @@ impl Mint {
let unit = unit.ok_or(Error::UnsupportedUnit).unwrap();
ensure_cdk!(unit == mint_quote.unit, Error::UnsupportedUnit);
let operation = Operation::new_mint();
tx.add_blinded_messages(Some(&mint_request.quote), &mint_request.outputs, &operation).await?;
tx.add_blind_signatures(
&mint_request
.outputs

View File

@@ -5,7 +5,7 @@ use cdk_common::amount::amount_for_offer;
use cdk_common::database::mint::MeltRequestInfo;
use cdk_common::database::{self, MintTransaction};
use cdk_common::melt::MeltQuoteRequest;
use cdk_common::mint::MeltPaymentRequest;
use cdk_common::mint::{MeltPaymentRequest, Operation};
use cdk_common::nut05::MeltMethodOptions;
use cdk_common::payment::{
Bolt11OutgoingPaymentOptions, Bolt12OutgoingPaymentOptions, DynMintPayment,
@@ -506,6 +506,7 @@ impl Mint {
tx: &mut Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
input_verification: Verification,
melt_request: &MeltRequest<QuoteId>,
operation: &Operation,
) -> Result<(ProofWriter, MeltQuote), Error> {
let Verification {
amount: input_amount,
@@ -520,6 +521,7 @@ impl Mint {
tx,
melt_request.inputs(),
Some(melt_request.quote_id().to_owned()),
operation,
)
.await?;
@@ -613,10 +615,11 @@ impl Mint {
let verification = self.verify_inputs(melt_request.inputs()).await?;
let melt_operation = Operation::new_melt();
let mut tx = self.localstore.begin_transaction().await?;
let (proof_writer, quote) = match self
.verify_melt_request(&mut tx, verification, melt_request)
.verify_melt_request(&mut tx, verification, melt_request, &melt_operation)
.await
{
Ok(result) => result,
@@ -646,6 +649,7 @@ impl Mint {
tx.add_blinded_messages(
Some(melt_request.quote_id()),
melt_request.outputs().as_ref().unwrap_or(&Vec::new()),
&melt_operation,
)
.await?;

View File

@@ -34,7 +34,6 @@ use crate::{cdk_database, Amount};
#[cfg(feature = "auth")]
pub(crate) mod auth;
mod blinded_message_writer;
mod builder;
mod check_spendable;
mod issue;
@@ -239,6 +238,15 @@ impl Mint {
/// - Payment processor initialization and startup
/// - Invoice payment monitoring across all configured payment processors
pub async fn start(&self) -> Result<(), Error> {
// Checks the status of all pending melt quotes
// Pending melt quotes where the payment has gone through inputs are burnt
// Pending melt quotes where the payment has **failed** inputs are reset to unspent
self.check_pending_melt_quotes().await?;
// Recover from incomplete swap sagas
// This cleans up incomplete swap operations using persisted saga state
self.recover_from_incomplete_sagas().await?;
let mut task_state = self.task_state.lock().await;
// Prevent starting if already running
@@ -813,6 +821,13 @@ impl Mint {
&self,
blinded_message: Vec<BlindedMessage>,
) -> Result<Vec<BlindSignature>, Error> {
#[cfg(test)]
{
if crate::test_helpers::mint::should_fail_in_test() {
return Err(Error::SignatureMissingOrInvalid);
}
}
#[cfg(feature = "prometheus")]
global::inc_in_flight_requests("blind_sign");

View File

@@ -3,6 +3,7 @@ use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use cdk_common::database::{self, DynMintDatabase, MintTransaction};
use cdk_common::mint::Operation;
use cdk_common::{Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
use super::subscription::PubSubManager;
@@ -49,6 +50,7 @@ impl ProofWriter {
tx: &mut Tx<'_, '_>,
proofs: &Proofs,
quote_id: Option<QuoteId>,
operation_id: &Operation,
) -> Result<Vec<PublicKey>, Error> {
let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() {
proofs
@@ -56,7 +58,11 @@ impl ProofWriter {
return Err(Error::Internal);
};
if let Some(err) = tx.add_proofs(proofs.clone(), quote_id).await.err() {
if let Some(err) = tx
.add_proofs(proofs.clone(), quote_id, operation_id)
.await
.err()
{
return match err {
cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
cdk_common::database::Error::AttemptUpdateSpentProof => {

View File

@@ -3,7 +3,10 @@
//! These checks are need in the case the mint was offline and the lightning node was node.
//! These ensure that the status of the mint or melt quote matches in the mint db and on the node.
use cdk_common::mint::OperationKind;
use super::{Error, Mint};
use crate::mint::swap::swap_saga::compensation::{CompensatingAction, RemoveSwapSetup};
use crate::mint::{MeltQuote, MeltQuoteState, PaymentMethod};
use crate::types::PaymentProcessorKey;
@@ -79,4 +82,67 @@ impl Mint {
Ok(())
}
/// Recover from incomplete swap sagas
///
/// Checks all persisted sagas for swap operations and compensates
/// incomplete ones by removing both proofs and blinded messages.
pub async fn recover_from_incomplete_sagas(&self) -> Result<(), Error> {
let incomplete_sagas = self
.localstore
.get_incomplete_sagas(OperationKind::Swap)
.await?;
if incomplete_sagas.is_empty() {
tracing::info!("No incomplete swap sagas found to recover.");
return Ok(());
}
let total_sagas = incomplete_sagas.len();
tracing::info!("Found {} incomplete swap sagas to recover.", total_sagas);
for saga in incomplete_sagas {
tracing::info!(
"Recovering saga {} in state '{}' (created: {}, updated: {})",
saga.operation_id,
saga.state.state(),
saga.created_at,
saga.updated_at
);
// Use the same compensation logic as in-process failures
let compensation = RemoveSwapSetup {
blinded_secrets: saga.blinded_secrets.clone(),
input_ys: saga.input_ys.clone(),
};
// Execute compensation
if let Err(e) = compensation.execute(&self.localstore).await {
tracing::error!(
"Failed to compensate saga {}: {}. Continuing...",
saga.operation_id,
e
);
continue;
}
// Delete saga after successful compensation
let mut tx = self.localstore.begin_transaction().await?;
if let Err(e) = tx.delete_saga(&saga.operation_id).await {
tracing::error!("Failed to delete saga for {}: {}", saga.operation_id, e);
tx.rollback().await?;
continue;
}
tx.commit().await?;
tracing::info!("Successfully recovered saga {}", saga.operation_id);
}
tracing::info!(
"Successfully recovered {} incomplete swap sagas.",
total_sagas
);
Ok(())
}
}

View File

@@ -1,173 +0,0 @@
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
use tracing::instrument;
use super::blinded_message_writer::BlindedMessageWriter;
use super::nut11::{enforce_sig_flag, EnforceSigFlag};
use super::proof_writer::ProofWriter;
use super::{Mint, PublicKey, SigFlag, State, SwapRequest, SwapResponse};
use crate::Error;
impl Mint {
/// Process Swap
#[instrument(skip_all)]
pub async fn process_swap_request(
&self,
swap_request: SwapRequest,
) -> Result<SwapResponse, Error> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("process_swap_request");
// Do the external call before beginning the db transaction
// Check any overflow before talking to the signatory
swap_request.input_amount()?;
swap_request.output_amount()?;
// We add blinded messages to db before attempting to sign
// this ensures that they are unique and have not been used before
let mut blinded_message_writer = BlindedMessageWriter::new(self.localstore.clone());
let mut tx = self.localstore.begin_transaction().await?;
match blinded_message_writer
.add_blinded_messages(&mut tx, None, swap_request.outputs())
.await
{
Ok(_) => {
tx.commit().await?;
}
Err(err) => {
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_swap_request");
METRICS.record_mint_operation("process_swap_request", false);
METRICS.record_error();
}
return Err(err);
}
}
let promises = self.blind_sign(swap_request.outputs().to_owned()).await?;
let input_verification =
self.verify_inputs(swap_request.inputs())
.await
.map_err(|err| {
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_swap_request");
METRICS.record_mint_operation("process_swap_request", false);
METRICS.record_error();
}
tracing::debug!("Input verification failed: {:?}", err);
err
})?;
let mut tx = self.localstore.begin_transaction().await?;
if let Err(err) = self
.verify_transaction_balanced(
&mut tx,
input_verification,
swap_request.inputs(),
swap_request.outputs(),
)
.await
{
tracing::debug!("Attempt to swap unbalanced transaction, aborting: {err}");
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_swap_request");
METRICS.record_mint_operation("process_swap_request", false);
METRICS.record_error();
}
tx.rollback().await?;
blinded_message_writer.rollback().await?;
return Err(err);
};
let validate_sig_result = self.validate_sig_flag(&swap_request).await;
if let Err(err) = validate_sig_result {
tx.rollback().await?;
blinded_message_writer.rollback().await?;
#[cfg(feature = "prometheus")]
self.record_swap_failure("process_swap_request");
return Err(err);
}
let mut proof_writer =
ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone());
let input_ys = match proof_writer
.add_proofs(&mut tx, swap_request.inputs(), None)
.await
{
Ok(ys) => ys,
Err(err) => {
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_swap_request");
METRICS.record_mint_operation("process_swap_request", false);
METRICS.record_error();
}
tx.rollback().await?;
blinded_message_writer.rollback().await?;
return Err(err);
}
};
let update_proof_states_result = proof_writer
.update_proofs_states(&mut tx, &input_ys, State::Spent)
.await;
if let Err(err) = update_proof_states_result {
#[cfg(feature = "prometheus")]
self.record_swap_failure("process_swap_request");
tx.rollback().await?;
blinded_message_writer.rollback().await?;
return Err(err);
}
tx.add_blind_signatures(
&swap_request
.outputs()
.iter()
.map(|o| o.blinded_secret)
.collect::<Vec<PublicKey>>(),
&promises,
None,
)
.await?;
proof_writer.commit();
blinded_message_writer.commit();
tx.commit().await?;
let response = SwapResponse::new(promises);
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_swap_request");
METRICS.record_mint_operation("process_swap_request", true);
}
Ok(response)
}
async fn validate_sig_flag(&self, swap_request: &SwapRequest) -> Result<(), Error> {
let EnforceSigFlag { sig_flag, .. } = enforce_sig_flag(swap_request.inputs().clone());
if sig_flag == SigFlag::SigAll {
swap_request.verify_sig_all()?;
}
Ok(())
}
#[cfg(feature = "prometheus")]
fn record_swap_failure(&self, operation: &str) {
METRICS.dec_in_flight_requests(operation);
METRICS.record_mint_operation(operation, false);
METRICS.record_error();
}
}

View File

@@ -0,0 +1,84 @@
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
use swap_saga::SwapSaga;
use tracing::instrument;
use super::nut11::{enforce_sig_flag, EnforceSigFlag};
use super::{Mint, SigFlag, SwapRequest, SwapResponse};
use crate::Error;
pub mod swap_saga;
impl Mint {
/// Process Swap
#[instrument(skip_all)]
pub async fn process_swap_request(
&self,
swap_request: SwapRequest,
) -> Result<SwapResponse, Error> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("process_swap_request");
swap_request.input_amount()?;
swap_request.output_amount()?;
// Verify inputs (cryptographic verification, no DB needed)
let input_verification =
self.verify_inputs(swap_request.inputs())
.await
.map_err(|err| {
#[cfg(feature = "prometheus")]
self.record_swap_failure("process_swap_request");
tracing::debug!("Input verification failed: {:?}", err);
err
})?;
// Verify signature flag (no DB needed)
self.validate_sig_flag(&swap_request).await?;
// Step 1: Initialize the swap saga
let init_saga = SwapSaga::new(self, self.localstore.clone(), self.pubsub_manager.clone());
// Step 2: TX1 - Setup swap (verify balance + add inputs as pending + add output blinded messages)
let setup_saga = init_saga
.setup_swap(
swap_request.inputs(),
swap_request.outputs(),
None,
input_verification,
)
.await?;
// Step 3: Blind sign outputs (no DB transaction)
let signed_saga = setup_saga.sign_outputs().await?;
// Step 4: TX2 - Finalize swap (add signatures + mark inputs spent)
let response = signed_saga.finalize().await?;
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_swap_request");
METRICS.record_mint_operation("process_swap_request", true);
}
Ok(response)
}
async fn validate_sig_flag(&self, swap_request: &SwapRequest) -> Result<(), Error> {
let EnforceSigFlag { sig_flag, .. } = enforce_sig_flag(swap_request.inputs().clone());
if sig_flag == SigFlag::SigAll {
swap_request.verify_sig_all()?;
}
Ok(())
}
#[cfg(feature = "prometheus")]
fn record_swap_failure(&self, operation: &str) {
METRICS.dec_in_flight_requests(operation);
METRICS.record_mint_operation(operation, false);
METRICS.record_error();
}
}

View File

@@ -0,0 +1,61 @@
use async_trait::async_trait;
use cdk_common::database::DynMintDatabase;
use cdk_common::{Error, PublicKey};
use tracing::instrument;
#[async_trait]
pub trait CompensatingAction: Send + Sync {
async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error>;
fn name(&self) -> &'static str;
}
/// Compensation action to remove swap setup (both proofs and blinded messages).
///
/// This compensation is used when blind signing fails or finalization fails after
/// the setup transaction has committed. It removes:
/// - Output blinded messages (identified by blinded_secrets)
/// - Input proofs (identified by input_ys)
///
/// This restores the database to its pre-swap state.
pub struct RemoveSwapSetup {
/// Blinded secrets (B values) from the output blinded messages
pub blinded_secrets: Vec<PublicKey>,
/// Y values (public keys) from the input proofs
pub input_ys: Vec<PublicKey>,
}
#[async_trait]
impl CompensatingAction for RemoveSwapSetup {
#[instrument(skip_all)]
async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error> {
if self.blinded_secrets.is_empty() && self.input_ys.is_empty() {
return Ok(());
}
tracing::info!(
"Compensation: Removing swap setup ({} blinded messages, {} proofs)",
self.blinded_secrets.len(),
self.input_ys.len()
);
let mut tx = db.begin_transaction().await?;
// Remove blinded messages (outputs)
if !self.blinded_secrets.is_empty() {
tx.delete_blinded_messages(&self.blinded_secrets).await?;
}
// Remove proofs (inputs)
if !self.input_ys.is_empty() {
tx.remove_proofs(&self.input_ys, None).await?;
}
tx.commit().await?;
Ok(())
}
fn name(&self) -> &'static str {
"RemoveSwapSetup"
}
}

View File

@@ -0,0 +1,514 @@
use std::collections::VecDeque;
use std::sync::Arc;
use cdk_common::database::DynMintDatabase;
use cdk_common::mint::{Operation, Saga, SwapSagaState};
use cdk_common::nuts::BlindedMessage;
use cdk_common::{database, Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
use tokio::sync::Mutex;
use tracing::instrument;
use self::compensation::{CompensatingAction, RemoveSwapSetup};
use self::state::{Initial, SetupComplete, Signed};
use crate::mint::subscription::PubSubManager;
pub mod compensation;
mod state;
#[cfg(test)]
mod tests;
/// Saga pattern implementation for atomic swap operations.
///
/// # Why Use the Saga Pattern?
///
/// The swap operation consists of multiple steps that span database transactions
/// and non-transactional operations (blind signing). We need to ensure atomicity
/// across these heterogeneous steps while maintaining consistency in failure scenarios.
///
/// Traditional ACID transactions cannot span:
/// 1. Multiple database transactions (TX1: setup, TX2: finalize)
/// 2. Non-database operations (blind signing of outputs)
///
/// The saga pattern solves this by:
/// - Breaking the operation into discrete steps with clear state transitions
/// - Recording compensating actions for each forward step
/// - Automatically rolling back via compensations if any step fails
///
/// # Transaction Boundaries
///
/// - **TX1 (setup_swap)**: Atomically verifies balance, adds input proofs (pending),
/// adds output blinded messages, and persists saga state for crash recovery
/// - **Signing (sign_outputs)**: Non-transactional cryptographic operation
/// - **TX2 (finalize)**: Atomically adds signatures to outputs, marks inputs as spent,
/// and deletes saga state (best-effort, will be cleaned up on recovery if this fails)
///
/// Saga state persistence is atomic with swap state changes, ensuring consistency
/// for crash recovery scenarios.
///
/// # Expected Actions
///
/// 1. **setup_swap**: Verifies the swap is balanced, reserves inputs, prepares outputs
/// - Compensation: Removes both inputs and outputs if later steps fail
/// 2. **sign_outputs**: Performs blind signing (no DB changes)
/// - Triggers compensation if signing fails
/// 3. **finalize**: Commits signatures and marks inputs spent
/// - Triggers compensation if finalization fails
/// - Clears compensations on success (swap complete)
///
/// # Failure Handling
///
/// If any step fails after setup_swap, all compensating actions are executed in reverse
/// order to restore the database to its pre-swap state. This ensures no partial swaps
/// leave the system in an inconsistent state.
///
/// # Compensation Order (LIFO)
///
/// Compensations are stored in a VecDeque and executed in LIFO (Last-In-First-Out) order
/// using `push_front` + iteration. This ensures that actions are undone in the reverse
/// order they were performed, which is critical for maintaining data consistency.
///
/// Example: If we perform actions A → B → C in the forward path, compensations must
/// execute as C' → B' → A' to properly reverse the operations without violating
/// any invariants or constraints.
///
/// # Typestate Pattern
///
/// This saga uses the **typestate pattern** to enforce state transitions at compile-time.
/// Each state (Initial, SetupComplete, Signed) is a distinct type, and operations are
/// only available on the appropriate type:
///
/// ```text
/// SwapSaga<Initial>
/// └─> setup_swap() -> SwapSaga<SetupComplete>
/// └─> sign_outputs() -> SwapSaga<Signed>
/// └─> finalize() -> SwapResponse
/// ```
///
/// **Benefits:**
/// - Invalid state transitions (e.g., `finalize()` before `sign_outputs()`) won't compile
/// - State-specific data (e.g., signatures) only exists in the appropriate state type
/// - No runtime state checks or `Option<T>` unwrapping needed
/// - IDE autocomplete only shows valid operations for each state
pub struct SwapSaga<'a, S> {
mint: &'a super::Mint,
db: DynMintDatabase,
pubsub: Arc<PubSubManager>,
/// Compensating actions in LIFO order (most recent first)
compensations: Arc<Mutex<VecDeque<Box<dyn CompensatingAction>>>>,
operation: Operation,
state_data: S,
}
impl<'a> SwapSaga<'a, Initial> {
pub fn new(mint: &'a super::Mint, db: DynMintDatabase, pubsub: Arc<PubSubManager>) -> Self {
Self {
mint,
db,
pubsub,
compensations: Arc::new(Mutex::new(VecDeque::new())),
operation: Operation::new_swap(),
state_data: Initial,
}
}
/// Sets up the swap by atomically verifying balance and reserving inputs/outputs.
///
/// This is the first transaction (TX1) in the saga and must complete before blind signing.
///
/// # What This Does
///
/// Within a single database transaction:
/// 1. Verifies the swap is balanced (input amount >= output amount + fees)
/// 2. Adds input proofs to the database
/// 3. Updates input proof states from Unspent to Pending
/// 4. Adds output blinded messages to the database
/// 5. Persists saga state for crash recovery (atomic with steps 1-4)
/// 6. Publishes proof state changes via pubsub
///
/// # Compensation
///
/// Registers a compensation action that will remove both the input proofs and output
/// blinded messages if any subsequent step (signing or finalization) fails.
///
/// # Errors
///
/// - `TokenPending`: Proofs are already pending or blinded messages are duplicates
/// - `TokenAlreadySpent`: Proofs have already been spent
/// - `DuplicateOutputs`: Output blinded messages already exist
#[instrument(skip_all)]
pub async fn setup_swap(
self,
input_proofs: &Proofs,
blinded_messages: &[BlindedMessage],
quote_id: Option<QuoteId>,
input_verification: crate::mint::Verification,
) -> Result<SwapSaga<'a, SetupComplete>, Error> {
tracing::info!("TX1: Setting up swap (verify + inputs + outputs)");
let mut tx = self.db.begin_transaction().await?;
// Verify balance within the transaction
self.mint
.verify_transaction_balanced(
&mut tx,
input_verification,
input_proofs,
blinded_messages,
)
.await?;
// Add input proofs to DB
if let Err(err) = tx
.add_proofs(input_proofs.clone(), quote_id.clone(), &self.operation)
.await
{
tx.rollback().await?;
return Err(match err {
database::Error::Duplicate => Error::TokenPending,
database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
_ => Error::Database(err),
});
}
let ys = match input_proofs.ys() {
Ok(ys) => ys,
Err(err) => return Err(Error::NUT00(err)),
};
// Update input proof states to Pending
let original_proof_states = match tx.update_proofs_states(&ys, State::Pending).await {
Ok(states) => states,
Err(database::Error::AttemptUpdateSpentProof)
| Err(database::Error::AttemptRemoveSpentProof) => {
tx.rollback().await?;
return Err(Error::TokenAlreadySpent);
}
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
// Verify proofs weren't already pending or spent
if ys.len() != original_proof_states.len() {
tracing::error!("Mismatched proof states");
tx.rollback().await?;
return Err(Error::Internal);
}
let forbidden_states = [State::Pending, State::Spent];
for original_state in original_proof_states.iter().flatten() {
if forbidden_states.contains(original_state) {
tx.rollback().await?;
return Err(if *original_state == State::Pending {
Error::TokenPending
} else {
Error::TokenAlreadySpent
});
}
}
// Add output blinded messages
if let Err(err) = tx
.add_blinded_messages(quote_id.as_ref(), blinded_messages, &self.operation)
.await
{
tx.rollback().await?;
return Err(match err {
database::Error::Duplicate => Error::DuplicateOutputs,
_ => Error::Database(err),
});
}
// Publish proof state changes
for pk in &ys {
self.pubsub.proof_state((*pk, State::Pending));
}
// Store data in saga struct (avoid duplication in state enum)
let blinded_messages_vec = blinded_messages.to_vec();
let blinded_secrets: Vec<PublicKey> = blinded_messages_vec
.iter()
.map(|bm| bm.blinded_secret)
.collect();
// Persist saga state for crash recovery (atomic with TX1)
let saga = Saga::new_swap(
*self.operation.id(),
SwapSagaState::SetupComplete,
blinded_secrets.clone(),
ys.clone(),
);
if let Err(err) = tx.add_saga(&saga).await {
tx.rollback().await?;
return Err(err.into());
}
tx.commit().await?;
// Register compensation (uses LIFO via push_front)
let compensations = Arc::clone(&self.compensations);
compensations
.lock()
.await
.push_front(Box::new(RemoveSwapSetup {
blinded_secrets: blinded_secrets.clone(),
input_ys: ys.clone(),
}));
// Transition to SetupComplete state
Ok(SwapSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation: self.operation,
state_data: SetupComplete {
blinded_messages: blinded_messages_vec,
ys,
},
})
}
}
impl<'a> SwapSaga<'a, SetupComplete> {
/// Performs blind signing of output blinded messages.
///
/// This is a non-transactional cryptographic operation that happens after `setup_swap`
/// and before `finalize`. No database changes occur in this step.
///
/// # What This Does
///
/// 1. Retrieves blinded messages from the state data
/// 2. Calls the mint's blind signing function to generate signatures
/// 3. Stores signatures and transitions to the Signed state
///
/// # Failure Handling
///
/// If blind signing fails, all registered compensations are executed to roll back
/// the setup transaction, removing both input proofs and output blinded messages.
///
/// # Errors
///
/// - Propagates any errors from the blind signing operation
#[instrument(skip_all)]
pub async fn sign_outputs(self) -> Result<SwapSaga<'a, Signed>, Error> {
tracing::info!("Signing outputs (no DB)");
match self
.mint
.blind_sign(self.state_data.blinded_messages.clone())
.await
{
Ok(signatures) => {
// Transition to Signed state
// Note: We don't update saga state here because the "signed" state
// is not used by recovery logic - saga state remains "SetupComplete"
// until the swap is finalized or compensated
Ok(SwapSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation: self.operation,
state_data: Signed {
blinded_messages: self.state_data.blinded_messages,
ys: self.state_data.ys,
signatures,
},
})
}
Err(err) => {
self.compensate_all().await?;
Err(err)
}
}
}
}
impl SwapSaga<'_, Signed> {
/// Finalizes the swap by committing signatures and marking inputs as spent.
///
/// This is the second and final transaction (TX2) in the saga and completes the swap.
///
/// # What This Does
///
/// Within a single database transaction:
/// 1. Adds the blind signatures to the output blinded messages
/// 2. Updates input proof states from Pending to Spent
/// 3. Deletes saga state (best-effort, won't fail swap if this fails)
/// 4. Publishes proof state changes via pubsub
/// 5. Clears all registered compensations (swap successfully completed)
///
/// # Failure Handling
///
/// If finalization fails, all registered compensations are executed to roll back
/// the setup transaction, removing both input proofs and output blinded messages.
/// The signatures are not persisted, so they are lost.
///
/// # Success
///
/// On success, compensations are cleared and the swap is complete. The client
/// can now use the returned signatures to construct valid proofs. If saga state
/// deletion fails, a warning is logged but the swap still succeeds (orphaned
/// saga state will be cleaned up on next recovery).
///
/// # Errors
///
/// - `TokenAlreadySpent`: Input proofs were already spent by another operation
/// - Propagates any database errors
#[instrument(skip_all)]
pub async fn finalize(self) -> Result<cdk_common::nuts::SwapResponse, Error> {
tracing::info!("TX2: Finalizing swap (signatures + mark spent)");
let blinded_secrets: Vec<PublicKey> = self
.state_data
.blinded_messages
.iter()
.map(|bm| bm.blinded_secret)
.collect();
let mut tx = self.db.begin_transaction().await?;
// Add blind signatures to outputs
// TODO: WE should move the should fail to the db so the there is not this extra rollback.
// This would allow the error to be from the same place in test and prod
#[cfg(test)]
{
if crate::test_helpers::mint::should_fail_for("ADD_SIGNATURES") {
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::Database(database::Error::Database(
"Test failure: ADD_SIGNATURES".into(),
)));
}
}
if let Err(err) = tx
.add_blind_signatures(&blinded_secrets, &self.state_data.signatures, None)
.await
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(err.into());
}
// Mark input proofs as spent
// TODO: WE should move the should fail to the db so the there is not this extra rollback.
// This would allow the error to be from the same place in test and prod
#[cfg(test)]
{
if crate::test_helpers::mint::should_fail_for("UPDATE_PROOFS") {
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::Database(database::Error::Database(
"Test failure: UPDATE_PROOFS".into(),
)));
}
}
match tx
.update_proofs_states(&self.state_data.ys, State::Spent)
.await
{
Ok(_) => {}
Err(database::Error::AttemptUpdateSpentProof)
| Err(database::Error::AttemptRemoveSpentProof) => {
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::TokenAlreadySpent);
}
Err(err) => {
tx.rollback().await?;
self.compensate_all().await?;
return Err(err.into());
}
}
// Publish proof state changes
for pk in &self.state_data.ys {
self.pubsub.proof_state((*pk, State::Spent));
}
// Delete saga - swap completed successfully (best-effort, atomic with TX2)
// Don't fail the swap if saga deletion fails - orphaned saga will be
// cleaned up on next recovery
if let Err(e) = tx.delete_saga(self.operation.id()).await {
tracing::warn!(
"Failed to delete saga in finalize (will be cleaned up on recovery): {}",
e
);
// Don't rollback - swap succeeded, orphaned saga is harmless
}
tx.commit().await?;
// Clear compensations - swap is complete
self.compensations.lock().await.clear();
Ok(cdk_common::nuts::SwapResponse::new(
self.state_data.signatures,
))
}
}
impl<S> SwapSaga<'_, S> {
/// Execute all compensating actions and consume the saga.
///
/// This method takes ownership of self to ensure the saga cannot be used
/// after compensation has been triggered.
#[instrument(skip_all)]
async fn compensate_all(self) -> Result<(), Error> {
let mut compensations = self.compensations.lock().await;
if compensations.is_empty() {
return Ok(());
}
#[cfg(feature = "prometheus")]
{
use cdk_prometheus::METRICS;
self.mint.record_swap_failure("process_swap_request");
METRICS.dec_in_flight_requests("process_swap_request");
}
tracing::warn!("Running {} compensating actions", compensations.len());
while let Some(compensation) = compensations.pop_front() {
tracing::debug!("Running compensation: {}", compensation.name());
if let Err(e) = compensation.execute(&self.db).await {
tracing::error!(
"Compensation {} failed: {}. Continuing...",
compensation.name(),
e
);
}
}
// Delete saga - swap was compensated
// Use a separate transaction since compensations already ran
// Don't fail the compensation if saga cleanup fails (log only)
let mut tx = match self.db.begin_transaction().await {
Ok(tx) => tx,
Err(e) => {
tracing::error!(
"Failed to begin tx for saga cleanup after compensation: {}",
e
);
return Ok(()); // Compensations already ran, don't fail now
}
};
if let Err(e) = tx.delete_saga(self.operation.id()).await {
tracing::warn!("Failed to delete saga after compensation: {}", e);
} else if let Err(e) = tx.commit().await {
tracing::error!("Failed to commit saga cleanup after compensation: {}", e);
}
// Always succeed - compensations are done, saga cleanup is best-effort
Ok(())
}
}

View File

@@ -0,0 +1,26 @@
use cdk_common::nuts::{BlindSignature, BlindedMessage};
use cdk_common::PublicKey;
/// Initial state - no data yet.
///
/// The swap saga starts in this state. Only the `setup_swap` method is available.
pub struct Initial;
/// Setup complete - has blinded messages and input Y values.
///
/// After successful setup, the saga transitions to this state.
/// Only the `sign_outputs` method is available.
pub struct SetupComplete {
pub blinded_messages: Vec<BlindedMessage>,
pub ys: Vec<PublicKey>,
}
/// Signed state - has everything including signatures.
///
/// After successful signing, the saga transitions to this state.
/// Only the `finalize` method is available.
pub struct Signed {
pub blinded_messages: Vec<BlindedMessage>,
pub ys: Vec<PublicKey>,
pub signatures: Vec<BlindSignature>,
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,218 @@
#![cfg(test)]
//! Test helpers for creating test mints and related utilities
use std::collections::{HashMap, HashSet};
use std::str::FromStr;
use std::sync::Arc;
use std::time::Duration;
use bip39::Mnemonic;
use cdk_common::amount::SplitTarget;
use cdk_common::dhke::construct_proofs;
use cdk_common::nuts::{BlindedMessage, CurrencyUnit, Id, PaymentMethod, PreMintSecrets, Proofs};
use cdk_common::{
Amount, MintQuoteBolt11Request, MintQuoteBolt11Response, MintQuoteState, MintRequest,
};
use cdk_fake_wallet::FakeWallet;
use tokio::time::sleep;
use crate::mint::{Mint, MintBuilder, MintMeltLimits};
use crate::types::{FeeReserve, QuoteTTL};
use crate::Error;
#[cfg(test)]
pub(crate) fn should_fail_in_test() -> bool {
// Some condition that determines when to fail in tests
std::env::var("TEST_FAIL").is_ok()
}
#[cfg(test)]
pub(crate) fn should_fail_for(operation: &str) -> bool {
// Check for specific failure modes using environment variables
// Format: TEST_FAIL_<OPERATION>
let var_name = format!("TEST_FAIL_{}", operation);
std::env::var(&var_name).is_ok()
}
/// Creates and starts a test mint with in-memory storage and a fake Lightning backend.
///
/// This mint can be used for unit tests without requiring external dependencies
/// like Lightning nodes or persistent databases.
///
/// # Example
///
/// ```
/// use cdk::test_helpers::mint::create_test_mint;
///
/// #[tokio::test]
/// async fn test_something() {
/// let mint = create_test_mint().await.unwrap();
/// // Use the mint for testing
/// }
/// ```
pub async fn create_test_mint() -> Result<Mint, Error> {
let db = Arc::new(cdk_sqlite::mint::memory::empty().await?);
let mut mint_builder = MintBuilder::new(db.clone());
let fee_reserve = FeeReserve {
min_fee_reserve: 1.into(),
percent_fee_reserve: 1.0,
};
let ln_fake_backend = FakeWallet::new(
fee_reserve.clone(),
HashMap::default(),
HashSet::default(),
2,
CurrencyUnit::Sat,
);
mint_builder
.add_payment_processor(
CurrencyUnit::Sat,
PaymentMethod::Bolt11,
MintMeltLimits::new(1, 10_000),
Arc::new(ln_fake_backend),
)
.await?;
let mnemonic = Mnemonic::generate(12).map_err(|e| Error::Custom(e.to_string()))?;
mint_builder = mint_builder
.with_name("test mint".to_string())
.with_description("test mint for unit tests".to_string())
.with_urls(vec!["https://test-mint".to_string()]);
let quote_ttl = QuoteTTL::new(10000, 10000);
let mint = mint_builder
.build_with_seed(db.clone(), &mnemonic.to_seed_normalized(""))
.await?;
mint.set_quote_ttl(quote_ttl).await?;
mint.start().await?;
Ok(mint)
}
/// Creates test proofs by performing a mock mint operation.
///
/// This helper creates valid proofs for the given amount by:
/// 1. Creating blinded messages
/// 2. Performing a swap to get signatures
/// 3. Constructing valid proofs from the signatures
///
/// # Arguments
///
/// * `mint` - The test mint to use for creating proofs
/// * `amount` - The total amount to create proofs for
pub async fn mint_test_proofs(mint: &Mint, amount: Amount) -> Result<Proofs, Error> {
// Just use fund_mint_with_proofs which creates proofs via swap
let mint_quote: MintQuoteBolt11Response<_> = mint
.get_mint_quote(
MintQuoteBolt11Request {
amount,
unit: CurrencyUnit::Sat,
description: None,
pubkey: None,
}
.into(),
)
.await?
.into();
loop {
let check: MintQuoteBolt11Response<_> = mint
.check_mint_quote(&cdk_common::QuoteId::from_str(&mint_quote.quote).unwrap())
.await
.unwrap()
.into();
if check.state == MintQuoteState::Paid {
break;
}
sleep(Duration::from_secs(1)).await;
}
let keysets = mint
.get_active_keysets()
.get(&CurrencyUnit::Sat)
.unwrap()
.clone();
let keys = mint
.keyset_pubkeys(&keysets)?
.keysets
.first()
.unwrap()
.keys
.clone();
let fees: (u64, Vec<u64>) = (
0,
keys.iter().map(|a| a.0.to_u64()).collect::<Vec<_>>().into(),
);
let premint_secrets =
PreMintSecrets::random(keysets, amount, &SplitTarget::None, &fees.into()).unwrap();
let request = MintRequest {
quote: mint_quote.quote,
outputs: premint_secrets.blinded_messages(),
signature: None,
};
let mint_res = mint
.process_mint_request(request.try_into().unwrap())
.await?;
Ok(construct_proofs(
mint_res.signatures,
premint_secrets.rs(),
premint_secrets.secrets(),
&keys,
)?)
}
/// Creates test blinded messages for the given amount.
///
/// This is useful for testing operations that require blinded messages as input.
///
/// # Arguments
///
/// * `mint` - The test mint (used to get the active keyset)
/// * `amount` - The total amount to create blinded messages for
///
/// # Returns
///
/// A tuple containing:
/// - Vector of blinded messages
/// - PreMintSecrets (needed to construct proofs later)
pub async fn create_test_blinded_messages(
mint: &Mint,
amount: Amount,
) -> Result<(Vec<BlindedMessage>, PreMintSecrets), Error> {
let keyset_id = get_active_keyset_id(mint).await?;
let split_target = SplitTarget::default();
let fee_and_amounts = (0, ((0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>())).into();
let pre_mint = PreMintSecrets::random(keyset_id, amount, &split_target, &fee_and_amounts)?;
let blinded_messages = pre_mint.blinded_messages().to_vec();
Ok((blinded_messages, pre_mint))
}
/// Gets the active keyset ID from the mint.
pub async fn get_active_keyset_id(mint: &Mint) -> Result<Id, Error> {
let keys = mint
.pubkeys()
.keysets
.first()
.ok_or(Error::Internal)?
.clone();
keys.verify_id()?;
Ok(keys.id)
}

View File

@@ -0,0 +1,10 @@
#![cfg(test)]
//! Test helper utilities for CDK unit tests
//!
//! This module provides shared test utilities for creating test mints, wallets,
//! and test data without external dependencies (Lightning nodes, databases).
//!
//! These helpers are only compiled when running tests.
#[cfg(feature = "mint")]
pub mod mint;