refactor(cdk): implement saga pattern for melt operations (#1186)

Restructure melt flow into a multi-step saga with compensation-based rollback.
Remove ProofWriter in favor of explicit compensation actions for improved
reliability and clarity in handling partial failures during melt operations.

Breaks down monolithic change processing logic into smaller, focused methods:
- process_change_outputs: orchestrates full change workflow
- validate_change_outputs: checks for already-signed messages
- calculate_change_fee_and_amounts: fetches keyset configuration
- split_change_amount: splits change into denominations
- prepare_blinded_messages_with_amounts: pairs amounts with blinded messages
- store_change_signatures: handles TX2 database operations
This commit is contained in:
tsk
2025-11-03 11:40:21 -05:00
committed by GitHub
parent aa8258d955
commit d9e001bee6
16 changed files with 4197 additions and 1059 deletions

View File

@@ -31,7 +31,7 @@ impl Melted {
pub fn from_proofs(
state: MeltQuoteState,
preimage: Option<String>,
amount: Amount,
quote_amount: Amount,
proofs: Proofs,
change_proofs: Option<Proofs>,
) -> Result<Self, Error> {
@@ -44,19 +44,19 @@ impl Melted {
tracing::info!(
"Proofs amount: {} Amount: {} Change: {}",
proofs_amount,
amount,
quote_amount,
change_amount
);
let fee_paid = proofs_amount
.checked_sub(amount + change_amount)
.checked_sub(quote_amount + change_amount)
.ok_or(Error::AmountOverflow)?;
Ok(Self {
state,
preimage,
change: change_proofs,
amount,
amount: quote_amount,
fee_paid,
})
}

View File

@@ -85,16 +85,47 @@ impl FromStr for SwapSagaState {
}
}
/// States specific to melt saga
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum MeltSagaState {
/// Setup complete (proofs reserved, quote verified)
SetupComplete,
/// Payment sent to Lightning network
PaymentSent,
}
impl fmt::Display for MeltSagaState {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
MeltSagaState::SetupComplete => write!(f, "setup_complete"),
MeltSagaState::PaymentSent => write!(f, "payment_sent"),
}
}
}
impl FromStr for MeltSagaState {
type Err = Error;
fn from_str(value: &str) -> Result<Self, Self::Err> {
let value = value.to_lowercase();
match value.as_str() {
"setup_complete" => Ok(MeltSagaState::SetupComplete),
"payment_sent" => Ok(MeltSagaState::PaymentSent),
_ => Err(Error::Custom(format!("Invalid melt saga state: {}", value))),
}
}
}
/// Saga state for different operation types
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum SagaStateEnum {
/// Swap saga states
Swap(SwapSagaState),
/// Melt saga states
Melt(MeltSagaState),
// Future: Mint saga states
// Mint(MintSagaState),
// Future: Melt saga states
// Melt(MeltSagaState),
}
impl SagaStateEnum {
@@ -102,8 +133,8 @@ impl SagaStateEnum {
pub fn new(operation_kind: OperationKind, s: &str) -> Result<Self, Error> {
match operation_kind {
OperationKind::Swap => Ok(SagaStateEnum::Swap(SwapSagaState::from_str(s)?)),
OperationKind::Melt => Ok(SagaStateEnum::Melt(MeltSagaState::from_str(s)?)),
OperationKind::Mint => Err(Error::Custom("Mint saga not implemented yet".to_string())),
OperationKind::Melt => Err(Error::Custom("Melt saga not implemented yet".to_string())),
}
}
@@ -114,6 +145,10 @@ impl SagaStateEnum {
SwapSagaState::SetupComplete => "setup_complete",
SwapSagaState::Signed => "signed",
},
SagaStateEnum::Melt(state) => match state {
MeltSagaState::SetupComplete => "setup_complete",
MeltSagaState::PaymentSent => "payment_sent",
},
}
}
}
@@ -131,6 +166,9 @@ pub struct Saga {
pub blinded_secrets: Vec<PublicKey>,
/// Y values (public keys) from input proofs
pub input_ys: Vec<PublicKey>,
/// Quote ID for melt operations (used for payment status lookup during recovery)
/// None for swap operations
pub quote_id: Option<String>,
/// Unix timestamp when saga was created
pub created_at: u64,
/// Unix timestamp when saga was last updated
@@ -152,6 +190,7 @@ impl Saga {
state: SagaStateEnum::Swap(state),
blinded_secrets,
input_ys,
quote_id: None,
created_at: now,
updated_at: now,
}
@@ -162,6 +201,33 @@ impl Saga {
self.state = SagaStateEnum::Swap(new_state);
self.updated_at = unix_time();
}
/// Create new melt saga
pub fn new_melt(
operation_id: Uuid,
state: MeltSagaState,
input_ys: Vec<PublicKey>,
blinded_secrets: Vec<PublicKey>,
quote_id: String,
) -> Self {
let now = unix_time();
Self {
operation_id,
operation_kind: OperationKind::Melt,
state: SagaStateEnum::Melt(state),
blinded_secrets,
input_ys,
quote_id: Some(quote_id),
created_at: now,
updated_at: now,
}
}
/// Update melt saga state
pub fn update_melt_state(&mut self, new_state: MeltSagaState) {
self.state = SagaStateEnum::Melt(new_state);
self.updated_at = unix_time();
}
}
/// Operation

View File

