mirror of
https://github.com/aljazceru/cdk.git
synced 2025-12-20 22:24:54 +01:00
Fixed race condition
Bug: https://github.com/cashubtc/cdk/actions/runs/15683152414/job/44190084378?pr=822#step:5:19212 Reason: a race condition between removing proofs while melting and the quote states being updated. Solution: 1. Error on duplicate proofs 2. Read quote when updating to avoid race conditions and rollbacks Real solution: A transaction trait in the storage layer. That is coming next
This commit is contained in:
@@ -88,7 +88,7 @@ pub trait QuotesDatabase {
|
|||||||
&self,
|
&self,
|
||||||
quote_id: &Uuid,
|
quote_id: &Uuid,
|
||||||
state: MeltQuoteState,
|
state: MeltQuoteState,
|
||||||
) -> Result<MeltQuoteState, Self::Err>;
|
) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err>;
|
||||||
/// Get all [`mint::MeltQuote`]s
|
/// Get all [`mint::MeltQuote`]s
|
||||||
async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err>;
|
async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err>;
|
||||||
/// Remove [`mint::MeltQuote`]
|
/// Remove [`mint::MeltQuote`]
|
||||||
|
|||||||
@@ -22,6 +22,11 @@ pub enum Error {
|
|||||||
/// Database Error
|
/// Database Error
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Database(Box<dyn std::error::Error + Send + Sync>),
|
Database(Box<dyn std::error::Error + Send + Sync>),
|
||||||
|
|
||||||
|
/// Duplicate entry
|
||||||
|
#[error("Duplicate entry")]
|
||||||
|
Duplicate,
|
||||||
|
|
||||||
/// DHKE error
|
/// DHKE error
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
DHKE(#[from] crate::dhke::Error),
|
DHKE(#[from] crate::dhke::Error),
|
||||||
|
|||||||
@@ -470,12 +470,13 @@ impl MintQuotesDatabase for MintRedbDatabase {
|
|||||||
&self,
|
&self,
|
||||||
quote_id: &Uuid,
|
quote_id: &Uuid,
|
||||||
state: MeltQuoteState,
|
state: MeltQuoteState,
|
||||||
) -> Result<MeltQuoteState, Self::Err> {
|
) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
|
||||||
let write_txn = self.db.begin_write().map_err(Error::from)?;
|
let write_txn = self.db.begin_write().map_err(Error::from)?;
|
||||||
|
|
||||||
let current_state;
|
let current_state;
|
||||||
{
|
|
||||||
let mut melt_quote: mint::MeltQuote;
|
let mut melt_quote: mint::MeltQuote;
|
||||||
|
|
||||||
|
{
|
||||||
let mut table = write_txn
|
let mut table = write_txn
|
||||||
.open_table(MELT_QUOTES_TABLE)
|
.open_table(MELT_QUOTES_TABLE)
|
||||||
.map_err(Error::from)?;
|
.map_err(Error::from)?;
|
||||||
@@ -506,7 +507,7 @@ impl MintQuotesDatabase for MintRedbDatabase {
|
|||||||
}
|
}
|
||||||
write_txn.commit().map_err(Error::from)?;
|
write_txn.commit().map_err(Error::from)?;
|
||||||
|
|
||||||
Ok(current_state)
|
Ok((current_state, melt_quote))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
|
async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
|
||||||
|
|||||||
@@ -4,7 +4,7 @@ use std::sync::{mpsc as std_mpsc, Arc, Mutex};
|
|||||||
use std::thread::spawn;
|
use std::thread::spawn;
|
||||||
use std::time::Instant;
|
use std::time::Instant;
|
||||||
|
|
||||||
use rusqlite::{Connection, TransactionBehavior};
|
use rusqlite::{ffi, Connection, ErrorCode, TransactionBehavior};
|
||||||
use tokio::sync::{mpsc, oneshot};
|
use tokio::sync::{mpsc, oneshot};
|
||||||
|
|
||||||
use crate::common::SqliteConnectionManager;
|
use crate::common::SqliteConnectionManager;
|
||||||
@@ -202,6 +202,26 @@ fn rusqlite_spawn_worker_threads(
|
|||||||
Ok(ok) => reply_to.send(ok),
|
Ok(ok) => reply_to.send(ok),
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
tracing::error!("Failed query with error {:?}", err);
|
tracing::error!("Failed query with error {:?}", err);
|
||||||
|
let err = if let Error::Sqlite(rusqlite::Error::SqliteFailure(
|
||||||
|
ffi::Error {
|
||||||
|
code,
|
||||||
|
extended_code,
|
||||||
|
},
|
||||||
|
_,
|
||||||
|
)) = &err
|
||||||
|
{
|
||||||
|
if *code == ErrorCode::ConstraintViolation
|
||||||
|
&& (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
|
||||||
|
|| *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
|
||||||
|
{
|
||||||
|
Error::Duplicate
|
||||||
|
} else {
|
||||||
|
err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err
|
||||||
|
};
|
||||||
|
|
||||||
reply_to.send(DbResponse::Error(err))
|
reply_to.send(DbResponse::Error(err))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@@ -331,6 +351,27 @@ fn rusqlite_worker_manager(
|
|||||||
tx_id,
|
tx_id,
|
||||||
err
|
err
|
||||||
);
|
);
|
||||||
|
let err = if let Error::Sqlite(
|
||||||
|
rusqlite::Error::SqliteFailure(
|
||||||
|
ffi::Error {
|
||||||
|
code,
|
||||||
|
extended_code,
|
||||||
|
},
|
||||||
|
_,
|
||||||
|
),
|
||||||
|
) = &err
|
||||||
|
{
|
||||||
|
if *code == ErrorCode::ConstraintViolation
|
||||||
|
&& (*extended_code == ffi::SQLITE_CONSTRAINT_PRIMARYKEY
|
||||||
|
|| *extended_code == ffi::SQLITE_CONSTRAINT_UNIQUE)
|
||||||
|
{
|
||||||
|
Error::Duplicate
|
||||||
|
} else {
|
||||||
|
err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
err
|
||||||
|
};
|
||||||
reply_to.send(DbResponse::Error(err))
|
reply_to.send(DbResponse::Error(err))
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|||||||
@@ -9,6 +9,10 @@ pub enum Error {
|
|||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Sqlite(#[from] rusqlite::Error),
|
Sqlite(#[from] rusqlite::Error),
|
||||||
|
|
||||||
|
/// Duplicate entry
|
||||||
|
#[error("Record already exists")]
|
||||||
|
Duplicate,
|
||||||
|
|
||||||
/// Pool error
|
/// Pool error
|
||||||
#[error(transparent)]
|
#[error(transparent)]
|
||||||
Pool(#[from] crate::pool::Error<rusqlite::Error>),
|
Pool(#[from] crate::pool::Error<rusqlite::Error>),
|
||||||
@@ -98,6 +102,9 @@ pub enum Error {
|
|||||||
|
|
||||||
impl From<Error> for cdk_common::database::Error {
|
impl From<Error> for cdk_common::database::Error {
|
||||||
fn from(e: Error) -> Self {
|
fn from(e: Error) -> Self {
|
||||||
Self::Database(Box::new(e))
|
match e {
|
||||||
|
Error::Duplicate => Self::Duplicate,
|
||||||
|
e => Self::Database(Box::new(e)),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -674,10 +674,10 @@ ON CONFLICT(request_lookup_id) DO UPDATE SET
|
|||||||
&self,
|
&self,
|
||||||
quote_id: &Uuid,
|
quote_id: &Uuid,
|
||||||
state: MeltQuoteState,
|
state: MeltQuoteState,
|
||||||
) -> Result<MeltQuoteState, Self::Err> {
|
) -> Result<(MeltQuoteState, mint::MeltQuote), Self::Err> {
|
||||||
let transaction = self.pool.begin().await?;
|
let transaction = self.pool.begin().await?;
|
||||||
|
|
||||||
let quote = query(
|
let mut quote = query(
|
||||||
r#"
|
r#"
|
||||||
SELECT
|
SELECT
|
||||||
id,
|
id,
|
||||||
@@ -732,7 +732,10 @@ ON CONFLICT(request_lookup_id) DO UPDATE SET
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok(quote.state)
|
let old_state = quote.state;
|
||||||
|
quote.state = state;
|
||||||
|
|
||||||
|
Ok((old_state, quote))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> {
|
async fn remove_melt_quote(&self, quote_id: &Uuid) -> Result<(), Self::Err> {
|
||||||
@@ -816,7 +819,7 @@ impl MintProofsDatabase for MintSqliteDatabase {
|
|||||||
for proof in proofs {
|
for proof in proofs {
|
||||||
query(
|
query(
|
||||||
r#"
|
r#"
|
||||||
INSERT OR IGNORE INTO proof
|
INSERT INTO proof
|
||||||
(y, amount, keyset_id, secret, c, witness, state, quote_id, created_time)
|
(y, amount, keyset_id, secret, c, witness, state, quote_id, created_time)
|
||||||
VALUES
|
VALUES
|
||||||
(:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time)
|
(:y, :amount, :keyset_id, :secret, :c, :witness, :state, :quote_id, :created_time)
|
||||||
@@ -852,10 +855,11 @@ impl MintProofsDatabase for MintSqliteDatabase {
|
|||||||
|
|
||||||
let total_deleted = query(
|
let total_deleted = query(
|
||||||
r#"
|
r#"
|
||||||
DELETE FROM proof WHERE y IN (:ys) AND state != 'SPENT'
|
DELETE FROM proof WHERE y IN (:ys) AND state NOT IN (:exclude_state)
|
||||||
"#,
|
"#,
|
||||||
)
|
)
|
||||||
.bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
|
.bind_vec(":ys", ys.iter().map(|y| y.to_bytes().to_vec()).collect())
|
||||||
|
.bind_vec(":exclude_state", vec![State::Spent.to_string()])
|
||||||
.execute(&transaction)
|
.execute(&transaction)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
@@ -974,7 +978,11 @@ impl MintProofsDatabase for MintSqliteDatabase {
|
|||||||
|
|
||||||
if current_states.len() != ys.len() {
|
if current_states.len() != ys.len() {
|
||||||
transaction.rollback().await?;
|
transaction.rollback().await?;
|
||||||
tracing::warn!("Attempted to update state of non-existent proof");
|
tracing::warn!(
|
||||||
|
"Attempted to update state of non-existent proof {} {}",
|
||||||
|
current_states.len(),
|
||||||
|
ys.len()
|
||||||
|
);
|
||||||
return Err(database::Error::ProofNotFound);
|
return Err(database::Error::ProofNotFound);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -297,7 +297,7 @@ impl Mint {
|
|||||||
&self,
|
&self,
|
||||||
melt_request: &MeltRequest<Uuid>,
|
melt_request: &MeltRequest<Uuid>,
|
||||||
) -> Result<MeltQuote, Error> {
|
) -> Result<MeltQuote, Error> {
|
||||||
let state = self
|
let (state, quote) = self
|
||||||
.localstore
|
.localstore
|
||||||
.update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending)
|
.update_melt_quote_state(melt_request.quote(), MeltQuoteState::Pending)
|
||||||
.await?;
|
.await?;
|
||||||
@@ -309,12 +309,6 @@ impl Mint {
|
|||||||
MeltQuoteState::Unknown => Err(Error::UnknownPaymentState),
|
MeltQuoteState::Unknown => Err(Error::UnknownPaymentState),
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
let quote = self
|
|
||||||
.localstore
|
|
||||||
.get_melt_quote(melt_request.quote())
|
|
||||||
.await?
|
|
||||||
.ok_or(Error::UnknownQuote)?;
|
|
||||||
|
|
||||||
self.pubsub_manager
|
self.pubsub_manager
|
||||||
.melt_quote_status("e, None, None, MeltQuoteState::Pending);
|
.melt_quote_status("e, None, None, MeltQuoteState::Pending);
|
||||||
|
|
||||||
@@ -347,9 +341,19 @@ impl Mint {
|
|||||||
));
|
));
|
||||||
}
|
}
|
||||||
|
|
||||||
self.localstore
|
if let Some(err) = self
|
||||||
|
.localstore
|
||||||
.add_proofs(melt_request.inputs().clone(), None)
|
.add_proofs(melt_request.inputs().clone(), None)
|
||||||
.await?;
|
.await
|
||||||
|
.err()
|
||||||
|
{
|
||||||
|
match err {
|
||||||
|
cdk_common::database::Error::Duplicate => {
|
||||||
|
// the proofs already exits, it will be errored by `check_ys_spendable`
|
||||||
|
}
|
||||||
|
err => return Err(Error::Database(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
self.check_ys_spendable(&input_ys, State::Pending).await?;
|
self.check_ys_spendable(&input_ys, State::Pending).await?;
|
||||||
for proof in melt_request.inputs() {
|
for proof in melt_request.inputs() {
|
||||||
|
|||||||
@@ -24,9 +24,19 @@ impl Mint {
|
|||||||
|
|
||||||
// After swap request is fully validated, add the new proofs to DB
|
// After swap request is fully validated, add the new proofs to DB
|
||||||
let input_ys = swap_request.inputs().ys()?;
|
let input_ys = swap_request.inputs().ys()?;
|
||||||
self.localstore
|
if let Some(err) = self
|
||||||
|
.localstore
|
||||||
.add_proofs(swap_request.inputs().clone(), None)
|
.add_proofs(swap_request.inputs().clone(), None)
|
||||||
.await?;
|
.await
|
||||||
|
.err()
|
||||||
|
{
|
||||||
|
match err {
|
||||||
|
cdk_common::database::Error::Duplicate => {
|
||||||
|
// the proofs already exits, it will be errored by `check_ys_spendable`
|
||||||
|
}
|
||||||
|
err => return Err(Error::Database(err)),
|
||||||
|
}
|
||||||
|
}
|
||||||
self.check_ys_spendable(&input_ys, State::Pending).await?;
|
self.check_ys_spendable(&input_ys, State::Pending).await?;
|
||||||
|
|
||||||
let mut promises = Vec::with_capacity(swap_request.outputs().len());
|
let mut promises = Vec::with_capacity(swap_request.outputs().len());
|
||||||
|
|||||||
Reference in New Issue
Block a user