fix: sqlx sqlite settings

feat: use transactions in all sql fns
This commit is contained in:
thesimplekid
2024-09-20 21:07:21 +02:00
parent 4ba2b07418
commit f2867188a8
2 changed files with 491 additions and 167 deletions

View File

@@ -37,7 +37,7 @@ async fn new_mint(fee: u64) -> Mint {
let mnemonic = Mnemonic::generate(12).unwrap(); let mnemonic = Mnemonic::generate(12).unwrap();
let mint = Mint::new( Mint::new(
MINT_URL, MINT_URL,
&mnemonic.to_seed_normalized(""), &mnemonic.to_seed_normalized(""),
mint_info, mint_info,
@@ -45,9 +45,7 @@ async fn new_mint(fee: u64) -> Mint {
supported_units, supported_units,
) )
.await .await
.unwrap(); .unwrap()
mint
} }
async fn initialize() -> &'static Mint { async fn initialize() -> &'static Mint {

View File

@@ -3,6 +3,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::path::Path; use std::path::Path;
use std::str::FromStr; use std::str::FromStr;
use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use bitcoin::bip32::DerivationPath; use bitcoin::bip32::DerivationPath;
@@ -18,8 +19,8 @@ use cdk::secret::Secret;
use cdk::{mint, Amount}; use cdk::{mint, Amount};
use error::Error; use error::Error;
use lightning_invoice::Bolt11Invoice; use lightning_invoice::Bolt11Invoice;
use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqliteRow}; use sqlx::sqlite::{SqliteConnectOptions, SqlitePool, SqlitePoolOptions, SqliteRow};
use sqlx::{ConnectOptions, Row}; use sqlx::Row;
pub mod error; pub mod error;
@@ -33,15 +34,16 @@ impl MintSqliteDatabase {
/// Create new [`MintSqliteDatabase`] /// Create new [`MintSqliteDatabase`]
pub async fn new(path: &Path) -> Result<Self, Error> { pub async fn new(path: &Path) -> Result<Self, Error> {
let path = path.to_str().ok_or(Error::InvalidDbPath)?; let path = path.to_str().ok_or(Error::InvalidDbPath)?;
let _conn = SqliteConnectOptions::from_str(path)? let db_options = SqliteConnectOptions::from_str(path)?
.journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) .busy_timeout(Duration::from_secs(5))
.read_only(false) .read_only(false)
.create_if_missing(true) .create_if_missing(true)
.auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Full) .auto_vacuum(sqlx::sqlite::SqliteAutoVacuum::Full);
.connect()
.await?;
let pool = SqlitePool::connect(path).await?; let pool = SqlitePoolOptions::new()
.max_connections(1)
.connect_with(db_options)
.await?;
Ok(Self { pool }) Ok(Self { pool })
} }
@@ -61,7 +63,8 @@ impl MintDatabase for MintSqliteDatabase {
async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err> { async fn set_active_keyset(&self, unit: CurrencyUnit, id: Id) -> Result<(), Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?; let mut transaction = self.pool.begin().await.map_err(Error::from)?;
sqlx::query(
let update_res = sqlx::query(
r#" r#"
UPDATE keyset UPDATE keyset
SET active=FALSE SET active=FALSE
@@ -70,10 +73,21 @@ WHERE unit IS ?;
) )
.bind(unit.to_string()) .bind(unit.to_string())
.execute(&mut transaction) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
sqlx::query( match update_res {
Ok(_) => (),
Err(err) => {
tracing::error!("SQLite Could not update keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
};
let update_res = sqlx::query(
r#" r#"
UPDATE keyset UPDATE keyset
SET active=TRUE SET active=TRUE
@@ -84,8 +98,19 @@ AND id IS ?;
.bind(unit.to_string()) .bind(unit.to_string())
.bind(id.to_string()) .bind(id.to_string())
.execute(&mut transaction) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match update_res {
Ok(_) => (),
Err(err) => {
tracing::error!("SQLite Could not update keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
};
transaction.commit().await.map_err(Error::from)?; transaction.commit().await.map_err(Error::from)?;
@@ -93,6 +118,8 @@ AND id IS ?;
} }
async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> { async fn get_active_keyset_id(&self, unit: &CurrencyUnit) -> Result<Option<Id>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT id SELECT id
@@ -102,14 +129,27 @@ AND unit IS ?
"#, "#,
) )
.bind(unit.to_string()) .bind(unit.to_string())
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
let rec = match rec { let rec = match rec {
Ok(rec) => rec, Ok(rec) => {
transaction.commit().await.map_err(Error::from)?;
rec
}
Err(err) => match err { Err(err) => match err {
sqlx::Error::RowNotFound => return Ok(None), sqlx::Error::RowNotFound => {
_ => return Err(Error::SQLX(err).into()), transaction.commit().await.map_err(Error::from)?;
return Ok(None);
}
_ => {
return {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::SQLX(err).into())
}
}
}, },
}; };
@@ -119,6 +159,8 @@ AND unit IS ?
} }
async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> { async fn get_active_keysets(&self) -> Result<HashMap<CurrencyUnit, Id>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let recs = sqlx::query( let recs = sqlx::query(
r#" r#"
SELECT id, unit SELECT id, unit
@@ -126,9 +168,12 @@ FROM keyset
WHERE active = 1 WHERE active = 1
"#, "#,
) )
.fetch_all(&self.pool) .fetch_all(&mut transaction)
.await .await;
.map_err(Error::from)?;
match recs {
Ok(recs) => {
transaction.commit().await.map_err(Error::from)?;
let keysets = recs let keysets = recs
.iter() .iter()
@@ -140,12 +185,22 @@ WHERE active = 1
Err(_) => None, Err(_) => None,
}) })
.collect(); .collect();
Ok(keysets) Ok(keysets)
} }
Err(err) => {
tracing::error!("SQLite could not get active keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::from(err).into())
}
}
}
async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> { async fn add_mint_quote(&self, quote: MintQuote) -> Result<(), Self::Err> {
sqlx::query( let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let res = sqlx::query(
r#" r#"
INSERT OR REPLACE INTO mint_quote INSERT OR REPLACE INTO mint_quote
(id, mint_url, amount, unit, request, state, expiry, request_lookup_id) (id, mint_url, amount, unit, request, state, expiry, request_lookup_id)
@@ -160,13 +215,27 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?);
.bind(quote.state.to_string()) .bind(quote.state.to_string())
.bind(quote.expiry as i64) .bind(quote.expiry as i64)
.bind(quote.request_lookup_id) .bind(quote.request_lookup_id)
.execute(&self.pool) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match res {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?;
Ok(()) Ok(())
} }
Err(err) => {
tracing::error!("SQLite Could not update keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::from(err).into())
}
}
}
async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> { async fn get_mint_quote(&self, quote_id: &str) -> Result<Option<MintQuote>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -175,24 +244,34 @@ WHERE id=?;
"#, "#,
) )
.bind(quote_id) .bind(quote_id)
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
let rec = match rec { match rec {
Ok(rec) => rec, Ok(rec) => {
Err(err) => match err { transaction.commit().await.map_err(Error::from)?;
sqlx::Error::RowNotFound => return Ok(None),
_ => return Err(Error::SQLX(err).into()),
},
};
Ok(Some(sqlite_row_to_mint_quote(rec)?)) Ok(Some(sqlite_row_to_mint_quote(rec)?))
} }
Err(err) => match err {
sqlx::Error::RowNotFound => {
transaction.commit().await.map_err(Error::from)?;
Ok(None)
}
_ => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::SQLX(err).into())
}
},
}
}
async fn get_mint_quote_by_request( async fn get_mint_quote_by_request(
&self, &self,
request: &str, request: &str,
) -> Result<Option<MintQuote>, Self::Err> { ) -> Result<Option<MintQuote>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -201,24 +280,35 @@ WHERE request=?;
"#, "#,
) )
.bind(request) .bind(request)
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
let rec = match rec { match rec {
Ok(rec) => rec, Ok(rec) => {
Err(err) => match err { transaction.commit().await.map_err(Error::from)?;
sqlx::Error::RowNotFound => return Ok(None),
_ => return Err(Error::SQLX(err).into()),
},
};
Ok(Some(sqlite_row_to_mint_quote(rec)?)) Ok(Some(sqlite_row_to_mint_quote(rec)?))
} }
Err(err) => match err {
sqlx::Error::RowNotFound => {
transaction.commit().await.map_err(Error::from)?;
Ok(None)
}
_ => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::SQLX(err).into())
}
},
}
}
async fn get_mint_quote_by_request_lookup_id( async fn get_mint_quote_by_request_lookup_id(
&self, &self,
request_lookup_id: &str, request_lookup_id: &str,
) -> Result<Option<MintQuote>, Self::Err> { ) -> Result<Option<MintQuote>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -227,19 +317,30 @@ WHERE request_lookup_id=?;
"#, "#,
) )
.bind(request_lookup_id) .bind(request_lookup_id)
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
let rec = match rec { match rec {
Ok(rec) => rec, Ok(rec) => {
Err(err) => match err { transaction.commit().await.map_err(Error::from)?;
sqlx::Error::RowNotFound => return Ok(None),
_ => return Err(Error::SQLX(err).into()),
},
};
Ok(Some(sqlite_row_to_mint_quote(rec)?)) Ok(Some(sqlite_row_to_mint_quote(rec)?))
} }
Err(err) => match err {
sqlx::Error::RowNotFound => {
transaction.commit().await.map_err(Error::from)?;
Ok(None)
}
_ => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::SQLX(err).into())
}
},
}
}
async fn update_mint_quote_state( async fn update_mint_quote_state(
&self, &self,
quote_id: &str, quote_id: &str,
@@ -256,12 +357,20 @@ WHERE id=?;
) )
.bind(quote_id) .bind(quote_id)
.fetch_one(&mut transaction) .fetch_one(&mut transaction)
.await .await;
.map_err(Error::from)?; let quote = match rec {
Ok(row) => sqlite_row_to_mint_quote(row)?,
Err(err) => {
tracing::error!("SQLite Could not update keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
let quote = sqlite_row_to_mint_quote(rec)?; return Err(Error::from(err).into());
}
};
sqlx::query( let update = sqlx::query(
r#" r#"
UPDATE mint_quote SET state = ? WHERE id = ? UPDATE mint_quote SET state = ? WHERE id = ?
"#, "#,
@@ -269,49 +378,89 @@ WHERE id=?;
.bind(state.to_string()) .bind(state.to_string())
.bind(quote_id) .bind(quote_id)
.execute(&mut transaction) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match update {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?; transaction.commit().await.map_err(Error::from)?;
Ok(quote.state) Ok(quote.state)
} }
Err(err) => {
tracing::error!("SQLite Could not update keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
}
}
async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> { async fn get_mint_quotes(&self) -> Result<Vec<MintQuote>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
FROM mint_quote FROM mint_quote
"#, "#,
) )
.fetch_all(&self.pool) .fetch_all(&mut transaction)
.await .await;
.map_err(Error::from)?;
let mint_quotes = rec match rec {
Ok(rows) => {
transaction.commit().await.map_err(Error::from)?;
let mint_quotes = rows
.into_iter() .into_iter()
.map(sqlite_row_to_mint_quote) .map(sqlite_row_to_mint_quote)
.collect::<Result<Vec<MintQuote>, _>>()?; .collect::<Result<Vec<MintQuote>, _>>()?;
Ok(mint_quotes) Ok(mint_quotes)
} }
Err(err) => {
tracing::error!("SQLite get mint quotes");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
}
}
async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> { async fn remove_mint_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
sqlx::query( let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let res = sqlx::query(
r#" r#"
DELETE FROM mint_quote DELETE FROM mint_quote
WHERE id=? WHERE id=?
"#, "#,
) )
.bind(quote_id) .bind(quote_id)
.execute(&self.pool) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match res {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?;
Ok(()) Ok(())
} }
Err(err) => {
tracing::error!("SQLite Could not remove mint quote");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::from(err).into())
}
}
}
async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err> { async fn add_melt_quote(&self, quote: mint::MeltQuote) -> Result<(), Self::Err> {
sqlx::query( let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let res = sqlx::query(
r#" r#"
INSERT OR REPLACE INTO melt_quote INSERT OR REPLACE INTO melt_quote
(id, unit, amount, request, fee_reserve, state, expiry, payment_preimage, request_lookup_id) (id, unit, amount, request, fee_reserve, state, expiry, payment_preimage, request_lookup_id)
@@ -327,13 +476,27 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
.bind(quote.expiry as i64) .bind(quote.expiry as i64)
.bind(quote.payment_preimage) .bind(quote.payment_preimage)
.bind(quote.request_lookup_id) .bind(quote.request_lookup_id)
.execute(&self.pool) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match res {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?;
Ok(()) Ok(())
} }
Err(err) => {
tracing::error!("SQLite Could not remove mint quote");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::from(err).into())
}
}
}
async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<mint::MeltQuote>, Self::Err> { async fn get_melt_quote(&self, quote_id: &str) -> Result<Option<mint::MeltQuote>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -342,37 +505,60 @@ WHERE id=?;
"#, "#,
) )
.bind(quote_id) .bind(quote_id)
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
let rec = match rec { match rec {
Ok(rec) => rec, Ok(rec) => {
Err(err) => match err { transaction.commit().await.map_err(Error::from)?;
sqlx::Error::RowNotFound => return Ok(None),
_ => return Err(Error::SQLX(err).into()),
},
};
Ok(Some(sqlite_row_to_melt_quote(rec)?)) Ok(Some(sqlite_row_to_melt_quote(rec)?))
} }
Err(err) => match err {
sqlx::Error::RowNotFound => {
transaction.commit().await.map_err(Error::from)?;
Ok(None)
}
_ => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::SQLX(err).into())
}
},
}
}
async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> { async fn get_melt_quotes(&self) -> Result<Vec<mint::MeltQuote>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
FROM melt_quote FROM melt_quote
"#, "#,
) )
.fetch_all(&self.pool) .fetch_all(&mut transaction)
.await .await
.map_err(Error::from)?; .map_err(Error::from);
match rec {
Ok(rec) => {
let melt_quotes = rec let melt_quotes = rec
.into_iter() .into_iter()
.map(sqlite_row_to_melt_quote) .map(sqlite_row_to_melt_quote)
.collect::<Result<Vec<mint::MeltQuote>, _>>()?; .collect::<Result<Vec<mint::MeltQuote>, _>>()?;
Ok(melt_quotes) Ok(melt_quotes)
} }
Err(err) => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(err.into())
}
}
}
async fn update_melt_quote_state( async fn update_melt_quote_state(
&self, &self,
@@ -390,12 +576,21 @@ WHERE id=?;
) )
.bind(quote_id) .bind(quote_id)
.fetch_one(&mut transaction) .fetch_one(&mut transaction)
.await .await;
.map_err(Error::from)?;
let quote = sqlite_row_to_melt_quote(rec)?; let quote = match rec {
Ok(rec) => sqlite_row_to_melt_quote(rec)?,
Err(err) => {
tracing::error!("SQLite Could not update keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
sqlx::query( return Err(Error::from(err).into());
}
};
let rec = sqlx::query(
r#" r#"
UPDATE melt_quote SET state = ? WHERE id = ? UPDATE melt_quote SET state = ? WHERE id = ?
"#, "#,
@@ -403,31 +598,56 @@ WHERE id=?;
.bind(state.to_string()) .bind(state.to_string())
.bind(quote_id) .bind(quote_id)
.execute(&mut transaction) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match rec {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?; transaction.commit().await.map_err(Error::from)?;
}
Err(err) => {
tracing::error!("SQLite Could not update melt quote");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
};
Ok(quote.state) Ok(quote.state)
} }
async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> { async fn remove_melt_quote(&self, quote_id: &str) -> Result<(), Self::Err> {
sqlx::query( let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let res = sqlx::query(
r#" r#"
DELETE FROM melt_quote DELETE FROM melt_quote
WHERE id=? WHERE id=?
"#, "#,
) )
.bind(quote_id) .bind(quote_id)
.execute(&self.pool) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match res {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?;
Ok(()) Ok(())
} }
Err(err) => {
tracing::error!("SQLite Could not update melt quote");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::from(err).into())
}
}
}
async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> { async fn add_keyset_info(&self, keyset: MintKeySetInfo) -> Result<(), Self::Err> {
sqlx::query( let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let res = sqlx::query(
r#" r#"
INSERT OR REPLACE INTO keyset INSERT OR REPLACE INTO keyset
(id, unit, active, valid_from, valid_to, derivation_path, max_order, input_fee_ppk, derivation_path_index) (id, unit, active, valid_from, valid_to, derivation_path, max_order, input_fee_ppk, derivation_path_index)
@@ -443,13 +663,27 @@ VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
.bind(keyset.max_order) .bind(keyset.max_order)
.bind(keyset.input_fee_ppk as i64) .bind(keyset.input_fee_ppk as i64)
.bind(keyset.derivation_path_index) .bind(keyset.derivation_path_index)
.execute(&self.pool) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
match res {
Ok(_) => {
transaction.commit().await.map_err(Error::from)?;
Ok(()) Ok(())
} }
Err(err) => {
tracing::error!("SQLite could not add keyset info");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(Error::from(err).into())
}
}
}
async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> { async fn get_keyset_info(&self, id: &Id) -> Result<Option<MintKeySetInfo>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -458,35 +692,59 @@ WHERE id=?;
"#, "#,
) )
.bind(id.to_string()) .bind(id.to_string())
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
let rec = match rec { match rec {
Ok(rec) => rec, Ok(rec) => {
Err(err) => match err { transaction.commit().await.map_err(Error::from)?;
sqlx::Error::RowNotFound => return Ok(None),
_ => return Err(Error::SQLX(err).into()),
},
};
Ok(Some(sqlite_row_to_keyset_info(rec)?)) Ok(Some(sqlite_row_to_keyset_info(rec)?))
} }
Err(err) => match err {
sqlx::Error::RowNotFound => {
transaction.commit().await.map_err(Error::from)?;
return Ok(None);
}
_ => {
tracing::error!("SQLite could not get keyset info");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::SQLX(err).into());
}
},
}
}
async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> { async fn get_keyset_infos(&self) -> Result<Vec<MintKeySetInfo>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let recs = sqlx::query( let recs = sqlx::query(
r#" r#"
SELECT * SELECT *
FROM keyset; FROM keyset;
"#, "#,
) )
.fetch_all(&self.pool) .fetch_all(&mut transaction)
.await .await
.map_err(Error::from)?; .map_err(Error::from);
match recs {
Ok(recs) => {
transaction.commit().await.map_err(Error::from)?;
Ok(recs Ok(recs
.into_iter() .into_iter()
.map(sqlite_row_to_keyset_info) .map(sqlite_row_to_keyset_info)
.collect::<Result<_, _>>()?) .collect::<Result<_, _>>()?)
} }
Err(err) => {
tracing::error!("SQLite could not get keyset info");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
Err(err.into())
}
}
}
async fn add_proofs(&self, proofs: Proofs) -> Result<(), Self::Err> { async fn add_proofs(&self, proofs: Proofs) -> Result<(), Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?; let mut transaction = self.pool.begin().await.map_err(Error::from)?;
@@ -539,11 +797,18 @@ WHERE y=?;
} }
Err(err) => match err { Err(err) => match err {
sqlx::Error::RowNotFound => proofs.push(None), sqlx::Error::RowNotFound => proofs.push(None),
_ => return Err(Error::SQLX(err).into()), _ => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::SQLX(err).into());
}
}, },
}; };
} }
transaction.commit().await.map_err(Error::from)?;
Ok(proofs) Ok(proofs)
} }
@@ -572,11 +837,18 @@ WHERE y=?;
} }
Err(err) => match err { Err(err) => match err {
sqlx::Error::RowNotFound => states.push(None), sqlx::Error::RowNotFound => states.push(None),
_ => return Err(Error::SQLX(err).into()), _ => {
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::SQLX(err).into());
}
}, },
}; };
} }
transaction.commit().await.map_err(Error::from)?;
Ok(states) Ok(states)
} }
@@ -584,6 +856,7 @@ WHERE y=?;
&self, &self,
keyset_id: &Id, keyset_id: &Id,
) -> Result<(Proofs, Vec<Option<State>>), Self::Err> { ) -> Result<(Proofs, Vec<Option<State>>), Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -592,10 +865,12 @@ WHERE keyset_id=?;
"#, "#,
) )
.bind(keyset_id.to_string()) .bind(keyset_id.to_string())
.fetch_all(&self.pool) .fetch_all(&mut transaction)
.await .await;
.map_err(Error::from)?;
match rec {
Ok(rec) => {
transaction.commit().await.map_err(Error::from)?;
let mut proofs_for_id = vec![]; let mut proofs_for_id = vec![];
let mut states = vec![]; let mut states = vec![];
@@ -608,6 +883,16 @@ WHERE keyset_id=?;
Ok((proofs_for_id, states)) Ok((proofs_for_id, states))
} }
Err(err) => {
tracing::error!("SQLite could not get proofs by keysets id");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
}
}
async fn update_proofs_states( async fn update_proofs_states(
&self, &self,
@@ -642,14 +927,20 @@ WHERE y=?;
sqlx::Error::RowNotFound => { sqlx::Error::RowNotFound => {
current_state = None; current_state = None;
} }
_ => return Err(Error::SQLX(err).into()), _ => {
tracing::error!("SQLite could not get state of proof");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::SQLX(err).into());
}
}, },
}; };
states.push(current_state); states.push(current_state);
if current_state != Some(State::Spent) { if current_state != Some(State::Spent) {
sqlx::query( let res = sqlx::query(
r#" r#"
UPDATE proof SET state = ? WHERE y = ? UPDATE proof SET state = ? WHERE y = ?
"#, "#,
@@ -657,8 +948,15 @@ WHERE y=?;
.bind(&proofs_state) .bind(&proofs_state)
.bind(y) .bind(y)
.execute(&mut transaction) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
if let Err(err) = res {
tracing::error!("SQLite could not update proof state");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::SQLX(err).into());
}
} }
} }
@@ -666,6 +964,7 @@ WHERE y=?;
Ok(states) Ok(states)
} }
async fn add_blind_signatures( async fn add_blind_signatures(
&self, &self,
blinded_messages: &[PublicKey], blinded_messages: &[PublicKey],
@@ -673,7 +972,7 @@ WHERE y=?;
) -> Result<(), Self::Err> { ) -> Result<(), Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?; let mut transaction = self.pool.begin().await.map_err(Error::from)?;
for (message, signature) in blinded_messages.iter().zip(blinded_signatures) { for (message, signature) in blinded_messages.iter().zip(blinded_signatures) {
sqlx::query( let res = sqlx::query(
r#" r#"
INSERT INTO blind_signature INSERT INTO blind_signature
(y, amount, keyset_id, c) (y, amount, keyset_id, c)
@@ -685,19 +984,30 @@ VALUES (?, ?, ?, ?);
.bind(signature.keyset_id.to_string()) .bind(signature.keyset_id.to_string())
.bind(signature.c.to_bytes().to_vec()) .bind(signature.c.to_bytes().to_vec())
.execute(&mut transaction) .execute(&mut transaction)
.await .await;
.map_err(Error::from)?;
if let Err(err) = res {
tracing::error!("SQLite could not add blind signature");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::SQLX(err).into());
}
} }
transaction.commit().await.map_err(Error::from)?; transaction.commit().await.map_err(Error::from)?;
Ok(()) Ok(())
} }
async fn get_blind_signatures( async fn get_blind_signatures(
&self, &self,
blinded_messages: &[PublicKey], blinded_messages: &[PublicKey],
) -> Result<Vec<Option<BlindSignature>>, Self::Err> { ) -> Result<Vec<Option<BlindSignature>>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let mut signatures = Vec::with_capacity(blinded_messages.len()); let mut signatures = Vec::with_capacity(blinded_messages.len());
for message in blinded_messages { for message in blinded_messages {
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
@@ -707,7 +1017,7 @@ WHERE y=?;
"#, "#,
) )
.bind(message.to_bytes().to_vec()) .bind(message.to_bytes().to_vec())
.fetch_one(&self.pool) .fetch_one(&mut transaction)
.await; .await;
if let Ok(row) = rec { if let Ok(row) = rec {
@@ -719,6 +1029,8 @@ WHERE y=?;
} }
} }
transaction.commit().await.map_err(Error::from)?;
Ok(signatures) Ok(signatures)
} }
@@ -726,6 +1038,8 @@ WHERE y=?;
&self, &self,
keyset_id: &Id, keyset_id: &Id,
) -> Result<Vec<BlindSignature>, Self::Err> { ) -> Result<Vec<BlindSignature>, Self::Err> {
let mut transaction = self.pool.begin().await.map_err(Error::from)?;
let rec = sqlx::query( let rec = sqlx::query(
r#" r#"
SELECT * SELECT *
@@ -734,16 +1048,28 @@ WHERE keyset_id=?;
"#, "#,
) )
.bind(keyset_id.to_string()) .bind(keyset_id.to_string())
.fetch_all(&self.pool) .fetch_all(&mut transaction)
.await; .await;
let signatures = rec match rec {
.map_err(Error::from)? Ok(rec) => {
transaction.commit().await.map_err(Error::from)?;
let sigs = rec
.into_iter() .into_iter()
.map(sqlite_row_to_blind_signature) .map(sqlite_row_to_blind_signature)
.collect::<Result<_, _>>()?; .collect::<Result<Vec<BlindSignature>, _>>()?;
Ok(signatures) Ok(sigs)
}
Err(err) => {
tracing::error!("SQLite could not get vlinf signatures for keyset");
if let Err(err) = transaction.rollback().await {
tracing::error!("Could not rollback sql transaction: {}", err);
}
return Err(Error::from(err).into());
}
}
} }
} }