@@ -134,7 +134,7 @@ async fn test_fake_melt_payment_fail() {
}
let wallet_bal = wallet.total_balance().await.unwrap();
assert_eq!(wallet_bal, 100.into());
assert_eq!(wallet_bal, 98.into());
}
/// Tests that when both the pay_invoice and check_invoice both fail,
@@ -222,6 +222,16 @@ async fn test_fake_melt_payment_return_fail_status() {
let melt = wallet.melt(&melt_quote.id).await;
assert!(melt.is_err());
wallet.check_all_pending_proofs().await.unwrap();
let pending = wallet
.localstore
.get_proofs(None, None, Some(vec![State::Pending]), None)
.await
.unwrap();
assert!(pending.is_empty());
let fake_description = FakeInvoiceDescription {
pay_invoice_state: MeltQuoteState::Unknown,
check_payment_state: MeltQuoteState::Unknown,
@@ -237,13 +247,15 @@ async fn test_fake_melt_payment_return_fail_status() {
let melt = wallet.melt(&melt_quote.id).await;
assert!(melt.is_err());
wallet.check_all_pending_proofs().await.unwrap();
let pending = wallet
.localstore
.get_proofs(None, None, Some(vec![State::Pending]), None)
.await
.unwrap();
assert!(pending.is_empty());
assert!(!pending.is_empty());
}
/// Tests that when the ln backend returns an error with unknown status,
@@ -282,7 +294,7 @@ async fn test_fake_melt_payment_error_unknown() {
// The melt should error at the payment invoice command
let melt = wallet.melt(&melt_quote.id).await;
assert_eq!(melt.unwrap_err().to_string(), "Payment failed");
assert!(melt.is_err());
let fake_description = FakeInvoiceDescription {
pay_invoice_state: MeltQuoteState::Unknown,
@@ -297,7 +309,9 @@ async fn test_fake_melt_payment_error_unknown() {
// The melt should error at the payment invoice command
let melt = wallet.melt(&melt_quote.id).await;
assert_eq!(melt.unwrap_err().to_string(), "Payment failed");
assert!(melt.is_err());
wallet.check_all_pending_proofs().await.unwrap();
let pending = wallet
.localstore
@@ -305,7 +319,7 @@ async fn test_fake_melt_payment_error_unknown() {
.await
.unwrap();
assert!(pending.is_empty());
assert!(!pending.is_empty());
}
/// Tests that when the ln backend returns an error but the second check returns paid,
@@ -343,10 +357,10 @@ async fn test_fake_melt_payment_err_paid() {
let melt_quote = wallet.melt_quote(invoice.to_string(), None).await.unwrap();
// The melt should error at the payment invoice command
let melt = wallet.melt(&melt_quote.id).await;
assert!(melt.is_err());
let melt = wallet.melt(&melt_quote.id).await.unwrap();
attempt_to_swap_pending(&wallet).await.unwrap();
assert!(melt.fee_paid == Amount::ZERO);
assert!(melt.amount == Amount::from(7));
}
/// Tests that change outputs in a melt quote are correctly handled

View File

@@ -17,8 +17,10 @@ CREATE TABLE IF NOT EXISTS saga_state (
state TEXT NOT NULL,
blinded_secrets TEXT NOT NULL,
input_ys TEXT NOT NULL,
quote_id TEXT,
created_at BIGINT NOT NULL,
updated_at BIGINT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_saga_state_operation_kind ON saga_state(operation_kind);
CREATE INDEX IF NOT EXISTS idx_saga_state_quote_id ON saga_state(quote_id);

View File

@@ -17,8 +17,10 @@ CREATE TABLE IF NOT EXISTS saga_state (
state TEXT NOT NULL,
blinded_secrets TEXT NOT NULL,
input_ys TEXT NOT NULL,
quote_id TEXT,
created_at INTEGER NOT NULL,
updated_at INTEGER NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_saga_state_operation_kind ON saga_state(operation_kind);
CREATE INDEX IF NOT EXISTS idx_saga_state_quote_id ON saga_state(quote_id);

View File

@@ -2146,6 +2146,7 @@ where
state,
blinded_secrets,
input_ys,
quote_id,
created_at,
updated_at
FROM
@@ -2174,9 +2175,9 @@ where
query(
r#"
INSERT INTO saga_state
(operation_id, operation_kind, state, blinded_secrets, input_ys, created_at, updated_at)
(operation_id, operation_kind, state, blinded_secrets, input_ys, quote_id, created_at, updated_at)
VALUES
(:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :created_at, :updated_at)
(:operation_id, :operation_kind, :state, :blinded_secrets, :input_ys, :quote_id, :created_at, :updated_at)
"#,
)?
.bind("operation_id", saga.operation_id.to_string())
@@ -2184,6 +2185,7 @@ where
.bind("state", saga.state.state())
.bind("blinded_secrets", blinded_secrets_json)
.bind("input_ys", input_ys_json)
.bind("quote_id", saga.quote_id.as_deref())
.bind("created_at", saga.created_at as i64)
.bind("updated_at", current_time as i64)
.execute(&self.inner)
@@ -2250,6 +2252,7 @@ where
state,
blinded_secrets,
input_ys,
quote_id,
created_at,
updated_at
FROM
@@ -2539,6 +2542,7 @@ fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
state,
blinded_secrets,
input_ys,
quote_id,
created_at,
updated_at
) = row
@@ -2564,6 +2568,18 @@ fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
let input_ys: Vec<PublicKey> = serde_json::from_str(&input_ys_str)
.map_err(|e| Error::Internal(format!("Failed to deserialize input_ys: {}", e)))?;
let quote_id = match &quote_id {
Column::Text(s) => {
if s.is_empty() {
None
} else {
Some(s.clone())
}
}
Column::Null => None,
_ => None,
};
let created_at: u64 = column_as_number!(created_at);
let updated_at: u64 = column_as_number!(updated_at);
@@ -2573,6 +2589,7 @@ fn sql_row_to_saga(row: Vec<Column>) -> Result<mint::Saga, Error> {
state,
blinded_secrets,
input_ys,
quote_id,
created_at,
updated_at,
})

View File

@@ -1,15 +1,11 @@
use std::str::FromStr;
use anyhow::bail;
use cdk_common::amount::amount_for_offer;
use cdk_common::database::mint::MeltRequestInfo;
use cdk_common::database::{self, MintTransaction};
use cdk_common::melt::MeltQuoteRequest;
use cdk_common::mint::{MeltPaymentRequest, Operation};
use cdk_common::mint::MeltPaymentRequest;
use cdk_common::nut05::MeltMethodOptions;
use cdk_common::payment::{
Bolt11OutgoingPaymentOptions, Bolt12OutgoingPaymentOptions, DynMintPayment,
OutgoingPaymentOptions, PaymentIdentifier,
Bolt11OutgoingPaymentOptions, Bolt12OutgoingPaymentOptions, OutgoingPaymentOptions,
};
use cdk_common::quote_id::QuoteId;
use cdk_common::{MeltOptions, MeltQuoteBolt12Request};
@@ -20,18 +16,18 @@ use tracing::instrument;
use super::{
CurrencyUnit, MeltQuote, MeltQuoteBolt11Request, MeltQuoteBolt11Response, MeltRequest, Mint,
PaymentMethod, PublicKey, State,
PaymentMethod,
};
use crate::amount::to_unit;
use crate::cdk_payment::MakePaymentResponse;
use crate::mint::proof_writer::ProofWriter;
use crate::mint::verification::Verification;
use crate::mint::SigFlag;
use crate::nuts::nut11::{enforce_sig_flag, EnforceSigFlag};
use crate::nuts::MeltQuoteState;
use crate::types::PaymentProcessorKey;
use crate::util::unix_time;
use crate::{cdk_payment, ensure_cdk, Amount, Error};
use crate::{ensure_cdk, Amount, Error};
mod melt_saga;
pub(super) mod shared;
use melt_saga::MeltSaga;
impl Mint {
#[instrument(skip_all)]
@@ -418,663 +414,33 @@ impl Mint {
Ok(quotes)
}
/// Check melt has expected fees
#[instrument(skip_all)]
pub async fn check_melt_expected_ln_fees(
&self,
melt_quote: &MeltQuote,
melt_request: &MeltRequest<QuoteId>,
) -> Result<Option<Amount>, Error> {
let quote_msats = to_unit(melt_quote.amount, &melt_quote.unit, &CurrencyUnit::Msat)
.expect("Quote unit is checked above that it can convert to msat");
let invoice_amount_msats = match &melt_quote.request {
MeltPaymentRequest::Bolt11 { bolt11 } => match bolt11.amount_milli_satoshis() {
Some(amount) => amount.into(),
None => melt_quote
.options
.ok_or(Error::InvoiceAmountUndefined)?
.amount_msat(),
},
MeltPaymentRequest::Bolt12 { offer } => match offer.amount() {
Some(amount) => {
let (amount, currency) = match amount {
lightning::offers::offer::Amount::Bitcoin { amount_msats } => {
(amount_msats, CurrencyUnit::Msat)
}
lightning::offers::offer::Amount::Currency {
iso4217_code,
amount,
} => (
amount,
CurrencyUnit::from_str(&String::from_utf8(iso4217_code.to_vec())?)?,
),
};
to_unit(amount, &currency, &CurrencyUnit::Msat)
.map_err(|_err| Error::UnsupportedUnit)?
}
None => melt_quote
.options
.ok_or(Error::InvoiceAmountUndefined)?
.amount_msat(),
},
};
let partial_amount = match invoice_amount_msats > quote_msats {
true => Some(
to_unit(quote_msats, &CurrencyUnit::Msat, &melt_quote.unit)
.map_err(|_| Error::UnsupportedUnit)?,
),
false => None,
};
let amount_to_pay = match partial_amount {
Some(amount_to_pay) => amount_to_pay,
None => to_unit(invoice_amount_msats, &CurrencyUnit::Msat, &melt_quote.unit)
.map_err(|_| Error::UnsupportedUnit)?,
};
let inputs_amount_quote_unit = melt_request.inputs_amount().map_err(|_| {
tracing::error!("Proof inputs in melt quote overflowed");
Error::AmountOverflow
})?;
if amount_to_pay + melt_quote.fee_reserve > inputs_amount_quote_unit {
tracing::debug!(
"Not enough inputs provided: {} {} needed {} {}",
inputs_amount_quote_unit,
melt_quote.unit,
amount_to_pay,
melt_quote.unit
);
return Err(Error::TransactionUnbalanced(
inputs_amount_quote_unit.into(),
amount_to_pay.into(),
melt_quote.fee_reserve.into(),
));
}
Ok(partial_amount)
}
/// Verify melt request is valid
#[instrument(skip_all)]
pub async fn verify_melt_request(
&self,
tx: &mut Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
input_verification: Verification,
melt_request: &MeltRequest<QuoteId>,
operation: &Operation,
) -> Result<(ProofWriter, MeltQuote), Error> {
let Verification {
amount: input_amount,
unit: input_unit,
} = input_verification;
let mut proof_writer =
ProofWriter::new(self.localstore.clone(), self.pubsub_manager.clone());
proof_writer
.add_proofs(
tx,
melt_request.inputs(),
Some(melt_request.quote_id().to_owned()),
operation,
)
.await?;
// Only after proof verification succeeds, proceed with quote state check
let (state, quote) = tx
.update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending, None)
.await?;
if input_unit != Some(quote.unit.clone()) {
return Err(Error::UnitMismatch);
}
match state {
MeltQuoteState::Unpaid | MeltQuoteState::Failed => Ok(()),
MeltQuoteState::Pending => Err(Error::PendingQuote),
MeltQuoteState::Paid => Err(Error::PaidQuote),
MeltQuoteState::Unknown => Err(Error::UnknownPaymentState),
}?;
self.pubsub_manager
.melt_quote_status(&quote, None, None, MeltQuoteState::Pending);
let fee = self.get_proofs_fee(melt_request.inputs()).await?;
let required_total = quote.amount + quote.fee_reserve + fee;
// Check that the inputs proofs are greater then total.
// Transaction does not need to be balanced as wallet may not want change.
if input_amount < required_total {
tracing::info!(
"Swap request unbalanced: {}, outputs {}, fee {}",
input_amount,
quote.amount,
fee
);
return Err(Error::TransactionUnbalanced(
input_amount.into(),
quote.amount.into(),
(fee + quote.fee_reserve).into(),
));
}
let EnforceSigFlag { sig_flag, .. } = enforce_sig_flag(melt_request.inputs().clone());
if sig_flag == SigFlag::SigAll {
melt_request.verify_sig_all()?;
}
if let Some(outputs) = &melt_request.outputs() {
if !outputs.is_empty() {
let Verification {
amount: _,
unit: output_unit,
} = self.verify_outputs(tx, outputs).await?;
ensure_cdk!(input_unit == output_unit, Error::UnsupportedUnit);
}
}
tracing::debug!("Verified melt quote: {}", melt_request.quote());
Ok((proof_writer, quote))
}
/// Melt Bolt11
/// Melt
///
/// Uses MeltSaga typestate pattern for atomic transaction handling with automatic rollback on failure.
#[instrument(skip_all)]
pub async fn melt(
&self,
melt_request: &MeltRequest<QuoteId>,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("melt_bolt11");
use std::sync::Arc;
async fn check_payment_state(
ln: DynMintPayment,
lookup_id: &PaymentIdentifier,
) -> anyhow::Result<MakePaymentResponse> {
match ln.check_outgoing_payment(lookup_id).await {
Ok(response) => Ok(response),
Err(check_err) => {
// If we cannot check the status of the payment we keep the proofs stuck as pending.
tracing::error!(
"Could not check the status of payment for {},. Proofs stuck as pending",
lookup_id
);
tracing::error!("Checking payment error: {}", check_err);
bail!("Could not check payment status")
}
}
}
let verification = self.verify_inputs(melt_request.inputs()).await?;
let melt_operation = Operation::new_melt();
let mut tx = self.localstore.begin_transaction().await?;
let (proof_writer, quote) = match self
.verify_melt_request(&mut tx, verification, melt_request, &melt_operation)
.await
{
Ok(result) => result,
Err(err) => {
tracing::debug!("Error attempting to verify melt quote: {}", err);
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
return Err(err);
}
};
let inputs_fee = self.get_proofs_fee(melt_request.inputs()).await?;
tx.add_melt_request(
melt_request.quote_id(),
melt_request.inputs_amount()?,
inputs_fee,
)
.await?;
tx.add_blinded_messages(
Some(melt_request.quote_id()),
melt_request.outputs().as_ref().unwrap_or(&Vec::new()),
&melt_operation,
)
.await?;
let settled_internally_amount = match self
.handle_internal_melt_mint(&mut tx, &quote, melt_request)
.await
{
Ok(amount) => amount,
Err(err) => {
tracing::error!("Attempting to settle internally failed: {}", err);
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
return Err(err);
}
};
let (tx, preimage, amount_spent_quote_unit, quote) = match settled_internally_amount {
Some(amount_spent) => (tx, None, amount_spent, quote),
None => {
// If the quote unit is SAT or MSAT we can check that the expected fees are
// provided. We also check if the quote is less then the invoice
// amount in the case that it is a mmp However, if the quote is not
// of a bitcoin unit we cannot do these checks as the mint
// is unaware of a conversion rate. In this case it is assumed that the quote is
// correct and the mint should pay the full invoice amount if inputs
// > `then quote.amount` are included. This is checked in the
// `verify_melt` method.
let _partial_amount = match quote.unit {
CurrencyUnit::Sat | CurrencyUnit::Msat => {
match self.check_melt_expected_ln_fees(&quote, melt_request).await {
Ok(amount) => amount,
Err(err) => {
tracing::error!("Fee is not expected: {}", err);
return Err(Error::Internal);
}
}
}
_ => None,
};
let ln = match self.payment_processors.get(&PaymentProcessorKey::new(
quote.unit.clone(),
quote.payment_method.clone(),
)) {
Some(ln) => ln,
None => {
tracing::info!("Could not get ln backend for {}, bolt11 ", quote.unit);
return Err(Error::UnsupportedUnit);
}
};
// Commit before talking to the external call
tx.commit().await?;
let pre = match ln
.make_payment(&quote.unit, quote.clone().try_into()?)
.await
{
Ok(pay)
if pay.status == MeltQuoteState::Unknown
|| pay.status == MeltQuoteState::Failed =>
{
tracing::warn!("Got {} status when paying melt quote {} for {} {}. Checking with backend...", pay.status, quote.id, quote.amount, quote.unit);
let check_response = if let Ok(ok) =
check_payment_state(Arc::clone(ln), &pay.payment_lookup_id).await
{
ok
} else {
return Err(Error::Internal);
};
if check_response.status == MeltQuoteState::Paid {
tracing::warn!("Pay invoice returned {} but check returned {}. Proofs stuck as pending", pay.status.to_string(), check_response.status.to_string());
proof_writer.commit();
return Err(Error::Internal);
}
check_response
}
Ok(pay) => pay,
Err(err) => {
// If the error is that the invoice was already paid we do not want to hold
// hold the proofs as pending to we reset them and return an error.
if matches!(err, cdk_payment::Error::InvoiceAlreadyPaid) {
tracing::debug!("Invoice already paid, resetting melt quote");
return Err(Error::RequestAlreadyPaid);
}
tracing::error!("Error returned attempting to pay: {} {}", quote.id, err);
let lookup_id = quote.request_lookup_id.as_ref().ok_or_else(|| {
tracing::error!(
"No payment id could not lookup payment for {} after error.",
quote.id
);
Error::Internal
})?;
let check_response =
if let Ok(ok) = check_payment_state(Arc::clone(ln), lookup_id).await {
ok
} else {
proof_writer.commit();
return Err(Error::Internal);
};
// If there error is something else we want to check the status of the payment ensure it is not pending or has been made.
if check_response.status == MeltQuoteState::Paid {
tracing::warn!("Pay invoice returned an error but check returned {}. Proofs stuck as pending", check_response.status.to_string());
proof_writer.commit();
return Err(Error::Internal);
}
check_response
}
};
match pre.status {
MeltQuoteState::Paid => (),
MeltQuoteState::Unpaid | MeltQuoteState::Unknown | MeltQuoteState::Failed => {
tracing::info!(
"Lightning payment for quote {} failed.",
melt_request.quote()
);
proof_writer.rollback().await?;
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
return Err(Error::PaymentFailed);
}
MeltQuoteState::Pending => {
tracing::warn!(
"LN payment pending, proofs are stuck as pending for quote: {}",
melt_request.quote()
);
proof_writer.commit();
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
return Err(Error::PendingQuote);
}
}
// Convert from unit of backend to quote unit
// Note: this should never fail since these conversions happen earlier and would fail there.
// Since it will not fail and even if it does the ln payment has already been paid, proofs should still be burned
let amount_spent =
to_unit(pre.total_spent, &pre.unit, &quote.unit).unwrap_or_default();
let payment_lookup_id = pre.payment_lookup_id;
let mut tx = self.localstore.begin_transaction().await?;
if Some(payment_lookup_id.clone()).as_ref() != quote.request_lookup_id.as_ref() {
tracing::info!(
"Payment lookup id changed post payment from {:?} to {}",
&quote.request_lookup_id,
payment_lookup_id
);
let mut melt_quote = quote;
melt_quote.request_lookup_id = Some(payment_lookup_id.clone());
if let Err(err) = tx
.update_melt_quote_request_lookup_id(&melt_quote.id, &payment_lookup_id)
.await
{
tracing::warn!("Could not update payment lookup id: {}", err);
}
(tx, pre.payment_proof, amount_spent, melt_quote)
} else {
(tx, pre.payment_proof, amount_spent, quote)
}
}
};
// If we made it here the payment has been made.
// We process the melt burning the inputs and returning change
let res = match self
.process_melt_request(tx, proof_writer, quote, preimage, amount_spent_quote_unit)
.await
{
Ok(response) => response,
Err(err) => {
tracing::error!("Could not process melt request: {}", err);
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
return Err(err);
}
};
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", true);
}
Ok(res)
}
/// Process melt request marking proofs as spent
/// The melt request must be verified using [`Self::verify_melt_request`]
/// before calling [`Self::process_melt_request`]
#[instrument(skip_all)]
pub async fn process_melt_request(
&self,
mut tx: Box<dyn MintTransaction<'_, database::Error> + Send + Sync + '_>,
mut proof_writer: ProofWriter,
quote: MeltQuote,
payment_preimage: Option<String>,
total_spent: Amount,
) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("process_melt_request");
// Try to get input_ys from the stored melt request, fall back to original request if not found
let input_ys: Vec<_> = tx.get_proof_ys_by_quote_id(&quote.id).await?;
assert!(!input_ys.is_empty());
tracing::debug!(
"Updating {} proof states to Spent for quote {}",
input_ys.len(),
quote.id
let init_saga = MeltSaga::new(
std::sync::Arc::new(self.clone()),
self.localstore.clone(),
std::sync::Arc::clone(&self.pubsub_manager),
);
if total_spent < quote.amount {
return Err(Error::AmountUndefined);
}
// Step 1: Setup (TX1 - reserves inputs and outputs)
let setup_saga = init_saga.setup_melt(melt_request, verification).await?;
let update_proof_states_result = proof_writer
.update_proofs_states(&mut tx, &input_ys, State::Spent)
.await;
// Step 2: Attempt internal settlement (returns saga + SettlementDecision)
// Note: Compensation is handled internally if this fails
let (setup_saga, settlement) = setup_saga.attempt_internal_settlement(melt_request).await?;
if update_proof_states_result.is_err() {
#[cfg(feature = "prometheus")]
self.record_melt_quote_failure("process_melt_request");
return Err(update_proof_states_result.err().unwrap());
}
tracing::debug!("Successfully updated proof states to Spent");
// Step 3: Make payment (internal or external)
let payment_saga = setup_saga.make_payment(settlement).await?;
tx.update_melt_quote_state(&quote.id, MeltQuoteState::Paid, payment_preimage.clone())
.await?;
let mut change = None;
let MeltRequestInfo {
inputs_amount,
inputs_fee,
change_outputs,
} = tx
.get_melt_request_and_blinded_messages(&quote.id)
.await?
.ok_or(Error::UnknownQuote)?;
// Check if there is change to return
if inputs_amount > total_spent {
// Check if wallet provided change outputs
if !change_outputs.is_empty() {
let outputs = change_outputs;
let blinded_messages: Vec<PublicKey> =
outputs.iter().map(|b| b.blinded_secret).collect();
if tx
.get_blind_signatures(&blinded_messages)
.await?
.iter()
.flatten()
.next()
.is_some()
{
tracing::info!("Output has already been signed");
return Err(Error::BlindedMessageAlreadySigned);
}
let change_target = inputs_amount - total_spent - inputs_fee;
let fee_and_amounts = self
.keysets
.load()
.iter()
.filter_map(|keyset| {
if keyset.active && Some(keyset.id) == outputs.first().map(|x| x.keyset_id)
{
Some((keyset.input_fee_ppk, keyset.amounts.clone()).into())
} else {
None
}
})
.next()
.unwrap_or_else(|| {
(0, (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>()).into()
});
let mut amounts = change_target.split(&fee_and_amounts);
if outputs.len().lt(&amounts.len()) {
tracing::debug!(
"Providing change requires {} blinded messages, but only {} provided",
amounts.len(),
outputs.len()
);
// In the case that not enough outputs are provided to return all change
// Reverse sort the amounts so that the most amount of change possible is
// returned. The rest is burnt
amounts.sort_by(|a, b| b.cmp(a));
}
let mut blinded_messages = vec![];
for (amount, mut blinded_message) in amounts.iter().zip(outputs.clone()) {
blinded_message.amount = *amount;
blinded_messages.push(blinded_message);
}
// commit db transaction before calling the signatory
tx.commit().await?;
let change_sigs = self.blind_sign(blinded_messages).await?;
let mut tx = self.localstore.begin_transaction().await?;
tx.add_blind_signatures(
&outputs[0..change_sigs.len()]
.iter()
.map(|o| o.blinded_secret)
.collect::<Vec<PublicKey>>(),
&change_sigs,
Some(quote.id.clone()),
)
.await?;
change = Some(change_sigs);
proof_writer.commit();
tx.delete_melt_request(&quote.id).await?;
tx.commit().await?;
} else {
tracing::info!(
"Inputs for {} {} greater then spent on melt {} but change outputs not provided.",
quote.id,
inputs_amount,
total_spent
);
proof_writer.commit();
tx.delete_melt_request(&quote.id).await?;
tx.commit().await?;
}
} else {
tracing::debug!("No change required for melt {}", quote.id);
proof_writer.commit();
tx.delete_melt_request(&quote.id).await?;
tx.commit().await?;
}
self.pubsub_manager.melt_quote_status(
&quote,
payment_preimage.clone(),
change.clone(),
MeltQuoteState::Paid,
);
tracing::debug!(
"Melt for quote {} completed total spent {}, total inputs: {}, change given: {}",
quote.id,
total_spent,
inputs_amount,
change
.as_ref()
.map(|c| Amount::try_sum(c.iter().map(|a| a.amount))
.expect("Change cannot overflow"))
.unwrap_or_default()
);
let response = MeltQuoteBolt11Response {
amount: quote.amount,
paid: Some(true),
payment_preimage,
change,
quote: quote.id,
fee_reserve: quote.fee_reserve,
state: MeltQuoteState::Paid,
expiry: quote.expiry,
request: Some(quote.request.to_string()),
unit: Some(quote.unit.clone()),
};
#[cfg(feature = "prometheus")]
{
METRICS.dec_in_flight_requests("process_melt_request");
METRICS.record_mint_operation("process_melt_request", true);
}
Ok(response)
}
#[cfg(feature = "prometheus")]
fn record_melt_quote_failure(&self, operation: &str) {
METRICS.dec_in_flight_requests(operation);
METRICS.record_mint_operation(operation, false);
METRICS.record_error();
// Step 4: Finalize (TX2 - marks spent, issues change)
payment_saga.finalize().await
}
}

View File

@@ -0,0 +1,65 @@
//! Compensation actions for the melt saga pattern.
//!
//! When a saga step fails, compensating actions are executed in reverse order (LIFO)
//! to undo all completed steps and restore the database to its pre-saga state.
use async_trait::async_trait;
use cdk_common::database::DynMintDatabase;
use cdk_common::{Error, PublicKey, QuoteId};
use tracing::instrument;
/// Trait for compensating actions in the saga pattern.
///
/// Compensating actions are registered as steps complete and executed in reverse
/// order (LIFO) if the saga fails. Each action should be idempotent.
#[async_trait]
pub trait CompensatingAction: Send + Sync {
async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error>;
fn name(&self) -> &'static str;
}
/// Compensation action to remove melt setup and reset quote state.
///
/// This compensation is used when payment fails or finalization fails after
/// the setup transaction has committed. It removes:
/// - Input proofs (identified by input_ys)
/// - Output blinded messages (identified by blinded_secrets)
/// - Melt request tracking record
///
/// And resets:
/// - Quote state from Pending back to Unpaid
///
/// This restores the database to its pre-melt state, allowing the user to retry.
pub struct RemoveMeltSetup {
/// Y values (public keys) from the input proofs
pub input_ys: Vec<PublicKey>,
/// Blinded secrets (B values) from the change output blinded messages
pub blinded_secrets: Vec<PublicKey>,
/// Quote ID to reset state
pub quote_id: QuoteId,
}
#[async_trait]
impl CompensatingAction for RemoveMeltSetup {
#[instrument(skip_all)]
async fn execute(&self, db: &DynMintDatabase) -> Result<(), Error> {
tracing::info!(
"Compensation: Removing melt setup for quote {} ({} proofs, {} blinded messages)",
self.quote_id,
self.input_ys.len(),
self.blinded_secrets.len()
);
super::super::shared::rollback_melt_quote(
db,
&self.quote_id,
&self.input_ys,
&self.blinded_secrets,
)
.await
}
fn name(&self) -> &'static str {
"RemoveMeltSetup"
}
}

View File

@@ -0,0 +1,909 @@
use std::collections::VecDeque;
use std::sync::Arc;
use cdk_common::amount::to_unit;
use cdk_common::database::mint::MeltRequestInfo;
use cdk_common::database::DynMintDatabase;
use cdk_common::mint::{MeltSagaState, Operation, Saga};
use cdk_common::nuts::MeltQuoteState;
use cdk_common::{Amount, Error, ProofsMethods, PublicKey, QuoteId, State};
#[cfg(feature = "prometheus")]
use cdk_prometheus::METRICS;
use tokio::sync::Mutex;
use tracing::instrument;
use self::compensation::{CompensatingAction, RemoveMeltSetup};
use self::state::{Initial, PaymentConfirmed, SettlementDecision, SetupComplete};
use crate::cdk_payment::MakePaymentResponse;
use crate::mint::subscription::PubSubManager;
use crate::mint::verification::Verification;
use crate::mint::{MeltQuoteBolt11Response, MeltRequest};
mod compensation;
mod state;
#[cfg(test)]
mod tests;
/// Saga pattern implementation for atomic melt operations.
///
/// # Why Use the Saga Pattern for Melt?
///
/// The melt operation is more complex than swap because it involves:
/// 1. Database transactions (setup and finalize)
/// 2. External payment operations (Lightning Network)
/// 3. Uncertain payment states (pending/unknown)
/// 4. Change calculation based on actual payment amount
///
/// Traditional ACID transactions cannot span:
/// 1. Multiple database transactions (TX1: setup, TX2: finalize)
/// 2. External payment operations (LN backend calls)
/// 3. Asynchronous payment confirmation
///
/// The saga pattern solves this by:
/// - Breaking the operation into discrete steps with clear state transitions
/// - Recording compensating actions for each forward step
/// - Automatically rolling back via compensations if any step fails
/// - Handling payment state uncertainty explicitly
///
/// # Transaction Boundaries
///
/// - **TX1 (setup_melt)**: Atomically verifies quote, adds input proofs (pending),
/// adds change output blinded messages, creates melt request tracking record
/// - **Payment (make_payment)**: Non-transactional external LN payment operation
/// - **TX2 (finalize)**: Atomically updates quote state, marks inputs spent,
/// signs change outputs, deletes tracking record
///
/// # Expected Flow
///
/// 1. **setup_melt**: Verifies and reserves inputs, prepares change outputs
/// - Compensation: Removes inputs, outputs, resets quote state if later steps fail
/// 2. **make_payment**: Calls LN backend to make payment
/// - Triggers compensation if payment fails
/// - Special handling for pending/unknown states
/// 3. **finalize**: Commits the melt, issues change, marks complete
/// - Triggers compensation if finalization fails
/// - Clears compensations on success (melt complete)
///
/// # Failure Handling
///
/// If any step fails after setup_melt, all compensating actions are executed in reverse
/// order to restore the database to its pre-melt state. This ensures no partial melts
/// leave the system in an inconsistent state.
///
/// # Payment State Complexity
///
/// Unlike swap, melt must handle uncertain payment states:
/// - **Paid**: Proceed to finalize
/// - **Failed/Unpaid**: Compensate and return error
/// - **Pending/Unknown**: Proofs remain pending, saga cannot complete
/// (current behavior: leave proofs pending, return error for manual intervention)
///
/// # Typestate Pattern
///
/// This saga uses the **typestate pattern** to enforce state transitions at compile-time.
/// Each state (Initial, SetupComplete, PaymentConfirmed) is a distinct type, and operations
/// are only available on the appropriate type:
///
/// ```text
/// MeltSaga<Initial>
/// └─> setup_melt() -> MeltSaga<SetupComplete>
/// ├─> attempt_internal_settlement() -> SettlementDecision (conditional)
/// └─> make_payment(SettlementDecision) -> MeltSaga<PaymentConfirmed>
/// └─> finalize() -> MeltQuoteBolt11Response
/// ```
///
/// **Benefits:**
/// - Invalid state transitions (e.g., `finalize()` before `make_payment()`) won't compile
/// - State-specific data (e.g., payment_result) only exists in the appropriate state type
/// - No runtime state checks or `Option<T>` unwrapping needed
/// - IDE autocomplete only shows valid operations for each state
pub struct MeltSaga<S> {
mint: Arc<super::Mint>,
db: DynMintDatabase,
pubsub: Arc<PubSubManager>,
/// Compensating actions in LIFO order (most recent first)
compensations: Arc<Mutex<VecDeque<Box<dyn CompensatingAction>>>>,
/// Operation for tracking
operation: Operation,
/// Tracks if metrics were incremented (for cleanup)
#[cfg(feature = "prometheus")]
metrics_incremented: bool,
/// State-specific data
state_data: S,
}
impl MeltSaga<Initial> {
pub fn new(mint: Arc<super::Mint>, db: DynMintDatabase, pubsub: Arc<PubSubManager>) -> Self {
#[cfg(feature = "prometheus")]
METRICS.inc_in_flight_requests("melt_bolt11");
Self {
mint,
db,
pubsub,
compensations: Arc::new(Mutex::new(VecDeque::new())),
operation: Operation::new_melt(),
#[cfg(feature = "prometheus")]
metrics_incremented: true,
state_data: Initial,
}
}
/// Sets up the melt by atomically verifying and reserving inputs/outputs.
///
/// This is the first transaction (TX1) in the saga and must complete before payment.
///
/// # What This Does
///
/// Within a single database transaction:
/// 1. Verifies the melt request (inputs, quote state, balance)
/// 2. Adds input proofs to the database with Pending state
/// 3. Updates quote state from Unpaid/Failed to Pending
/// 4. Adds change output blinded messages to the database
/// 5. Creates melt request tracking record
/// 6. Publishes proof state changes via pubsub
///
/// # Compensation
///
/// Registers a compensation action that will:
/// - Remove input proofs
/// - Remove blinded messages
/// - Reset quote state from Pending to Unpaid
/// - Delete melt request tracking record
///
/// This compensation runs if payment or finalization fails.
///
/// # Errors
///
/// - `PendingQuote`: Quote is already in Pending state
/// - `PaidQuote`: Quote has already been paid
/// - `TokenAlreadySpent`: Input proofs have already been spent
/// - `UnitMismatch`: Input unit doesn't match quote unit
#[instrument(skip_all)]
pub async fn setup_melt(
self,
melt_request: &MeltRequest<QuoteId>,
input_verification: Verification,
) -> Result<MeltSaga<SetupComplete>, Error> {
tracing::info!("TX1: Setting up melt (verify + inputs + outputs)");
let Verification {
amount: input_amount,
unit: input_unit,
} = input_verification;
let mut tx = self.db.begin_transaction().await?;
// Add proofs to the database
if let Err(err) = tx
.add_proofs(
melt_request.inputs().clone(),
Some(melt_request.quote_id().to_owned()),
&self.operation,
)
.await
{
tx.rollback().await?;
return Err(match err {
cdk_common::database::Error::Duplicate => Error::TokenPending,
cdk_common::database::Error::AttemptUpdateSpentProof => Error::TokenAlreadySpent,
err => Error::Database(err),
});
}
let input_ys = melt_request.inputs().ys()?;
// Update proof states to Pending
let original_states = match tx.update_proofs_states(&input_ys, State::Pending).await {
Ok(states) => states,
Err(cdk_common::database::Error::AttemptUpdateSpentProof)
| Err(cdk_common::database::Error::AttemptRemoveSpentProof) => {
tx.rollback().await?;
return Err(Error::TokenAlreadySpent);
}
Err(err) => {
tx.rollback().await?;
return Err(err.into());
}
};
// Check for forbidden states (Pending or Spent)
let has_forbidden_state = original_states
.iter()
.any(|state| matches!(state, Some(State::Pending) | Some(State::Spent)));
if has_forbidden_state {
tx.rollback().await?;
return Err(
if original_states
.iter()
.any(|s| matches!(s, Some(State::Pending)))
{
Error::TokenPending
} else {
Error::TokenAlreadySpent
},
);
}
// Publish proof state changes
for pk in input_ys.iter() {
self.pubsub.proof_state((*pk, State::Pending));
}
// Update quote state to Pending
let (state, quote) = tx
.update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending, None)
.await?;
if input_unit != Some(quote.unit.clone()) {
tx.rollback().await?;
return Err(Error::UnitMismatch);
}
match state {
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {}
MeltQuoteState::Pending => {
tx.rollback().await?;
return Err(Error::PendingQuote);
}
MeltQuoteState::Paid => {
tx.rollback().await?;
return Err(Error::PaidQuote);
}
MeltQuoteState::Unknown => {
tx.rollback().await?;
return Err(Error::UnknownPaymentState);
}
}
self.pubsub
.melt_quote_status(&quote, None, None, MeltQuoteState::Pending);
let fee = self.mint.get_proofs_fee(melt_request.inputs()).await?;
let required_total = quote.amount + quote.fee_reserve + fee;
if input_amount < required_total {
tracing::info!(
"Melt request unbalanced: inputs {}, amount {}, fee {}",
input_amount,
quote.amount,
fee
);
tx.rollback().await?;
return Err(Error::TransactionUnbalanced(
input_amount.into(),
quote.amount.into(),
(fee + quote.fee_reserve).into(),
));
}
// Verify outputs if provided
if let Some(outputs) = &melt_request.outputs() {
if !outputs.is_empty() {
let output_verification = match self.mint.verify_outputs(&mut tx, outputs).await {
Ok(verification) => verification,
Err(err) => {
tx.rollback().await?;
return Err(err);
}
};
if input_unit != output_verification.unit {
tx.rollback().await?;
return Err(Error::UnitMismatch);
}
}
}
let inputs_fee = self.mint.get_proofs_fee(melt_request.inputs()).await?;
// Add melt request tracking record
tx.add_melt_request(
melt_request.quote_id(),
melt_request.inputs_amount()?,
inputs_fee,
)
.await?;
// Add change output blinded messages
tx.add_blinded_messages(
Some(melt_request.quote_id()),
melt_request.outputs().as_ref().unwrap_or(&Vec::new()),
&self.operation,
)
.await?;
// Get blinded secrets for compensation
let blinded_secrets: Vec<PublicKey> = melt_request
.outputs()
.as_ref()
.unwrap_or(&Vec::new())
.iter()
.map(|bm| bm.blinded_secret)
.collect();
// Persist saga state for crash recovery (atomic with TX1)
let saga = Saga::new_melt(
*self.operation.id(),
MeltSagaState::SetupComplete,
input_ys.clone(),
blinded_secrets.clone(),
quote.id.to_string(),
);
if let Err(err) = tx.add_saga(&saga).await {
tx.rollback().await?;
return Err(err.into());
}
tx.commit().await?;
// Store blinded messages for state
let blinded_messages_vec = melt_request.outputs().clone().unwrap_or_default();
// Register compensation (uses LIFO via push_front)
let compensations = Arc::clone(&self.compensations);
compensations
.lock()
.await
.push_front(Box::new(RemoveMeltSetup {
input_ys: input_ys.clone(),
blinded_secrets,
quote_id: quote.id.clone(),
}));
// Transition to SetupComplete state
Ok(MeltSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation: self.operation,
#[cfg(feature = "prometheus")]
metrics_incremented: self.metrics_incremented,
state_data: SetupComplete {
quote,
input_ys,
blinded_messages: blinded_messages_vec,
},
})
}
}
impl MeltSaga<SetupComplete> {
/// Attempts to settle the melt internally (melt-to-mint on same mint).
///
/// This checks if the payment request corresponds to an existing mint quote
/// on the same mint, and if so, settles it atomically within a transaction.
///
/// # What This Does
///
/// Within a single database transaction:
/// 1. Checks if payment request matches a mint quote on this mint
/// 2. If not a match or different unit: returns (self, RequiresExternalPayment)
/// 3. If match found: validates quote state and amount
/// 4. Increments the mint quote's paid amount
/// 5. Publishes mint quote payment notification
/// 6. Returns (self, Internal{amount})
///
/// # Compensation
///
/// If internal settlement fails, this method automatically calls compensate_all()
/// to roll back the setup_melt changes before returning the error. The saga is
/// consumed on error, so the caller cannot continue.
///
/// # Returns
///
/// - `Ok((self, Internal{amount}))`: Internal settlement succeeded, saga can continue
/// - `Ok((self, RequiresExternalPayment))`: Not an internal payment, saga can continue
/// - `Err(_)`: Internal settlement attempted but failed (compensations executed, saga consumed)
///
/// # Errors
///
/// - `RequestAlreadyPaid`: Mint quote already settled
/// - `InsufficientFunds`: Not enough input proofs for mint quote amount
/// - `Internal`: Database error during settlement
#[instrument(skip_all)]
pub async fn attempt_internal_settlement(
self,
melt_request: &MeltRequest<QuoteId>,
) -> Result<(Self, SettlementDecision), Error> {
tracing::info!("Checking for internal settlement opportunity");
let mut tx = self.db.begin_transaction().await?;
let mint_quote = match tx
.get_mint_quote_by_request(&self.state_data.quote.request.to_string())
.await
{
Ok(Some(mint_quote)) if mint_quote.unit == self.state_data.quote.unit => mint_quote,
Ok(_) => {
tx.rollback().await?;
tracing::debug!("Not an internal payment or unit mismatch");
return Ok((self, SettlementDecision::RequiresExternalPayment));
}
Err(err) => {
tx.rollback().await?;
tracing::debug!("Error checking for mint quote: {}", err);
self.compensate_all().await?;
return Err(Error::Internal);
}
};
// Mint quote has already been settled
if (mint_quote.state() == cdk_common::nuts::MintQuoteState::Issued
|| mint_quote.state() == cdk_common::nuts::MintQuoteState::Paid)
&& mint_quote.payment_method == crate::mint::PaymentMethod::Bolt11
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::RequestAlreadyPaid);
}
let inputs_amount_quote_unit = melt_request.inputs_amount().map_err(|_| {
tracing::error!("Proof inputs in melt quote overflowed");
Error::AmountOverflow
})?;
if let Some(amount) = mint_quote.amount {
if amount > inputs_amount_quote_unit {
tracing::debug!(
"Not enough inputs provided: {} needed {}",
inputs_amount_quote_unit,
amount
);
tx.rollback().await?;
self.compensate_all().await?;
return Err(Error::InsufficientFunds);
}
}
let amount = self.state_data.quote.amount;
tracing::info!(
"Mint quote {} paid {} from internal payment.",
mint_quote.id,
amount
);
let total_paid = tx
.increment_mint_quote_amount_paid(
&mint_quote.id,
amount,
self.state_data.quote.id.to_string(),
)
.await?;
self.pubsub.mint_quote_payment(&mint_quote, total_paid);
tracing::info!(
"Melt quote {} paid Mint quote {}",
self.state_data.quote.id,
mint_quote.id
);
tx.commit().await?;
Ok((self, SettlementDecision::Internal { amount }))
}
/// Makes payment via Lightning Network backend or internal settlement.
///
/// This is an external operation that happens after `setup_melt` and before `finalize`.
/// No database changes occur in this step (except for internal settlement case).
///
/// # What This Does
///
/// 1. Takes a SettlementDecision from attempt_internal_settlement
/// 2. If Internal: creates payment result directly
/// 3. If RequiresExternalPayment: calls LN backend
/// 4. Handles payment result states with idempotent verification
/// 5. Transitions to PaymentConfirmed state on success
///
/// # Idempotent Payment Verification
///
/// Lightning payments are asynchronous, and the LN backend may return different
/// states for the same payment query due to:
/// - Network latency between payment initiation and confirmation
/// - Backend database replication lag
/// - HTLC settlement timing
///
/// **Critical Principle**: If `check_payment_state()` confirms the payment as Paid,
/// we MUST proceed to finalize, regardless of what `make_payment()` initially returned.
/// This ensures the saga is idempotent with respect to payment confirmation.
///
/// # Failure Handling
///
/// If payment is confirmed as failed/unpaid, all registered compensations are
/// executed to roll back the setup transaction.
///
/// # Errors
///
/// - `PaymentFailed`: Payment confirmed as failed/unpaid
/// - `PendingQuote`: Payment is pending (will be resolved by startup check)
#[instrument(skip_all)]
pub async fn make_payment(
self,
settlement: SettlementDecision,
) -> Result<MeltSaga<PaymentConfirmed>, Error> {
tracing::info!("Making payment (external LN operation or internal settlement)");
let payment_result = match settlement {
SettlementDecision::Internal { amount } => {
tracing::info!(
"Payment settled internally for {} {}",
amount,
self.state_data.quote.unit
);
MakePaymentResponse {
status: MeltQuoteState::Paid,
total_spent: amount,
unit: self.state_data.quote.unit.clone(),
payment_proof: None,
payment_lookup_id: self
.state_data
.quote
.request_lookup_id
.clone()
.unwrap_or_else(|| {
cdk_common::payment::PaymentIdentifier::CustomId(
self.state_data.quote.id.to_string(),
)
}),
}
}
SettlementDecision::RequiresExternalPayment => {
// Get LN payment processor
let ln = self
.mint
.payment_processors
.get(&crate::types::PaymentProcessorKey::new(
self.state_data.quote.unit.clone(),
self.state_data.quote.payment_method.clone(),
))
.ok_or_else(|| {
tracing::info!(
"Could not get ln backend for {}, {}",
self.state_data.quote.unit,
self.state_data.quote.payment_method
);
Error::UnsupportedUnit
})?;
// Make payment with idempotent verification
let payment_response = match ln
.make_payment(
&self.state_data.quote.unit,
self.state_data.quote.clone().try_into()?,
)
.await
{
Ok(pay)
if pay.status == MeltQuoteState::Unknown
|| pay.status == MeltQuoteState::Failed =>
{
tracing::warn!(
"Got {} status when paying melt quote {} for {} {}. Verifying with backend...",
pay.status,
self.state_data.quote.id,
self.state_data.quote.amount,
self.state_data.quote.unit
);
let check_response = self
.check_payment_state(Arc::clone(ln), &pay.payment_lookup_id)
.await?;
if check_response.status == MeltQuoteState::Paid {
// Race condition: Payment succeeded during verification
tracing::info!(
"Payment initially returned {} but confirmed as Paid. Proceeding to finalize.",
pay.status
);
check_response
} else {
check_response
}
}
Ok(pay) => pay,
Err(err) => {
if matches!(err, crate::cdk_payment::Error::InvoiceAlreadyPaid) {
tracing::info!("Invoice already paid, verifying payment status");
} else {
// Other error - check if payment actually succeeded
tracing::error!(
"Error returned attempting to pay: {} {}",
self.state_data.quote.id,
err
);
}
let lookup_id = self
.state_data
.quote
.request_lookup_id
.as_ref()
.ok_or_else(|| {
tracing::error!(
"No payment id, cannot verify payment status for {} after error",
self.state_data.quote.id
);
Error::Internal
})?;
let check_response =
self.check_payment_state(Arc::clone(ln), lookup_id).await?;
tracing::info!(
"Initial payment attempt for {} errored. Follow up check stateus: {}",
self.state_data.quote.id,
check_response.status
);
check_response
}
};
match payment_response.status {
MeltQuoteState::Paid => payment_response,
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
tracing::info!(
"Lightning payment for quote {} failed.",
self.state_data.quote.id
);
self.compensate_all().await?;
return Err(Error::PaymentFailed);
}
MeltQuoteState::Unknown => {
tracing::warn!(
"LN payment unknown, proofs remain pending for quote: {}",
self.state_data.quote.id
);
return Err(Error::PaymentFailed);
}
MeltQuoteState::Pending => {
tracing::warn!(
"LN payment pending, proofs remain pending for quote: {}",
self.state_data.quote.id
);
return Err(Error::PendingQuote);
}
}
}
};
// TODO: Add total spent > quote check
// Transition to PaymentConfirmed state
Ok(MeltSaga {
mint: self.mint,
db: self.db,
pubsub: self.pubsub,
compensations: self.compensations,
operation: self.operation,
#[cfg(feature = "prometheus")]
metrics_incremented: self.metrics_incremented,
state_data: PaymentConfirmed {
quote: self.state_data.quote,
input_ys: self.state_data.input_ys,
blinded_messages: self.state_data.blinded_messages,
payment_result,
},
})
}
/// Helper to check payment state with LN backend
async fn check_payment_state(
&self,
ln: Arc<
dyn cdk_common::payment::MintPayment<Err = cdk_common::payment::Error> + Send + Sync,
>,
lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<MakePaymentResponse, Error> {
match ln.check_outgoing_payment(lookup_id).await {
Ok(response) => Ok(response),
Err(check_err) => {
tracing::error!(
"Could not check the status of payment for {}. Proofs stuck as pending",
lookup_id
);
tracing::error!("Checking payment error: {}", check_err);
Err(Error::Internal)
}
}
}
}
impl MeltSaga<PaymentConfirmed> {
/// Finalizes the melt by committing signatures and marking inputs as spent.
///
/// This is the second and final transaction (TX2) in the saga and completes the melt.
///
/// # What This Does
///
/// Within a single database transaction:
/// 1. Updates quote state to Paid
/// 2. Updates payment lookup ID if changed
/// 3. Marks input proofs as Spent
/// 4. Calculates and signs change outputs (if applicable)
/// 5. Deletes melt request tracking record
/// 6. Publishes quote status changes via pubsub
/// 7. Clears all registered compensations (melt successfully completed)
///
/// # Change Handling
///
/// If inputs > total_spent:
/// - If change outputs were provided: sign them and return
/// - If no change outputs: change is burnt (logged as info)
///
/// # Success
///
/// On success, compensations are cleared and the melt is complete.
///
/// # Errors
///
/// - `TokenAlreadySpent`: Input proofs were already spent
/// - `BlindedMessageAlreadySigned`: Change outputs already signed
#[instrument(skip_all)]
pub async fn finalize(self) -> Result<MeltQuoteBolt11Response<QuoteId>, Error> {
tracing::info!("TX2: Finalizing melt (mark spent + change)");
let total_spent = to_unit(
self.state_data.payment_result.total_spent,
&self.state_data.payment_result.unit,
&self.state_data.quote.unit,
)
.unwrap_or_default();
let payment_preimage = self.state_data.payment_result.payment_proof.clone();
let payment_lookup_id = &self.state_data.payment_result.payment_lookup_id;
let mut tx = self.db.begin_transaction().await?;
// Get melt request info first (needed for validation and change)
let MeltRequestInfo {
inputs_amount,
inputs_fee,
change_outputs,
} = tx
.get_melt_request_and_blinded_messages(&self.state_data.quote.id)
.await?
.ok_or(Error::UnknownQuote)?;
// Use shared core finalization logic
if let Err(err) = super::shared::finalize_melt_core(
&mut tx,
&self.pubsub,
&self.state_data.quote,
&self.state_data.input_ys,
inputs_amount,
inputs_fee,
total_spent,
payment_preimage.clone(),
payment_lookup_id,
)
.await
{
tx.rollback().await?;
self.compensate_all().await?;
return Err(err);
}
let needs_change = inputs_amount > total_spent;
// Handle change: either sign change outputs or just commit TX1
let (change, mut tx) = if !needs_change {
// No change required - just commit TX1
tracing::debug!("No change required for melt {}", self.state_data.quote.id);
(None, tx)
} else {
// We commit tx here as process_change can make external call to blind sign
// We do not want to hold db txs across external calls
tx.commit().await?;
super::shared::process_melt_change(
&self.mint,
&self.db,
&self.state_data.quote.id,
inputs_amount,
total_spent,
inputs_fee,
change_outputs,
)
.await?
};
tx.delete_melt_request(&self.state_data.quote.id).await?;
// Delete saga - melt completed successfully (best-effort)
if let Err(e) = tx.delete_saga(self.operation.id()).await {
tracing::warn!("Failed to delete saga in finalize: {}", e);
// Don't rollback - melt succeeded
}
tx.commit().await?;
self.pubsub.melt_quote_status(
&self.state_data.quote,
payment_preimage.clone(),
change.clone(),
MeltQuoteState::Paid,
);
tracing::debug!(
"Melt for quote {} completed total spent {}, total inputs: {}, change given: {}",
self.state_data.quote.id,
total_spent,
inputs_amount,
change
.as_ref()
.map(|c| Amount::try_sum(c.iter().map(|a| a.amount))
.expect("Change cannot overflow"))
.unwrap_or_default()
);
self.compensations.lock().await.clear();
#[cfg(feature = "prometheus")]
if self.metrics_incremented {
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", true);
}
let response = MeltQuoteBolt11Response {
amount: self.state_data.quote.amount,
paid: Some(true),
payment_preimage,
change,
quote: self.state_data.quote.id,
fee_reserve: self.state_data.quote.fee_reserve,
state: MeltQuoteState::Paid,
expiry: self.state_data.quote.expiry,
request: Some(self.state_data.quote.request.to_string()),
unit: Some(self.state_data.quote.unit.clone()),
};
Ok(response)
}
}
impl<S> MeltSaga<S> {
/// Execute all compensating actions and consume the saga.
///
/// This method takes ownership of self to ensure the saga cannot be used
/// after compensation has been triggered.
///
/// This is called internally by saga methods when they need to compensate.
#[instrument(skip_all)]
async fn compensate_all(self) -> Result<(), Error> {
let mut compensations = self.compensations.lock().await;
if compensations.is_empty() {
return Ok(());
}
#[cfg(feature = "prometheus")]
if self.metrics_incremented {
METRICS.dec_in_flight_requests("melt_bolt11");
METRICS.record_mint_operation("melt_bolt11", false);
METRICS.record_error();
}
tracing::warn!("Running {} compensating actions", compensations.len());
while let Some(compensation) = compensations.pop_front() {
tracing::debug!("Running compensation: {}", compensation.name());
if let Err(e) = compensation.execute(&self.db).await {
tracing::error!(
"Compensation {} failed: {}. Continuing...",
compensation.name(),
e
);
}
}
Ok(())
}
}

View File

@@ -0,0 +1,46 @@
use cdk_common::nuts::BlindedMessage;
use cdk_common::{Amount, PublicKey};
use crate::cdk_payment::MakePaymentResponse;
use crate::mint::MeltQuote;
/// Initial state - no data yet.
///
/// The melt saga starts in this state. Only the `setup_melt` method is available.
pub struct Initial;
/// Setup complete - has quote, input Ys, and blinded messages.
///
/// After successful setup, the saga transitions to this state.
/// The `attempt_internal_settlement` and `make_payment` methods are available.
pub struct SetupComplete {
pub quote: MeltQuote,
pub input_ys: Vec<PublicKey>,
pub blinded_messages: Vec<BlindedMessage>,
}
/// Payment confirmed - has everything including payment result.
///
/// After successful payment (internal or external), the saga transitions to this state.
/// Only the `finalize` method is available.
pub struct PaymentConfirmed {
pub quote: MeltQuote,
pub input_ys: Vec<PublicKey>,
#[allow(dead_code)] // Stored for completeness, accessed from DB in finalize
pub blinded_messages: Vec<BlindedMessage>,
pub payment_result: MakePaymentResponse,
}
/// Result of attempting internal settlement for a melt operation.
///
/// This enum represents the decision point in the melt flow:
/// - Internal settlement succeeded → skip external Lightning payment
/// - External payment required → proceed with Lightning Network call
#[derive(Debug, Clone)]
pub enum SettlementDecision {
/// Payment was settled internally (melt-to-mint on the same mint).
/// Contains the amount that was settled.
Internal { amount: Amount },
/// Payment requires external Lightning Network settlement.
RequiresExternalPayment,
}

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,443 @@
//! Shared logic for melt operations across saga and startup check.
//!
//! This module contains common functions used by both:
//! - `melt_saga`: Normal melt operation flow
//! - `start_up_check`: Recovery of interrupted melts during startup
//!
//! The functions here ensure consistency between these two code paths.
use cdk_common::database::{self, DynMintDatabase};
use cdk_common::nuts::{BlindSignature, BlindedMessage, MeltQuoteState, State};
use cdk_common::{Amount, Error, PublicKey, QuoteId};
use cdk_signatory::signatory::SignatoryKeySet;
use crate::mint::subscription::PubSubManager;
use crate::mint::MeltQuote;
/// Retrieves fee and amount configuration for the keyset matching the change outputs.
///
/// Searches active keysets for one matching the first output's keyset_id.
/// Used during change calculation for melts.
///
/// # Arguments
///
/// * `keysets` - Arc reference to the loaded keysets
/// * `outputs` - Change output blinded messages
///
/// # Returns
///
/// Fee per thousand and allowed amounts for the keyset, or default if not found
pub fn get_keyset_fee_and_amounts(
keysets: &arc_swap::ArcSwap<Vec<SignatoryKeySet>>,
outputs: &[BlindedMessage],
) -> cdk_common::amount::FeeAndAmounts {
keysets
.load()
.iter()
.filter_map(|keyset| {
if keyset.active && Some(keyset.id) == outputs.first().map(|x| x.keyset_id) {
Some((keyset.input_fee_ppk, keyset.amounts.clone()).into())
} else {
None
}
})
.next()
.unwrap_or_else(|| (0, (0..32).map(|x| 2u64.pow(x)).collect::<Vec<_>>()).into())
}
/// Rolls back a melt quote by removing all setup artifacts and resetting state.
///
/// This function is used by both:
/// - `melt_saga::compensation::RemoveMeltSetup` when saga fails
/// - `start_up_check::rollback_failed_melt_quote` when recovering failed payments
///
/// # What This Does
///
/// Within a single database transaction:
/// 1. Removes input proofs from database
/// 2. Removes change output blinded messages
/// 3. Resets quote state from Pending to Unpaid
/// 4. Deletes melt request tracking record
///
/// This restores the database to its pre-melt state, allowing retry.
///
/// # Arguments
///
/// * `db` - Database connection
/// * `quote_id` - ID of the quote to rollback
/// * `input_ys` - Y values (public keys) from input proofs
/// * `blinded_secrets` - Blinded secrets from change outputs
///
/// # Errors
///
/// Returns database errors if transaction fails
pub async fn rollback_melt_quote(
db: &DynMintDatabase,
quote_id: &QuoteId,
input_ys: &[PublicKey],
blinded_secrets: &[PublicKey],
) -> Result<(), Error> {
if input_ys.is_empty() && blinded_secrets.is_empty() {
return Ok(());
}
tracing::info!(
"Rolling back melt quote {} ({} proofs, {} blinded messages)",
quote_id,
input_ys.len(),
blinded_secrets.len()
);
let mut tx = db.begin_transaction().await?;
// Remove input proofs
if !input_ys.is_empty() {
tx.remove_proofs(input_ys, Some(quote_id.clone())).await?;
}
// Remove blinded messages (change outputs)
if !blinded_secrets.is_empty() {
tx.delete_blinded_messages(blinded_secrets).await?;
}
// Reset quote state from Pending to Unpaid
let (previous_state, _quote) = tx
.update_melt_quote_state(quote_id, MeltQuoteState::Unpaid, None)
.await?;
if previous_state != MeltQuoteState::Pending {
tracing::warn!(
"Unexpected quote state during rollback: expected Pending, got {}",
previous_state
);
}
// Delete melt request tracking record
tx.delete_melt_request(quote_id).await?;
tx.commit().await?;
tracing::info!("Successfully rolled back melt quote {}", quote_id);
Ok(())
}
/// Processes change for a melt operation.
///
/// This function handles the complete change workflow:
/// 1. Calculate change target amount
/// 2. Split into denominations based on keyset configuration
/// 3. Sign change outputs (external call to blind_sign)
/// 4. Store signatures in database (new transaction)
///
/// # Transaction Management
///
/// This function expects that the caller has already committed or will rollback
/// their current transaction before calling. It will:
/// - Call blind_sign (external, no DB lock held)
/// - Open a new transaction to store signatures
/// - Return the new transaction for the caller to commit
///
/// # Arguments
///
/// * `mint` - Mint instance (for keysets and blind_sign)
/// * `db` - Database connection
/// * `quote_id` - Quote ID for associating signatures
/// * `inputs_amount` - Total amount from input proofs
/// * `total_spent` - Amount spent on payment
/// * `inputs_fee` - Fee paid for inputs
/// * `change_outputs` - Blinded messages for change
///
/// # Returns
///
/// Tuple of:
/// - `Option<Vec<BlindSignature>>` - Signed change outputs (if any)
/// - `Box<dyn MintTransaction>` - New transaction with signatures stored
///
/// # Errors
///
/// Returns error if:
/// - Change calculation fails
/// - Blind signing fails
/// - Database operations fail
pub async fn process_melt_change<'a>(
mint: &super::super::Mint,
db: &'a DynMintDatabase,
quote_id: &QuoteId,
inputs_amount: Amount,
total_spent: Amount,
inputs_fee: Amount,
change_outputs: Vec<BlindedMessage>,
) -> Result<
(
Option<Vec<BlindSignature>>,
Box<dyn database::MintTransaction<'a, database::Error> + Send + Sync + 'a>,
),
Error,
> {
// Check if change is needed
let needs_change = inputs_amount > total_spent;
if !needs_change || change_outputs.is_empty() {
// No change needed - open transaction and return empty result
let tx = db.begin_transaction().await?;
return Ok((None, tx));
}
let change_target = inputs_amount - total_spent - inputs_fee;
// Get keyset configuration
let fee_and_amounts = get_keyset_fee_and_amounts(&mint.keysets, &change_outputs);
// Split change into denominations
let mut amounts = change_target.split(&fee_and_amounts);
if change_outputs.len() < amounts.len() {
tracing::debug!(
"Providing change requires {} blinded messages, but only {} provided",
amounts.len(),
change_outputs.len()
);
amounts.sort_by(|a, b| b.cmp(a));
}
// Prepare blinded messages with amounts
let mut blinded_messages_to_sign = vec![];
for (amount, mut blinded_message) in amounts.iter().zip(change_outputs.iter().cloned()) {
blinded_message.amount = *amount;
blinded_messages_to_sign.push(blinded_message);
}
// External call: sign change outputs (no DB transaction held)
let change_sigs = mint.blind_sign(blinded_messages_to_sign.clone()).await?;
// Open new transaction to store signatures
let mut tx = db.begin_transaction().await?;
let blinded_secrets: Vec<_> = blinded_messages_to_sign
.iter()
.map(|bm| bm.blinded_secret)
.collect();
tx.add_blind_signatures(&blinded_secrets, &change_sigs, Some(quote_id.clone()))
.await?;
Ok((Some(change_sigs), tx))
}
/// Finalizes a melt quote by updating proofs, quote state, and publishing changes.
///
/// This function performs the core finalization operations that are common to both
/// the saga finalize step and startup check recovery:
/// 1. Validates amounts (total_spent vs quote amount, inputs vs total_spent)
/// 2. Marks input proofs as SPENT
/// 3. Publishes proof state changes
/// 4. Updates quote state to PAID
/// 5. Updates payment lookup ID if changed
/// 6. Deletes melt request tracking
///
/// # Transaction Management
///
/// This function expects an open transaction and will NOT commit it.
/// The caller is responsible for committing the transaction.
///
/// # Arguments
///
/// * `tx` - Open database transaction
/// * `pubsub` - Pubsub manager for state notifications
/// * `quote` - Melt quote being finalized
/// * `input_ys` - Y values of input proofs
/// * `inputs_amount` - Total amount from inputs
/// * `inputs_fee` - Fee for inputs
/// * `total_spent` - Amount spent on payment
/// * `payment_preimage` - Payment preimage (if any)
/// * `payment_lookup_id` - Payment lookup identifier
///
/// # Returns
///
/// `Ok(())` if finalization succeeds
///
/// # Errors
///
/// Returns error if:
/// - Amount validation fails
/// - Proofs are already spent
/// - Database operations fail
#[allow(clippy::too_many_arguments)]
pub async fn finalize_melt_core(
tx: &mut Box<dyn database::MintTransaction<'_, database::Error> + Send + Sync + '_>,
pubsub: &PubSubManager,
quote: &MeltQuote,
input_ys: &[PublicKey],
inputs_amount: Amount,
inputs_fee: Amount,
total_spent: Amount,
payment_preimage: Option<String>,
payment_lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<(), Error> {
// Validate quote amount vs payment amount
if quote.amount > total_spent {
tracing::error!(
"Payment amount {} is less than quote amount {} for quote {}",
total_spent,
quote.amount,
quote.id
);
return Err(Error::IncorrectQuoteAmount);
}
// Validate inputs amount
if inputs_amount - inputs_fee < total_spent {
tracing::error!("Over paid melt quote {}", quote.id);
return Err(Error::IncorrectQuoteAmount);
}
// Update quote state to Paid
tx.update_melt_quote_state(&quote.id, MeltQuoteState::Paid, payment_preimage.clone())
.await?;
// Update payment lookup ID if changed
if quote.request_lookup_id.as_ref() != Some(payment_lookup_id) {
tracing::info!(
"Payment lookup id changed post payment from {:?} to {}",
&quote.request_lookup_id,
payment_lookup_id
);
tx.update_melt_quote_request_lookup_id(&quote.id, payment_lookup_id)
.await?;
}
// Mark input proofs as spent
match tx.update_proofs_states(input_ys, State::Spent).await {
Ok(_) => {}
Err(database::Error::AttemptUpdateSpentProof) => {
tracing::info!("Proofs for quote {} already marked as spent", quote.id);
return Ok(());
}
Err(err) => {
return Err(err.into());
}
}
// Publish proof state changes
for pk in input_ys.iter() {
pubsub.proof_state((*pk, State::Spent));
}
Ok(())
}
/// High-level melt finalization that handles the complete workflow.
///
/// This function orchestrates:
/// 1. Getting melt request info
/// 2. Getting input proof Y values
/// 3. Processing change (if needed)
/// 4. Core finalization operations
/// 5. Transaction commit
/// 6. Pubsub notification
///
/// # Arguments
///
/// * `mint` - Mint instance
/// * `db` - Database connection
/// * `pubsub` - Pubsub manager
/// * `quote` - Melt quote to finalize
/// * `total_spent` - Amount spent on payment
/// * `payment_preimage` - Payment preimage (if any)
/// * `payment_lookup_id` - Payment lookup identifier
///
/// # Returns
///
/// `Option<Vec<BlindSignature>>` - Change signatures (if any)
pub async fn finalize_melt_quote(
mint: &super::super::Mint,
db: &DynMintDatabase,
pubsub: &PubSubManager,
quote: &MeltQuote,
total_spent: Amount,
payment_preimage: Option<String>,
payment_lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<Option<Vec<BlindSignature>>, Error> {
use cdk_common::amount::to_unit;
tracing::info!("Finalizing melt quote {}", quote.id);
// Convert total_spent to quote unit
let total_spent = to_unit(total_spent, &quote.unit, &quote.unit).unwrap_or(total_spent);
let mut tx = db.begin_transaction().await?;
// Get melt request info
let melt_request_info = match tx.get_melt_request_and_blinded_messages(&quote.id).await? {
Some(info) => info,
None => {
tracing::warn!(
"No melt request found for quote {} - may have been completed already",
quote.id
);
tx.rollback().await?;
return Ok(None);
}
};
// Get input proof Y values
let input_ys = tx.get_proof_ys_by_quote_id(&quote.id).await?;
if input_ys.is_empty() {
tracing::warn!(
"No input proofs found for quote {} - may have been completed already",
quote.id
);
tx.rollback().await?;
return Ok(None);
}
// Core finalization (marks proofs spent, updates quote)
finalize_melt_core(
&mut tx,
pubsub,
quote,
&input_ys,
melt_request_info.inputs_amount,
melt_request_info.inputs_fee,
total_spent,
payment_preimage.clone(),
payment_lookup_id,
)
.await?;
// Close transaction before external call
tx.commit().await?;
// Process change (if needed) - opens new transaction
let (change_sigs, mut tx) = process_melt_change(
mint,
db,
&quote.id,
melt_request_info.inputs_amount,
total_spent,
melt_request_info.inputs_fee,
melt_request_info.change_outputs.clone(),
)
.await?;
// Delete melt request tracking
tx.delete_melt_request(&quote.id).await?;
// Commit transaction
tx.commit().await?;
// Publish quote status change
pubsub.melt_quote_status(
quote,
payment_preimage,
change_sigs.clone(),
MeltQuoteState::Paid,
);
tracing::info!("Successfully finalized melt quote {}", quote.id);
Ok(change_sigs)
}

View File

@@ -9,7 +9,7 @@ use cdk_common::amount::to_unit;
use cdk_common::common::{PaymentProcessorKey, QuoteTTL};
#[cfg(feature = "auth")]
use cdk_common::database::DynMintAuthDatabase;
use cdk_common::database::{self, DynMintDatabase, MintTransaction};
use cdk_common::database::{self, DynMintDatabase};
use cdk_common::nuts::{self, BlindSignature, BlindedMessage, CurrencyUnit, Id, Kind};
use cdk_common::payment::{DynMintPayment, WaitPaymentResponse};
pub use cdk_common::quote_id::QuoteId;
@@ -28,9 +28,9 @@ use tracing::instrument;
use crate::error::Error;
use crate::fees::calculate_fee;
use crate::nuts::*;
use crate::Amount;
#[cfg(feature = "auth")]
use crate::OidcClient;
use crate::{cdk_database, Amount};
#[cfg(feature = "auth")]
pub(crate) mod auth;
@@ -40,7 +40,6 @@ mod issue;
mod keysets;
mod ln;
mod melt;
mod proof_writer;
mod start_up_check;
mod subscription;
mod swap;
@@ -240,14 +239,21 @@ impl Mint {
/// - Payment processor initialization and startup
/// - Invoice payment monitoring across all configured payment processors
pub async fn start(&self) -> Result<(), Error> {
// Checks the status of all pending melt quotes
// Pending melt quotes where the payment has gone through inputs are burnt
// Pending melt quotes where the payment has **failed** inputs are reset to unspent
self.check_pending_melt_quotes().await?;
// Recover from incomplete swap sagas
// This cleans up incomplete swap operations using persisted saga state
self.recover_from_incomplete_sagas().await?;
if let Err(e) = self.recover_from_incomplete_sagas().await {
tracing::error!("Failed to recover incomplete swap sagas: {}", e);
// Don't fail startup
}
// Recover from incomplete melt sagas
// This cleans up incomplete melt operations using persisted saga state
// Now includes checking payment status with LN backend to determine
// whether to finalize (if paid) or compensate (if failed/unpaid)
if let Err(e) = self.recover_from_incomplete_melt_sagas().await {
tracing::error!("Failed to recover incomplete melt sagas: {}", e);
// Don't fail startup
}
let mut task_state = self.task_state.lock().await;
@@ -890,78 +896,6 @@ impl Mint {
result
}
/// Verify melt request is valid
/// Check to see if there is a corresponding mint quote for a melt.
/// In this case the mint can settle the payment internally and no ln payment is
/// needed
#[instrument(skip_all)]
pub async fn handle_internal_melt_mint(
&self,
tx: &mut Box<dyn MintTransaction<'_, cdk_database::Error> + Send + Sync + '_>,
melt_quote: &MeltQuote,
melt_request: &MeltRequest<QuoteId>,
) -> Result<Option<Amount>, Error> {
let mint_quote = match tx
.get_mint_quote_by_request(&melt_quote.request.to_string())
.await
{
Ok(Some(mint_quote)) if mint_quote.unit == melt_quote.unit => mint_quote,
// Not an internal melt -> mint or unit mismatch
Ok(_) => return Ok(None),
Err(err) => {
tracing::debug!("Error attempting to get mint quote: {}", err);
return Err(Error::Internal);
}
};
// Mint quote has already been settled, proofs should not be burned or held.
if (mint_quote.state() == MintQuoteState::Issued
|| mint_quote.state() == MintQuoteState::Paid)
&& mint_quote.payment_method == PaymentMethod::Bolt11
{
return Err(Error::RequestAlreadyPaid);
}
let inputs_amount_quote_unit = melt_request.inputs_amount().map_err(|_| {
tracing::error!("Proof inputs in melt quote overflowed");
Error::AmountOverflow
})?;
if let Some(amount) = mint_quote.amount {
if amount > inputs_amount_quote_unit {
tracing::debug!(
"Not enough inputs provided: {} needed {}",
inputs_amount_quote_unit,
amount
);
return Err(Error::InsufficientFunds);
}
}
let amount = melt_quote.amount;
tracing::info!(
"Mint quote {} paid {} from internal payment.",
mint_quote.id,
amount
);
let total_paid = tx
.increment_mint_quote_amount_paid(&mint_quote.id, amount, melt_quote.id.to_string())
.await?;
self.pubsub_manager
.mint_quote_payment(&mint_quote, total_paid);
tracing::info!(
"Melt quote {} paid Mint quote {}",
melt_quote.id,
mint_quote.id
);
Ok(Some(amount))
}
/// Restore
#[instrument(skip_all)]
pub async fn restore(&self, request: RestoreRequest) -> Result<RestoreResponse, Error> {

View File

@@ -1,237 +0,0 @@
//! Proof writer
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use cdk_common::database::{self, DynMintDatabase, MintTransaction};
use cdk_common::mint::Operation;
use cdk_common::{Error, Proofs, ProofsMethods, PublicKey, QuoteId, State};
use super::subscription::PubSubManager;
type Tx<'a, 'b> = Box<dyn MintTransaction<'a, database::Error> + Send + Sync + 'b>;
/// Proof writer
///
/// This is a proof writer that emulates a database transaction but without holding the transaction
/// alive while waiting for external events to be fully committed to the database; instead, it
/// maintains a `pending` state.
///
/// This struct allows for premature exit on error, enabling it to remove proofs or reset their
/// status.
///
/// This struct is not fully ACID. If the process exits due to a panic, and the `Drop` function
/// cannot be run, the reset process should reset the state.
pub struct ProofWriter {
db: Option<DynMintDatabase>,
pubsub_manager: Arc<PubSubManager>,
proof_original_states: Option<HashMap<PublicKey, Option<State>>>,
}
impl ProofWriter {
/// Creates a new ProofWriter on top of the database
pub fn new(db: DynMintDatabase, pubsub_manager: Arc<PubSubManager>) -> Self {
Self {
db: Some(db),
pubsub_manager,
proof_original_states: Some(Default::default()),
}
}
/// The changes are permanent, consume the struct removing the database, so the Drop does
/// nothing
pub fn commit(mut self) {
self.db.take();
self.proof_original_states.take();
}
/// Add proofs
pub async fn add_proofs(
&mut self,
tx: &mut Tx<'_, '_>,
proofs: &Proofs,
quote_id: Option<QuoteId>,
operation_id: &Operation,
) -> Result<Vec<PublicKey>, Error> {
let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() {
proofs
} else {
return Err(Error::Internal);
};
if let Some(err) = tx
.add_proofs(proofs.clone(), quote_id, operation_id)
.await
.err()
{
return match err {
cdk_common::database::Error::Duplicate => Err(Error::TokenPending),
cdk_common::database::Error::AttemptUpdateSpentProof => {
Err(Error::TokenAlreadySpent)
}
err => Err(Error::Database(err)),
};
}
let ys = proofs.ys()?;
for pk in ys.iter() {
proof_states.insert(*pk, None);
}
self.update_proofs_states(tx, &ys, State::Pending).await?;
Ok(ys)
}
/// Update proof status
pub async fn update_proofs_states(
&mut self,
tx: &mut Tx<'_, '_>,
ys: &[PublicKey],
new_proof_state: State,
) -> Result<(), Error> {
let proof_states = if let Some(proofs) = self.proof_original_states.as_mut() {
proofs
} else {
return Err(Error::Internal);
};
let original_proofs_state = match tx.update_proofs_states(ys, new_proof_state).await {
Ok(states) => states,
Err(database::Error::AttemptUpdateSpentProof)
| Err(database::Error::AttemptRemoveSpentProof) => {
return Err(Error::TokenAlreadySpent)
}
Err(err) => return Err(err.into()),
};
if ys.len() != original_proofs_state.len() {
return Err(Error::Internal);
}
let proofs_state = original_proofs_state
.iter()
.flatten()
.map(|x| x.to_owned())
.collect::<HashSet<State>>();
let forbidden_states = if new_proof_state == State::Pending {
// If the new state is `State::Pending` it cannot be pending already
vec![State::Pending, State::Spent]
} else {
// For other state it cannot be spent
vec![State::Spent]
};
for forbidden_state in forbidden_states.iter() {
if proofs_state.contains(forbidden_state) {
reset_proofs_to_original_state(tx, ys, original_proofs_state).await?;
return Err(if proofs_state.contains(&State::Pending) {
Error::TokenPending
} else {
Error::TokenAlreadySpent
});
}
}
for (idx, ys) in ys.iter().enumerate() {
proof_states
.entry(*ys)
.or_insert(original_proofs_state[idx]);
}
for pk in ys {
self.pubsub_manager.proof_state((*pk, new_proof_state));
}
Ok(())
}
/// Rollback all changes in this ProofWriter consuming it.
pub async fn rollback(mut self) -> Result<(), Error> {
let db = if let Some(db) = self.db.take() {
db
} else {
return Ok(());
};
let mut tx = db.begin_transaction().await?;
let (ys, original_states) = if let Some(proofs) = self.proof_original_states.take() {
proofs.into_iter().unzip::<_, _, Vec<_>, Vec<_>>()
} else {
return Ok(());
};
tracing::info!(
"Rollback {} proofs to their original states {:?}",
ys.len(),
original_states
);
reset_proofs_to_original_state(&mut tx, &ys, original_states).await?;
tx.commit().await?;
Ok(())
}
}
/// Resets proofs to their original states or removes them
#[inline(always)]
async fn reset_proofs_to_original_state(
tx: &mut Tx<'_, '_>,
ys: &[PublicKey],
original_states: Vec<Option<State>>,
) -> Result<(), Error> {
let mut ys_by_state = HashMap::new();
let mut unknown_proofs = Vec::new();
for (y, state) in ys.iter().zip(original_states) {
if let Some(state) = state {
// Skip attempting to update proofs that were originally spent
if state != State::Spent {
ys_by_state.entry(state).or_insert_with(Vec::new).push(*y);
}
} else {
unknown_proofs.push(*y);
}
}
for (state, ys) in ys_by_state {
tx.update_proofs_states(&ys, state).await?;
}
if !unknown_proofs.is_empty() {
tx.remove_proofs(&unknown_proofs, None).await?;
}
Ok(())
}
#[inline(always)]
async fn rollback(
db: DynMintDatabase,
ys: Vec<PublicKey>,
original_states: Vec<Option<State>>,
) -> Result<(), Error> {
let mut tx = db.begin_transaction().await?;
reset_proofs_to_original_state(&mut tx, &ys, original_states).await?;
tx.commit().await?;
Ok(())
}
impl Drop for ProofWriter {
fn drop(&mut self) {
let db = if let Some(db) = self.db.take() {
db
} else {
return;
};
let (ys, states) = if let Some(proofs) = self.proof_original_states.take() {
proofs.into_iter().unzip()
} else {
return;
};
tokio::spawn(rollback(db, ys, states));
}
}

View File

@@ -3,88 +3,103 @@
//! These checks are need in the case the mint was offline and the lightning node was node.
//! These ensure that the status of the mint or melt quote matches in the mint db and on the node.
use std::str::FromStr;
use cdk_common::mint::OperationKind;
use cdk_common::QuoteId;
use super::{Error, Mint};
use crate::mint::swap::swap_saga::compensation::{CompensatingAction, RemoveSwapSetup};
use crate::mint::{MeltQuote, MeltQuoteState, PaymentMethod};
use crate::mint::{MeltQuote, MeltQuoteState};
use crate::types::PaymentProcessorKey;
impl Mint {
/// Checks the states of melt quotes that are **PENDING** or **UNKNOWN** to the mint with the ln node
pub async fn check_pending_melt_quotes(&self) -> Result<(), Error> {
// TODO: We should have a db query to do this filtering
let melt_quotes = self.localstore.get_melt_quotes().await?;
let pending_quotes: Vec<MeltQuote> = melt_quotes
.into_iter()
.filter(|q| q.state == MeltQuoteState::Pending || q.state == MeltQuoteState::Unknown)
.collect();
tracing::info!("There are {} pending melt quotes.", pending_quotes.len());
/// Checks the payment status of a melt quote with the LN backend
///
/// This is a helper function used by saga recovery to determine whether to
/// finalize or compensate an incomplete melt operation.
///
/// # Returns
///
/// - `Ok(MakePaymentResponse)`: Payment status successfully retrieved from backend
/// - `Err(Error)`: Failed to check payment status (backend unavailable, no lookup_id, etc.)
async fn check_melt_payment_status(
&self,
quote: &MeltQuote,
) -> Result<crate::cdk_payment::MakePaymentResponse, Error> {
let ln_key = PaymentProcessorKey {
unit: quote.unit.clone(),
method: quote.payment_method.clone(),
};
if pending_quotes.is_empty() {
return Ok(());
}
let ln_backend = self.payment_processors.get(&ln_key).ok_or_else(|| {
tracing::warn!("No backend for ln key: {:?}", ln_key);
Error::UnsupportedUnit
})?;
let mut tx = self.localstore.begin_transaction().await?;
let lookup_id = quote.request_lookup_id.as_ref().ok_or_else(|| {
tracing::warn!(
"No lookup_id for melt quote {}, cannot check payment status",
quote.id
);
Error::Internal
})?;
for pending_quote in pending_quotes {
tracing::debug!("Checking status for melt quote {}.", pending_quote.id);
let ln_key = PaymentProcessorKey {
unit: pending_quote.unit,
method: PaymentMethod::Bolt11,
};
let ln_backend = match self.payment_processors.get(&ln_key) {
Some(ln_backend) => ln_backend,
None => {
tracing::warn!("No backend for ln key: {:?}", ln_key);
continue;
}
};
if let Some(lookup_id) = pending_quote.request_lookup_id {
let pay_invoice_response = ln_backend.check_outgoing_payment(&lookup_id).await?;
tracing::warn!(
"There is no stored melt request for pending melt quote: {}",
pending_quote.id
);
let melt_quote_state = match pay_invoice_response.status {
MeltQuoteState::Unpaid => MeltQuoteState::Unpaid,
MeltQuoteState::Paid => MeltQuoteState::Paid,
MeltQuoteState::Pending => MeltQuoteState::Pending,
MeltQuoteState::Failed => MeltQuoteState::Unpaid,
MeltQuoteState::Unknown => MeltQuoteState::Unpaid,
};
if let Err(err) = tx
.update_melt_quote_state(
&pending_quote.id,
melt_quote_state,
pay_invoice_response.payment_proof,
)
.await
{
// Check payment status with LN backend
let pay_invoice_response =
ln_backend
.check_outgoing_payment(lookup_id)
.await
.map_err(|err| {
tracing::error!(
"Could not update quote {} to state {}, current state {}, {}",
pending_quote.id,
melt_quote_state,
pending_quote.state,
"Failed to check payment status for quote {}: {}",
quote.id,
err
);
};
}
}
Error::Internal
})?;
tx.commit().await?;
tracing::info!(
"Payment status for melt quote {}: {}",
quote.id,
pay_invoice_response.status
);
Ok(pay_invoice_response)
}
/// Finalizes a paid melt quote during startup check
///
/// Uses shared finalization logic from melt::shared module
async fn finalize_paid_melt_quote(
&self,
quote: &MeltQuote,
total_spent: cdk_common::Amount,
payment_preimage: Option<String>,
payment_lookup_id: &cdk_common::payment::PaymentIdentifier,
) -> Result<(), Error> {
tracing::info!("Finalizing paid melt quote {} during startup", quote.id);
// Use shared finalization
super::melt::shared::finalize_melt_quote(
self,
&self.localstore,
&self.pubsub_manager,
quote,
total_spent,
payment_preimage,
payment_lookup_id,
)
.await?;
tracing::info!(
"Successfully finalized melt quote {} during startup check",
quote.id
);
Ok(())
}
/// Recover from incomplete swap sagas
///
/// Checks all persisted sagas for swap operations and compensates
/// incomplete ones by removing both proofs and blinded messages.
pub async fn recover_from_incomplete_sagas(&self) -> Result<(), Error> {
@@ -145,4 +160,368 @@ impl Mint {
Ok(())
}
/// Recover from incomplete melt sagas
///
/// Checks all persisted sagas for melt operations and determines whether to:
/// - **Finalize**: If payment was confirmed as PAID on LN backend
/// - **Compensate**: If payment was confirmed as UNPAID/FAILED or never sent
/// - **Skip**: If payment is PENDING/UNKNOWN (leave for check_pending_melt_quotes)
///
/// This recovery handles SetupComplete state which means:
/// - Proofs were reserved (marked as PENDING)
/// - Change outputs were added
/// - Payment may or may not have been sent
///
/// # Critical Bug Fix
///
/// Previously, this function always compensated (rolled back) incomplete sagas without
/// checking if the payment actually succeeded on the LN backend. This could cause the
/// mint to lose funds if:
/// 1. Payment succeeded on LN backend
/// 2. Mint crashed before finalize() committed
/// 3. Recovery compensated (returned proofs) instead of finalizing
///
/// Now we check the LN backend payment status before deciding whether to compensate or finalize.
pub async fn recover_from_incomplete_melt_sagas(&self) -> Result<(), Error> {
let incomplete_sagas = self
.localstore
.get_incomplete_sagas(OperationKind::Melt)
.await?;
if incomplete_sagas.is_empty() {
tracing::info!("No incomplete melt sagas found to recover.");
return Ok(());
}
let total_sagas = incomplete_sagas.len();
tracing::info!("Found {} incomplete melt sagas to recover.", total_sagas);
for saga in incomplete_sagas {
tracing::info!(
"Recovering melt saga {} in state '{}' (created: {}, updated: {})",
saga.operation_id,
saga.state.state(),
saga.created_at,
saga.updated_at
);
// Get quote_id from saga (new field added for efficient lookup)
let quote_id = match saga.quote_id {
Some(ref qid) => qid.clone(),
None => {
tracing::warn!(
"Saga {} has no quote_id (old saga format) - attempting fallback lookup",
saga.operation_id
);
// Fallback: Find quote by matching input_ys (for backward compatibility)
let melt_quotes = match self.localstore.get_melt_quotes().await {
Ok(quotes) => quotes,
Err(e) => {
tracing::error!(
"Failed to get melt quotes for saga {}: {}",
saga.operation_id,
e
);
continue;
}
};
let mut quote_id_found = None;
for quote in melt_quotes {
let tx = self.localstore.begin_transaction().await?;
let proof_ys = tx.get_proof_ys_by_quote_id(&quote.id).await?;
tx.rollback().await?;
if !saga.input_ys.is_empty()
&& !proof_ys.is_empty()
&& saga.input_ys.iter().any(|y| proof_ys.contains(y))
{
quote_id_found = Some(quote.id.clone());
break;
}
}
match quote_id_found {
Some(qid) => qid.to_string(),
None => {
tracing::warn!(
"Could not find quote_id for saga {} - may have been cleaned up already. Deleting orphaned saga.",
saga.operation_id
);
let mut delete_tx = self.localstore.begin_transaction().await?;
if let Err(e) = delete_tx.delete_saga(&saga.operation_id).await {
tracing::error!(
"Failed to delete orphaned saga {}: {}",
saga.operation_id,
e
);
delete_tx.rollback().await?;
} else {
delete_tx.commit().await?;
}
continue;
}
}
}
};
// Get the quote from database
let quote_id_parsed = match QuoteId::from_str(&quote_id) {
Ok(id) => id,
Err(e) => {
tracing::error!(
"Failed to parse quote_id '{}' for saga {}: {:?}. Skipping saga.",
quote_id,
saga.operation_id,
e
);
continue;
}
};
let quote = match self.localstore.get_melt_quote(&quote_id_parsed).await {
Ok(Some(q)) => q,
Ok(None) => {
tracing::warn!(
"Quote {} for saga {} not found - may have been cleaned up. Deleting orphaned saga.",
quote_id,
saga.operation_id
);
let mut delete_tx = self.localstore.begin_transaction().await?;
if let Err(e) = delete_tx.delete_saga(&saga.operation_id).await {
tracing::error!(
"Failed to delete orphaned saga {}: {}",
saga.operation_id,
e
);
delete_tx.rollback().await?;
} else {
delete_tx.commit().await?;
}
continue;
}
Err(e) => {
tracing::error!(
"Failed to get quote {} for saga {}: {}. Skipping saga.",
quote_id,
saga.operation_id,
e
);
continue;
}
};
// Check saga state to determine if payment was sent
// SetupComplete means setup transaction committed but payment NOT yet sent
let should_compensate = match &saga.state {
cdk_common::mint::SagaStateEnum::Melt(state) => {
match state {
cdk_common::mint::MeltSagaState::SetupComplete => {
// Setup complete but payment never sent - always compensate
tracing::info!(
"Saga {} in SetupComplete state - payment never sent, will compensate",
saga.operation_id
);
true
}
_ => {
// Other states - should not happen in incomplete sagas, but check payment status anyway
false // Will check payment status below
}
}
}
_ => {
continue; // Skip non-melt sagas
}
};
let should_compensate = if should_compensate {
true
} else if quote.request_lookup_id.is_none() {
// Fallback: No request_lookup_id means payment likely never sent
tracing::info!(
"Saga {} for quote {} has no request_lookup_id - payment never sent, will compensate",
saga.operation_id,
quote_id
);
true
} else {
// Payment was attempted - check LN backend status
tracing::info!(
"Saga {} for quote {} has request_lookup_id - checking payment status with LN backend",
saga.operation_id,
quote_id
);
match self.check_melt_payment_status(&quote).await {
Ok(payment_response) => {
match payment_response.status {
MeltQuoteState::Paid => {
// Payment succeeded - finalize instead of compensating
tracing::info!(
"Saga {} for quote {} - payment PAID on LN backend, will finalize",
saga.operation_id,
quote_id
);
if let Err(err) = self
.finalize_paid_melt_quote(
&quote,
payment_response.total_spent,
payment_response.payment_proof,
&payment_response.payment_lookup_id,
)
.await
{
tracing::error!(
"Failed to finalize paid melt saga {}: {}",
saga.operation_id,
err
);
}
// Delete saga after successful finalization
let mut tx = self.localstore.begin_transaction().await?;
if let Err(e) = tx.delete_saga(&saga.operation_id).await {
tracing::error!(
"Failed to delete saga for {}: {}",
saga.operation_id,
e
);
tx.rollback().await?;
} else {
tx.commit().await?;
tracing::info!(
"Successfully recovered and finalized melt saga {}",
saga.operation_id
);
}
continue; // Skip compensation, saga handled
}
MeltQuoteState::Unpaid | MeltQuoteState::Failed => {
// Payment failed - compensate
tracing::info!(
"Saga {} for quote {} - payment {} on LN backend, will compensate",
saga.operation_id,
quote_id,
payment_response.status
);
true
}
MeltQuoteState::Pending | MeltQuoteState::Unknown => {
// Payment still pending - skip for check_pending_melt_quotes
tracing::info!(
"Saga {} for quote {} - payment {} on LN backend, skipping (will be handled by check_pending_melt_quotes)",
saga.operation_id,
quote_id,
payment_response.status
);
continue; // Skip this saga, don't compensate or finalize
}
}
}
Err(err) => {
// LN backend unavailable - skip this saga, will retry on next recovery cycle
tracing::warn!(
"Failed to check payment status for saga {} quote {}: {}. Skipping for now, will retry on next recovery cycle.",
saga.operation_id,
quote_id,
err
);
continue; // Skip this saga
}
}
};
// Compensate if needed
if should_compensate {
// Use saga data directly for compensation (like swap does)
tracing::info!(
"Compensating melt saga {} (removing {} proofs, {} change outputs)",
saga.operation_id,
saga.input_ys.len(),
saga.blinded_secrets.len()
);
// Compensate using saga data only - don't rely on quote state
let mut tx = self.localstore.begin_transaction().await?;
// Remove blinded messages (change outputs)
if !saga.blinded_secrets.is_empty() {
if let Err(e) = tx.delete_blinded_messages(&saga.blinded_secrets).await {
tracing::error!(
"Failed to delete blinded messages for saga {}: {}",
saga.operation_id,
e
);
tx.rollback().await?;
continue;
}
}
// Remove proofs (inputs) - use None for quote_id like swap does
if !saga.input_ys.is_empty() {
if let Err(e) = tx.remove_proofs(&saga.input_ys, None).await {
tracing::error!(
"Failed to remove proofs for saga {}: {}",
saga.operation_id,
e
);
tx.rollback().await?;
continue;
}
}
// Reset quote state to Unpaid (melt-specific, unlike swap)
if let Err(e) = tx
.update_melt_quote_state(&quote_id_parsed, MeltQuoteState::Unpaid, None)
.await
{
tracing::error!(
"Failed to reset quote state for saga {}: {}",
saga.operation_id,
e
);
tx.rollback().await?;
continue;
}
// Delete melt request tracking record
if let Err(e) = tx.delete_melt_request(&quote_id_parsed).await {
tracing::error!(
"Failed to delete melt request for saga {}: {}",
saga.operation_id,
e
);
// Don't fail if melt request doesn't exist - it might not have been created yet
}
// Delete saga after successful compensation
if let Err(e) = tx.delete_saga(&saga.operation_id).await {
tracing::error!("Failed to delete saga for {}: {}", saga.operation_id, e);
tx.rollback().await?;
continue;
}
tx.commit().await?;
tracing::info!(
"Successfully recovered and compensated melt saga {}",
saga.operation_id
);
}
}
tracing::info!(
"Successfully recovered {} incomplete melt sagas.",
total_sagas
);
Ok(())
}
}

View File

@@ -199,8 +199,8 @@ itest db:
fake-mint-itest db:
#!/usr/bin/env bash
set -euo pipefail
./misc/fake_itests.sh "{{db}}" external_signatory
./misc/fake_itests.sh "{{db}}"
./misc/fake_itests.sh "{{db}}" external_signatory
itest-payment-processor ln:
#!/usr/bin/env bash